Example: Autonomic Parallel Matrix Multiplication
regulating_agent4.xml
<?xml version="1.0"?> <!DOCTYPE myMessage SYSTEM "mobilec.dtd"> <MOBILEC_MESSAGE> <MESSAGE message="MOBILE_AGENT"> <MOBILE_AGENT> <AGENT_DATA> <NAME>regulating_agent4</NAME> <OWNER>IEL</OWNER> <HOME>shrimp.engr.ucdavis.edu:5050</HOME> <TASKS task="1" num="0"> <TASK num="0" complete="0" server="shrimp.engr.ucdavis.edu:5050"> </TASK> <AGENT_CODE> <![CDATA[ #include <stdio.h> #include <sys/stat.h> #include <fipa_acl.h> #define NUM 4 #define LEN 50 char community[NUM][LEN] = { "shrimp.engr.ucdavis.edu:5050", "bird2.engr.ucdavis.edu:5050", "ch.engr.ucdavis.edu:5050", "phoenix.engr.ucdavis.edu:5050" }; int main() { int i; int j; int m; int n; struct stat buffer; char ally[NUM][LEN]; char taskcode_path[] = "task/mm.c"; int taskcode_size; int envconfigcode_size = 300; int agentcode_size; char agentcode_size_str[50]; char *taskcode; char *agentcode; char config_str[50]; char mpi_envconfig[300]; char mpi_root_host[] = "shrimp.engr.ucdavis.edu"; int mpi_root_port = 5130; char taskagent_name[50]; char taskagent_name_tmp[] = "task_agent"; fipa_acl_message_p acl; char local_aclsender_name[] = "regulating_agent4"; char local_aclsender_address[] = "http://shrimp.engr.ucdavis.edu:5050/acc"; char aclreceiver_name[50]; char aclreceiver_address[50]; char aclreceiver_name_tmp[] = "regulating_agent"; char aclsender_address[50]; FILE *fptr; char performancecode_path[] = "task/flops3.c"; int performancecode_size; int aclconfigcode_size = 200; int agentcode2_size; char *performancecode; char *agentcode2; char config_str2[60]; char performanceagent_name[50]; char performanceagent_name_tmp[] = "performance_agent"; double sum; double performance_index[NUM-1]; double load_proration[NUM-1]; char load_config[50]; char load_config_tmp[5]; MCAgent_t agent; for(i=0; i<NUM; i++) { strcpy(ally[i], community[i]); } stat(taskcode_path, &buffer); taskcode_size = buffer.st_size; agentcode_size = taskcode_size + envconfigcode_size; sprintf(agentcode_size_str, "%d", agentcode_size); for(i=1; i<NUM; i++) { sprintf(aclreceiver_name, "%s%d", aclreceiver_name_tmp, i); sprintf(aclreceiver_address, "http://%s/acc", ally[i]); printf("\nRequest to Agent '%s' : '%d' bytes\n", aclreceiver_name, agentcode_size); acl = mc_AclNew(); mc_AclSetPerformative( acl, FIPA_REQUEST); mc_AclSetSender( acl, local_aclsender_name, local_aclsender_address); mc_AclAddReceiver( acl, aclreceiver_name, aclreceiver_address); mc_AclSetContent( acl, agentcode_size_str); mc_AclSend(acl); mc_AclDestroy(acl); } for(i=1; i<NUM; i++) { memset(ally[i], 0, LEN); } i = 1; m = 1; n = 0; while((acl = mc_AclWaitRetrieve(mc_current_agent))) { switch(acl->performative) { case FIPA_AGREE: { printf("\nRespond from Agent '%s' : 'AGREE'\n", acl->sender->name); strcpy(aclsender_address, acl->sender->addresses->urls[0]->str); strncpy(ally[m], aclsender_address+7, strstr(aclsender_address+7, "/")-(aclsender_address+7)); m++; n++; mc_AclDestroy(acl); break; } case FIPA_REFUSE: { printf("\nRespond from Agent '%s' : 'REFUSE'\n", acl->sender->name); mc_AclDestroy(acl); break; } default: mc_AclDestroy(acl); break; } i++; if(i == NUM) { break; } } stat(performancecode_path, &buffer); performancecode_size = buffer.st_size; agentcode2_size = performancecode_size + aclconfigcode_size; performancecode = (char *)malloc((performancecode_size+1)*sizeof(char)); memset(performancecode, 0, performancecode_size+1); fptr = fopen(performancecode_path, "r"); fread(performancecode, sizeof(char), performancecode_size, fptr); fclose(fptr); agentcode2 = (char *)malloc((agentcode2_size+1)*sizeof(char)); for(i=1; i<m; i++) { sprintf(config_str2, "char s_n[]=\"%s%d\";\n", performanceagent_name_tmp, i+1); strcpy(agentcode2, config_str2); sprintf(config_str2, "char s_a[]=\"http://%s/acc\";\n", ally[i]); strcat(agentcode2, config_str2); strcat(agentcode2, performancecode); sprintf(performanceagent_name, "%s%d", performanceagent_name_tmp, i); //printf("%s", agentcode2); agent = mc_ComposeAgent( performanceagent_name, ally[0], "IEL", agentcode2, "no-return", ally[i], 0); mc_AddAgent(agent); printf("\nSend '%s' to : '%s'\n", performanceagent_name, ally[i]); } free(performancecode); free(agentcode2); for(i=1; i<NUM; i++) { memset(ally[i], 0, LEN); } i = 1; j = 1; n = 0; while((acl = mc_AclWaitRetrieve(mc_current_agent))) { switch(acl->performative) { case FIPA_INFORM: { printf("\nRespond from Agent '%s' : '%s' (Performance Index)\n", acl->sender->name, acl->content->content); strcpy(aclsender_address, acl->sender->addresses->urls[0]->str); strncpy(ally[j], aclsender_address+7, strstr(aclsender_address+7, "/")-(aclsender_address+7)); performance_index[n] = atof(acl->content->content); j++; n++; mc_AclDestroy(acl); break; } default: mc_AclDestroy(acl); break; } i++; if(i == m) { break; } } for(i=0; i<n; i++) { sum += performance_index[i]; } for(i=0; i<n; i++) { load_proration[i] = performance_index[i]/sum; //printf("load_proration[%d] = %f\n", i, load_proration[i]); } sprintf(load_config, "double load_proration[%d]={", n); for(i=0; i<(n-1); i++) { sprintf(load_config_tmp, "%.1f,", load_proration[i]); strcat(load_config, load_config_tmp); } sprintf(load_config_tmp, "%.1f};\n", load_proration[i]); strcat(load_config, load_config_tmp); //printf("%s\n", load_config); taskcode = (char *)malloc((taskcode_size+1)*sizeof(char)); memset(taskcode, 0, taskcode_size+1); fptr = fopen(taskcode_path, "r"); fread(taskcode, sizeof(char), taskcode_size, fptr); fclose(fptr); sprintf(config_str, "putenv(\"PMI_ROOT_HOST=%s\");\n", mpi_root_host); strcpy(mpi_envconfig, config_str); sprintf(config_str, "putenv(\"PMI_ROOT_PORT=%d\");\n", mpi_root_port); strcat(mpi_envconfig, config_str); sprintf(config_str, "putenv(\"PMI_ROOT_LOCAL=1\");\n"); strcat(mpi_envconfig, config_str); sprintf(config_str, "putenv(\"PMI_SIZE=%d\");\n", m); strcat(mpi_envconfig, config_str); agentcode = (char *)malloc((agentcode_size+1)*sizeof(char)); for(i=0; i<m; i++) { strcpy(agentcode, mpi_envconfig); sprintf(config_str, "putenv(\"PMI_RANK=%d\");\n", i); strcat(agentcode, config_str); strcat(agentcode, load_config); strcat(agentcode, taskcode); sprintf(taskagent_name, "%s%d", taskagent_name_tmp, i+1); //printf("%s", agentcode); agent = mc_ComposeAgent( taskagent_name, ally[0], "IEL", agentcode, "no-return", ally[i], 0); mc_AddAgent(agent); if(i == 0) { printf("\nSend '%s' to : '%s' (head machine, local)\n", taskagent_name, ally[i]); usleep(1000000); } else { printf("\nSend '%s' to : '%s'\n", taskagent_name, ally[i]); } } free(agentcode); free(taskcode); return 0; } ]]> </AGENT_CODE> </TASKS> </AGENT_DATA> </MOBILE_AGENT> </MESSAGE> </MOBILEC_MESSAGE>
regulating_agent1.xml
<?xml version="1.0"?> <!DOCTYPE myMessage SYSTEM "mobilec.dtd"> <MOBILEC_MESSAGE> <MESSAGE message="MOBILE_AGENT"> <MOBILE_AGENT> <AGENT_DATA> <NAME>regulating_agent1</NAME> <OWNER>IEL</OWNER> <HOME>bird2.engr.ucdavis.edu:5050</HOME> <TASKS task="1" num="0"> <TASK num="0" complete="0" server="bird2.engr.ucdavis.edu:5050"> </TASK> <AGENT_CODE> <![CDATA[ #include <stdio.h> #include <fipa_acl.h> int allowable_code_size = 6000; int main() { fipa_acl_message_p acl; fipa_acl_message_p reply; int requested_code_size; printf("\nAllowable code size on '%s' : '%d' bytes.\n", "bird2.engr.ucdavis.edu:5050", allowable_code_size); while((acl = mc_AclWaitRetrieve(mc_current_agent))) { reply = mc_AclReply(acl); mc_AclSetPerformative(reply, FIPA_ERROR); switch(acl->performative) { case FIPA_REQUEST: { printf("\nRequest from Agent '%s' : '%s' bytes\n", acl->sender->name, acl->content->content); requested_code_size = atoi(acl->content->content); if(requested_code_size <= allowable_code_size) { mc_AclSetPerformative(reply, FIPA_AGREE); printf("\nRespond to Agent '%s' : 'AGREE'\n", acl->sender->name); } else { mc_AclSetPerformative(reply, FIPA_REFUSE); printf("\nRespond to Agent '%s' : 'REFUSE'\n", acl->sender->name); } mc_AclDestroy(acl); break; } default: mc_AclDestroy(acl); break; } mc_AclSetSender(reply, "regulating_agent1", "http://bird2.engr.ucdavis.edu:5050/acc"); mc_AclSend(reply); mc_AclDestroy(reply); } return 0; } ]]> </AGENT_CODE> </TASKS> </AGENT_DATA> </MOBILE_AGENT> </MESSAGE> </MOBILEC_MESSAGE>
regulating_agent2.xml
<?xml version="1.0"?> <!DOCTYPE myMessage SYSTEM "mobilec.dtd"> <MOBILEC_MESSAGE> <MESSAGE message="MOBILE_AGENT"> <MOBILE_AGENT> <AGENT_DATA> <NAME>regulating_agent2</NAME> <OWNER>IEL</OWNER> <HOME>ch.engr.ucdavis.edu:5050</HOME> <TASKS task="1" num="0"> <TASK num="0" complete="0" server="ch.engr.ucdavis.edu:5050"> </TASK> <AGENT_CODE> <![CDATA[ #include <stdio.h> #include <fipa_acl.h> int allowable_code_size = 5000; int main() { fipa_acl_message_p acl; fipa_acl_message_p reply; int requested_code_size; printf("\nAllowable code size on '%s' : '%d' bytes.\n", "ch.engr.ucdavis.edu:5050", allowable_code_size); while((acl = mc_AclWaitRetrieve(mc_current_agent))) { reply = mc_AclReply(acl); mc_AclSetPerformative(reply, FIPA_ERROR); switch(acl->performative) { case FIPA_REQUEST: { printf("\nRequest from Agent '%s' : '%s' bytes\n", acl->sender->name, acl->content->content); requested_code_size = atoi(acl->content->content); if(requested_code_size <= allowable_code_size) { mc_AclSetPerformative(reply, FIPA_AGREE); printf("\nRespond to Agent '%s' : 'AGREE'\n", acl->sender->name); } else { mc_AclSetPerformative(reply, FIPA_REFUSE); printf("\nRespond to Agent '%s' : 'REFUSE'\n", acl->sender->name); } mc_AclDestroy(acl); break; } default: mc_AclDestroy(acl); break; } mc_AclSetSender(reply, "regulating_agent2", "http://ch.engr.ucdavis.edu:5050/acc"); mc_AclSend(reply); mc_AclDestroy(reply); } return 0; } ]]> </AGENT_CODE> </TASKS> </AGENT_DATA> </MOBILE_AGENT> </MESSAGE> </MOBILEC_MESSAGE>
regulating_agent3.xml
<?xml version="1.0"?> <!DOCTYPE myMessage SYSTEM "mobilec.dtd"> <MOBILEC_MESSAGE> <MESSAGE message="MOBILE_AGENT"> <MOBILE_AGENT> <AGENT_DATA> <NAME>regulating_agent3</NAME> <OWNER>IEL</OWNER> <HOME>phoenix.engr.ucdavis.edu:5050</HOME> <TASKS task="1" num="0"> <TASK num="0" complete="0" server="phoenix.engr.ucdavis.edu:5050"> </TASK> <AGENT_CODE> <![CDATA[ #include <stdio.h> #include <fipa_acl.h> int allowable_code_size = 4000; int main() { fipa_acl_message_p acl; fipa_acl_message_p reply; int requested_code_size; printf("\nAllowable code size on '%s' : '%d' bytes.\n", "phoenix.engr.ucdavis.edu:5050", allowable_code_size); while((acl = mc_AclWaitRetrieve(mc_current_agent))) { reply = mc_AclReply(acl); mc_AclSetPerformative(reply, FIPA_ERROR); switch(acl->performative) { case FIPA_REQUEST: { printf("\nRequest from Agent '%s' : '%s' bytes\n", acl->sender->name, acl->content->content); requested_code_size = atoi(acl->content->content); if(requested_code_size <= allowable_code_size) { mc_AclSetPerformative(reply, FIPA_AGREE); printf("\nRespond to Agent '%s' : 'AGREE'\n", acl->sender->name); } else { mc_AclSetPerformative(reply, FIPA_REFUSE); printf("\nRespond to Agent '%s' : 'REFUSE'\n", acl->sender->name); } mc_AclDestroy(acl); break; } default: mc_AclDestroy(acl); break; } mc_AclSetSender(reply, "regulating_agent3", "http://phoenix.engr.ucdavis.edu:5050/acc"); mc_AclSend(reply); mc_AclDestroy(reply); } return 0; } ]]> </AGENT_CODE> </TASKS> </AGENT_DATA> </MOBILE_AGENT> </MESSAGE> </MOBILEC_MESSAGE> |
Integration Engineering Laboratory | UCD MTU Sandia |