|
Example: Autonomic Parallel Matrix Multiplication
regulating_agent4.xml
<?xml version="1.0"?>
<!DOCTYPE myMessage SYSTEM "mobilec.dtd">
<MOBILEC_MESSAGE>
<MESSAGE message="MOBILE_AGENT">
<MOBILE_AGENT>
<AGENT_DATA>
<NAME>regulating_agent4</NAME>
<OWNER>IEL</OWNER>
<HOME>shrimp.engr.ucdavis.edu:5050</HOME>
<TASKS task="1" num="0">
<TASK num="0" complete="0" server="shrimp.engr.ucdavis.edu:5050">
</TASK>
<AGENT_CODE>
<![CDATA[
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fipa_acl.h>
#define NUM 4
#define LEN 50
char community[NUM][LEN] =
{
"shrimp.engr.ucdavis.edu:5050",
"bird2.engr.ucdavis.edu:5050",
"ch.engr.ucdavis.edu:5050",
"phoenix.engr.ucdavis.edu:5050"
};
/*
 * Copy the "host:port" part of an agent address such as
 * "http://host:port/acc" into host[0..host_len-1].
 *
 * The first 7 characters (the "http://" scheme) are skipped and the copy
 * stops at the next '/', or at the end of the string when there is none.
 * Returns 0 on success, -1 when the address is missing, too short, has an
 * empty host part, or does not fit in host_len bytes.
 */
static int host_from_url(const char *url, char *host, int host_len)
{
    const char *start;
    const char *slash;
    size_t len;

    if (url == NULL || host_len <= 0 || strlen(url) <= 7) {
        return -1;
    }
    start = url + 7;
    slash = strchr(start, '/');
    len = (slash != NULL) ? (size_t)(slash - start) : strlen(start);
    if (len == 0 || len >= (size_t)host_len) {
        return -1;
    }
    memcpy(host, start, len);
    host[len] = '\0';
    return 0;
}

/*
 * Read up to `size` bytes of the file at `path` into a freshly allocated,
 * NUL-terminated buffer. The caller owns the result and must free() it.
 * Returns NULL when the size is negative, allocation fails, or the file
 * cannot be opened.
 */
static char *load_file(const char *path, int size)
{
    char *text;
    FILE *fp;
    size_t got;

    if (size < 0) {
        return NULL;
    }
    text = (char *)malloc((size_t)size + 1);
    if (text == NULL) {
        return NULL;
    }
    fp = fopen(path, "r");
    if (fp == NULL) {
        free(text);
        return NULL;
    }
    got = fread(text, sizeof(char), (size_t)size, fp);
    fclose(fp);
    text[got] = '\0';
    return text;
}

/*
 * Regulating agent on the head machine.
 *
 *  1. Sends a FIPA REQUEST carrying the task agent's code size to the
 *     regulating agent of every other community host.
 *  2. Collects one reply per request; hosts that AGREE become allies.
 *  3. Sends a performance agent (task/flops3.c plus a generated preamble)
 *     to every ally and collects the performance index each one INFORMs.
 *  4. Turns the indices into load shares and sends a task agent
 *     (task/mm.c plus generated PMI environment and load table) to the
 *     head machine and to every ally that reported an index.
 *
 * Returns 0 on success and 1 when a source file cannot be read, memory is
 * exhausted, generated code does not fit, or no ally reported an index.
 */
int main() {
    int i;
    int m;                  /* hosts that agreed, head included: ally[0..m-1] */
    int n;                  /* performance indices received                   */
    int sent;               /* performance agents actually dispatched         */
    int hosts;              /* hosts that receive a task agent (n + 1)        */
    int written;
    struct stat buffer;
    char ally[NUM][LEN];
    char taskcode_path[] = "task/mm.c";
    int taskcode_size;
    int envconfigcode_size = 300;   /* allowance for the generated preamble */
    int agentcode_size;
    char agentcode_size_str[50];
    char *taskcode;
    char *agentcode;
    char mpi_envconfig[300];
    char mpi_root_host[] = "shrimp.engr.ucdavis.edu";
    int mpi_root_port = 5130;
    char taskagent_name[50];
    char taskagent_name_tmp[] = "task_agent";
    fipa_acl_message_p acl;
    char local_aclsender_name[] = "regulating_agent4";
    char local_aclsender_address[] = "http://shrimp.engr.ucdavis.edu:5050/acc";
    char aclreceiver_name[50];
    char aclreceiver_address[LEN + 16];
    char aclreceiver_name_tmp[] = "regulating_agent";
    char performancecode_path[] = "task/flops3.c";
    int performancecode_size;
    int aclconfigcode_size = 200;   /* allowance for the generated preamble */
    int agentcode2_size;
    char *performancecode;
    char *agentcode2;
    char performanceagent_name[50];
    char performanceagent_name_tmp[] = "performance_agent";
    double sum;
    double performance_index[NUM-1];
    double load_proration[NUM-1];
    char load_config[128];
    char load_item[32];
    MCAgent_t agent;

    memcpy(ally, community, sizeof(ally));

    /* The size announced to the other hosts is task code plus preamble. */
    if (stat(taskcode_path, &buffer) != 0) {
        fprintf(stderr, "\nCannot stat '%s'\n", taskcode_path);
        return 1;
    }
    taskcode_size = (int)buffer.st_size;
    agentcode_size = taskcode_size + envconfigcode_size;
    snprintf(agentcode_size_str, sizeof(agentcode_size_str), "%d",
             agentcode_size);

    /* Step 1: ask every remote regulating agent whether it accepts the code. */
    for (i = 1; i < NUM; i++) {
        snprintf(aclreceiver_name, sizeof(aclreceiver_name), "%s%d",
                 aclreceiver_name_tmp, i);
        snprintf(aclreceiver_address, sizeof(aclreceiver_address),
                 "http://%s/acc", ally[i]);
        printf("\nRequest to Agent '%s' : '%d' bytes\n",
               aclreceiver_name, agentcode_size);
        acl = mc_AclNew();
        mc_AclSetPerformative(acl, FIPA_REQUEST);
        mc_AclSetSender(acl, local_aclsender_name, local_aclsender_address);
        mc_AclAddReceiver(acl, aclreceiver_name, aclreceiver_address);
        mc_AclSetContent(acl, agentcode_size_str);
        mc_AclSend(acl);
        mc_AclDestroy(acl);
    }

    /* Step 2: rebuild the ally list from the hosts that agree. */
    for (i = 1; i < NUM; i++) {
        memset(ally[i], 0, LEN);
    }
    m = 1;
    for (i = 1; i < NUM; i++) {     /* one reply expected per request */
        acl = mc_AclWaitRetrieve(mc_current_agent);
        if (!acl) {
            break;
        }
        switch (acl->performative) {
        case FIPA_AGREE:
            printf("\nRespond from Agent '%s' : 'AGREE'\n",
                   acl->sender->name);
            if (m < NUM &&
                host_from_url(acl->sender->addresses->urls[0]->str,
                              ally[m], LEN) == 0) {
                m++;
            }
            else {
                fprintf(stderr, "\nIgnoring Agent '%s' : unusable address\n",
                        acl->sender->name);
            }
            break;
        case FIPA_REFUSE:
            printf("\nRespond from Agent '%s' : 'REFUSE'\n",
                   acl->sender->name);
            break;
        default:
            break;
        }
        mc_AclDestroy(acl);
    }

    /* Step 3a: send a performance agent to every ally. */
    if (stat(performancecode_path, &buffer) != 0) {
        fprintf(stderr, "\nCannot stat '%s'\n", performancecode_path);
        return 1;
    }
    performancecode_size = (int)buffer.st_size;
    agentcode2_size = performancecode_size + aclconfigcode_size;
    performancecode = load_file(performancecode_path, performancecode_size);
    agentcode2 = (char *)malloc((size_t)agentcode2_size + 1);
    if (performancecode == NULL || agentcode2 == NULL) {
        fprintf(stderr, "\nCannot load '%s'\n", performancecode_path);
        free(performancecode);
        free(agentcode2);
        return 1;
    }
    sent = 0;
    for (i = 1; i < m; i++) {
        /*
         * NOTE(review): the embedded s_n uses i+1 while the agent itself is
         * named with i; kept exactly as before -- confirm this is intended.
         */
        written = snprintf(agentcode2, (size_t)agentcode2_size + 1,
                           "char s_n[]=\"%s%d\";\n"
                           "char s_a[]=\"http://%s/acc\";\n"
                           "%s",
                           performanceagent_name_tmp, i + 1,
                           ally[i],
                           performancecode);
        if (written < 0 || written > agentcode2_size) {
            fprintf(stderr, "\nPerformance agent code for '%s' is too large\n",
                    ally[i]);
            continue;
        }
        snprintf(performanceagent_name, sizeof(performanceagent_name),
                 "%s%d", performanceagent_name_tmp, i);
        //printf("%s", agentcode2);
        agent = mc_ComposeAgent(
            performanceagent_name,
            ally[0],
            "IEL",
            agentcode2,
            "no-return",
            ally[i],
            0);
        mc_AddAgent(agent);
        sent++;
        printf("\nSend '%s' to : '%s'\n", performanceagent_name, ally[i]);
    }
    free(performancecode);
    free(agentcode2);

    /*
     * Step 3b: collect one message per dispatched performance agent and
     * rebuild the ally list in the order the indices arrive. Waiting only
     * for `sent` messages keeps the agent from blocking forever when no
     * host agreed.
     */
    for (i = 1; i < NUM; i++) {
        memset(ally[i], 0, LEN);
    }
    n = 0;
    for (i = 0; i < sent; i++) {
        acl = mc_AclWaitRetrieve(mc_current_agent);
        if (!acl) {
            break;
        }
        if (acl->performative == FIPA_INFORM) {
            printf("\nRespond from Agent '%s' : '%s' (Performance Index)\n",
                   acl->sender->name, acl->content->content);
            if (n < NUM - 1 &&
                host_from_url(acl->sender->addresses->urls[0]->str,
                              ally[n + 1], LEN) == 0) {
                performance_index[n] = atof(acl->content->content);
                n++;
            }
            else {
                fprintf(stderr, "\nIgnoring Agent '%s' : unusable address\n",
                        acl->sender->name);
            }
        }
        mc_AclDestroy(acl);
    }
    if (n == 0) {
        fprintf(stderr, "\nNo performance index received; task not dispatched\n");
        return 1;
    }
    hosts = n + 1;      /* head machine plus every host that reported */

    /* Step 4a: load shares proportional to the reported indices. */
    sum = 0.0;
    for (i = 0; i < n; i++) {
        sum += performance_index[i];
    }
    for (i = 0; i < n; i++) {
        /* Fall back to equal shares when the indices do not add up to a
           positive total, instead of dividing by zero. */
        load_proration[i] = (sum > 0.0) ? performance_index[i] / sum
                                        : 1.0 / n;
        //printf("load_proration[%d] = %f\n", i, load_proration[i]);
    }
    snprintf(load_config, sizeof(load_config),
             "double load_proration[%d]={", n);
    for (i = 0; i < n; i++) {
        if (i < n - 1) {
            snprintf(load_item, sizeof(load_item), "%.1f,", load_proration[i]);
        }
        else {
            snprintf(load_item, sizeof(load_item), "%.1f};\n", load_proration[i]);
        }
        if (strlen(load_config) + strlen(load_item) >= sizeof(load_config)) {
            fprintf(stderr, "\nLoad configuration is too long\n");
            return 1;
        }
        strcat(load_config, load_item);
    }
    //printf("%s\n", load_config);

    /* Step 4b: build and dispatch one task agent per participating host. */
    written = snprintf(mpi_envconfig, sizeof(mpi_envconfig),
                       "putenv(\"PMI_ROOT_HOST=%s\");\n"
                       "putenv(\"PMI_ROOT_PORT=%d\");\n"
                       "putenv(\"PMI_ROOT_LOCAL=1\");\n"
                       "putenv(\"PMI_SIZE=%d\");\n",
                       mpi_root_host, mpi_root_port, hosts);
    if (written < 0 || (size_t)written >= sizeof(mpi_envconfig)) {
        fprintf(stderr, "\nMPI environment configuration is too long\n");
        return 1;
    }
    taskcode = load_file(taskcode_path, taskcode_size);
    agentcode = (char *)malloc((size_t)agentcode_size + 1);
    if (taskcode == NULL || agentcode == NULL) {
        fprintf(stderr, "\nCannot load '%s'\n", taskcode_path);
        free(taskcode);
        free(agentcode);
        return 1;
    }
    for (i = 0; i < hosts; i++) {
        written = snprintf(agentcode, (size_t)agentcode_size + 1,
                           "%s"
                           "putenv(\"PMI_RANK=%d\");\n"
                           "%s"
                           "%s",
                           mpi_envconfig, i, load_config, taskcode);
        if (written < 0 || written > agentcode_size) {
            /* The code would exceed the size the allies agreed to. */
            fprintf(stderr, "\nTask agent code exceeds '%d' bytes\n",
                    agentcode_size);
            free(agentcode);
            free(taskcode);
            return 1;
        }
        snprintf(taskagent_name, sizeof(taskagent_name), "%s%d",
                 taskagent_name_tmp, i + 1);
        //printf("%s", agentcode);
        agent = mc_ComposeAgent(
            taskagent_name,
            ally[0],
            "IEL",
            agentcode,
            "no-return",
            ally[i],
            0);
        mc_AddAgent(agent);
        if (i == 0) {
            printf("\nSend '%s' to : '%s' (head machine, local)\n",
                   taskagent_name, ally[i]);
            /* Pause one second after starting the head-machine agent;
               presumably lets rank 0 come up before the others -- confirm. */
            usleep(1000000);
        }
        else {
            printf("\nSend '%s' to : '%s'\n", taskagent_name, ally[i]);
        }
    }
    free(agentcode);
    free(taskcode);
    return 0;
}
]]>
</AGENT_CODE>
</TASKS>
</AGENT_DATA>
</MOBILE_AGENT>
</MESSAGE>
</MOBILEC_MESSAGE>
regulating_agent1.xml
<?xml version="1.0"?>
<!DOCTYPE myMessage SYSTEM "mobilec.dtd">
<MOBILEC_MESSAGE>
<MESSAGE message="MOBILE_AGENT">
<MOBILE_AGENT>
<AGENT_DATA>
<NAME>regulating_agent1</NAME>
<OWNER>IEL</OWNER>
<HOME>bird2.engr.ucdavis.edu:5050</HOME>
<TASKS task="1" num="0">
<TASK num="0" complete="0" server="bird2.engr.ucdavis.edu:5050">
</TASK>
<AGENT_CODE>
<![CDATA[
#include <stdio.h>
#include <fipa_acl.h>
/* Largest agent code size, in bytes, that this host accepts. */
int allowable_code_size = 6000;

/*
 * Regulating agent for bird2.engr.ucdavis.edu:5050.
 *
 * Answers every ACL message it retrieves: a REQUEST whose content (a code
 * size in bytes) is within allowable_code_size gets AGREE, a larger one
 * gets REFUSE, and any other performative gets ERROR. The loop ends when
 * mc_AclWaitRetrieve() yields no message.
 */
int main() {
    fipa_acl_message_p incoming;
    fipa_acl_message_p answer;
    int wanted_size;

    printf("\nAllowable code size on '%s' : '%d' bytes.\n",
           "bird2.engr.ucdavis.edu:5050",
           allowable_code_size);

    for (;;) {
        incoming = mc_AclWaitRetrieve(mc_current_agent);
        if (!incoming) {
            break;
        }

        /* Start from ERROR; only a REQUEST upgrades the answer. */
        answer = mc_AclReply(incoming);
        mc_AclSetPerformative(answer, FIPA_ERROR);

        if (incoming->performative == FIPA_REQUEST) {
            printf("\nRequest from Agent '%s' : '%s' bytes\n",
                   incoming->sender->name, incoming->content->content);
            wanted_size = atoi(incoming->content->content);
            if (wanted_size > allowable_code_size) {
                mc_AclSetPerformative(answer, FIPA_REFUSE);
                printf("\nRespond to Agent '%s' : 'REFUSE'\n",
                       incoming->sender->name);
            }
            else {
                mc_AclSetPerformative(answer, FIPA_AGREE);
                printf("\nRespond to Agent '%s' : 'AGREE'\n",
                       incoming->sender->name);
            }
        }
        mc_AclDestroy(incoming);

        mc_AclSetSender(answer, "regulating_agent1", "http://bird2.engr.ucdavis.edu:5050/acc");
        mc_AclSend(answer);
        mc_AclDestroy(answer);
    }
    return 0;
}
]]>
</AGENT_CODE>
</TASKS>
</AGENT_DATA>
</MOBILE_AGENT>
</MESSAGE>
</MOBILEC_MESSAGE>
regulating_agent2.xml
<?xml version="1.0"?>
<!DOCTYPE myMessage SYSTEM "mobilec.dtd">
<MOBILEC_MESSAGE>
<MESSAGE message="MOBILE_AGENT">
<MOBILE_AGENT>
<AGENT_DATA>
<NAME>regulating_agent2</NAME>
<OWNER>IEL</OWNER>
<HOME>ch.engr.ucdavis.edu:5050</HOME>
<TASKS task="1" num="0">
<TASK num="0" complete="0" server="ch.engr.ucdavis.edu:5050">
</TASK>
<AGENT_CODE>
<![CDATA[
#include <stdio.h>
#include <fipa_acl.h>
/* Largest agent code size, in bytes, that this host accepts. */
int allowable_code_size = 5000;

/*
 * Regulating agent for ch.engr.ucdavis.edu:5050.
 *
 * Answers every ACL message it retrieves: a REQUEST whose content (a code
 * size in bytes) is within allowable_code_size gets AGREE, a larger one
 * gets REFUSE, and any other performative gets ERROR. The loop ends when
 * mc_AclWaitRetrieve() yields no message.
 */
int main() {
    fipa_acl_message_p incoming;
    fipa_acl_message_p answer;
    int wanted_size;

    printf("\nAllowable code size on '%s' : '%d' bytes.\n",
           "ch.engr.ucdavis.edu:5050",
           allowable_code_size);

    for (;;) {
        incoming = mc_AclWaitRetrieve(mc_current_agent);
        if (!incoming) {
            break;
        }

        /* Start from ERROR; only a REQUEST upgrades the answer. */
        answer = mc_AclReply(incoming);
        mc_AclSetPerformative(answer, FIPA_ERROR);

        if (incoming->performative == FIPA_REQUEST) {
            printf("\nRequest from Agent '%s' : '%s' bytes\n",
                   incoming->sender->name, incoming->content->content);
            wanted_size = atoi(incoming->content->content);
            if (wanted_size > allowable_code_size) {
                mc_AclSetPerformative(answer, FIPA_REFUSE);
                printf("\nRespond to Agent '%s' : 'REFUSE'\n",
                       incoming->sender->name);
            }
            else {
                mc_AclSetPerformative(answer, FIPA_AGREE);
                printf("\nRespond to Agent '%s' : 'AGREE'\n",
                       incoming->sender->name);
            }
        }
        mc_AclDestroy(incoming);

        mc_AclSetSender(answer, "regulating_agent2", "http://ch.engr.ucdavis.edu:5050/acc");
        mc_AclSend(answer);
        mc_AclDestroy(answer);
    }
    return 0;
}
]]>
</AGENT_CODE>
</TASKS>
</AGENT_DATA>
</MOBILE_AGENT>
</MESSAGE>
</MOBILEC_MESSAGE>
regulating_agent3.xml
<?xml version="1.0"?>
<!DOCTYPE myMessage SYSTEM "mobilec.dtd">
<MOBILEC_MESSAGE>
<MESSAGE message="MOBILE_AGENT">
<MOBILE_AGENT>
<AGENT_DATA>
<NAME>regulating_agent3</NAME>
<OWNER>IEL</OWNER>
<HOME>phoenix.engr.ucdavis.edu:5050</HOME>
<TASKS task="1" num="0">
<TASK num="0" complete="0" server="phoenix.engr.ucdavis.edu:5050">
</TASK>
<AGENT_CODE>
<![CDATA[
#include <stdio.h>
#include <fipa_acl.h>
/* Largest agent code size, in bytes, that this host accepts. */
int allowable_code_size = 4000;

/*
 * Regulating agent for phoenix.engr.ucdavis.edu:5050.
 *
 * Answers every ACL message it retrieves: a REQUEST whose content (a code
 * size in bytes) is within allowable_code_size gets AGREE, a larger one
 * gets REFUSE, and any other performative gets ERROR. The loop ends when
 * mc_AclWaitRetrieve() yields no message.
 */
int main() {
    fipa_acl_message_p incoming;
    fipa_acl_message_p answer;
    int wanted_size;

    printf("\nAllowable code size on '%s' : '%d' bytes.\n",
           "phoenix.engr.ucdavis.edu:5050",
           allowable_code_size);

    for (;;) {
        incoming = mc_AclWaitRetrieve(mc_current_agent);
        if (!incoming) {
            break;
        }

        /* Start from ERROR; only a REQUEST upgrades the answer. */
        answer = mc_AclReply(incoming);
        mc_AclSetPerformative(answer, FIPA_ERROR);

        if (incoming->performative == FIPA_REQUEST) {
            printf("\nRequest from Agent '%s' : '%s' bytes\n",
                   incoming->sender->name, incoming->content->content);
            wanted_size = atoi(incoming->content->content);
            if (wanted_size > allowable_code_size) {
                mc_AclSetPerformative(answer, FIPA_REFUSE);
                printf("\nRespond to Agent '%s' : 'REFUSE'\n",
                       incoming->sender->name);
            }
            else {
                mc_AclSetPerformative(answer, FIPA_AGREE);
                printf("\nRespond to Agent '%s' : 'AGREE'\n",
                       incoming->sender->name);
            }
        }
        mc_AclDestroy(incoming);

        mc_AclSetSender(answer, "regulating_agent3", "http://phoenix.engr.ucdavis.edu:5050/acc");
        mc_AclSend(answer);
        mc_AclDestroy(answer);
    }
    return 0;
}
]]>
</AGENT_CODE>
</TASKS>
</AGENT_DATA>
</MOBILE_AGENT>
</MESSAGE>
</MOBILEC_MESSAGE>
|
| Integration Engineering Laboratory | UCD MTU Sandia |