00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #ifndef _WIN32
00035 #include "config.h"
00036 #else
00037 #include "winconfig.h"
00038 #endif
00039
00040 #include "include/ams.h"
00041 #include "include/agent.h"
00042 #include "include/data_structures.h"
00043 #include "include/mc_platform.h"
00044
00045 int
00046 ams_Destroy(ams_p ams)
00047 {
00048 MUTEX_DESTROY(ams->runflag_lock);
00049 free(ams->runflag_lock);
00050 COND_DESTROY(ams->runflag_cond);
00051 free(ams->runflag_cond);
00052 free(ams);
00053 return MC_SUCCESS;
00054 }
00055
00056 ams_p
00057 ams_Initialize(mc_platform_p mc_platform)
00058 {
00059 ams_p ams;
00060 ams = (ams_p)malloc(sizeof(ams_t));
00061 CHECK_NULL(ams, exit(0););
00062 ams->mc_platform = mc_platform;
00063
00064 ams->runflag_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00065 CHECK_NULL(ams->runflag_lock, exit(0););
00066 MUTEX_INIT(ams->runflag_lock);
00067
00068 ams->runflag_cond = (COND_T*)malloc(sizeof(COND_T));
00069 CHECK_NULL(ams->runflag_cond, exit(0););
00070 COND_INIT(ams->runflag_cond);
00071
00072 ams->run = 0;
00073
00074 ams->waiting = 0;
00075 ams->waiting_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00076 MUTEX_INIT(ams->waiting_lock);
00077 ams->waiting_cond = (COND_T*)malloc(sizeof(COND_T));
00078 COND_INIT(ams->waiting_cond);
00079
00080 return ams;
00081 }
00082
00083 void
00084 ams_Print(ams_p ams)
00085 {
00086 int i;
00087 MCAgent_t agent;
00088 agent_queue_p alist;
00089
00090 alist = ams->mc_platform->agent_queue;
00091
00092 MUTEX_LOCK(alist->lock);
00093
00094 if(alist->size == 0)
00095 {
00096 MUTEX_UNLOCK(alist->lock);
00097 return;
00098 }
00099
00100
00101 printf("%d total agents on board.\n", alist->size);
00102 for(i=0; i<alist->size; i++)
00103 {
00104 agent = (MCAgent_t)ListSearch(alist->list, i);
00105 printf("Agent id: %lu, Connect id: %lu, status: %u\n",
00106 agent->id,
00107 agent->connect_id,
00108 agent->agent_status);
00109 }
00110
00111 MUTEX_UNLOCK(alist->lock);
00112 return;
00113 }
00114
00115 extern int
00116 ams_ManageAgentList(ams_p ams)
00117 {
00118
00119 MCAgent_t current_agent;
00120 int index = 0;
00121 agent_queue_p alist;
00122 mc_platform_p global;
00123 message_p message;
00124
00125 alist = ams->mc_platform->agent_queue;
00126 global = ams->mc_platform;
00127
00128
00129
00130 MUTEX_LOCK(alist->lock);
00131 for(index=0; index<alist->size; index++)
00132 {
00133 if((current_agent = (MCAgent_t)ListSearch(alist->list, index)))
00134 {
00135 if(current_agent->binary) {continue;}
00136 MUTEX_UNLOCK(alist->lock);
00137 MUTEX_LOCK(current_agent->lock);
00138 current_agent->orphan = 0;
00139 MUTEX_LOCK(global->quit_lock);
00140 if(global->quit && current_agent->agent_status != MC_WAIT_MESSGSEND) {
00141 MUTEX_UNLOCK(global->quit_lock);
00142 MUTEX_UNLOCK(current_agent->lock);
00143 MC_TerminateAgent(current_agent);
00144
00145 MUTEX_LOCK(current_agent->run_lock);
00146 MUTEX_UNLOCK(current_agent->run_lock);
00147 continue;
00148 }
00149 MUTEX_UNLOCK(global->quit_lock);
00150 switch(current_agent->agent_status)
00151 {
00152 case MC_WAIT_CH :
00153 MUTEX_UNLOCK(current_agent->lock);
00154 agent_RunChScript(current_agent, global);
00155 break;
00156 case MC_AGENT_ACTIVE :
00157 MUTEX_UNLOCK(current_agent->lock);
00158
00159 break;
00160 case MC_WAIT_MESSGSEND :
00161 current_agent->agent_status = MC_WAIT_FINISHED;
00162 MUTEX_UNLOCK(current_agent->lock);
00163 MUTEX_LOCK(ams->runflag_lock);
00164 ams->run = 1;
00165 MUTEX_UNLOCK(ams->runflag_lock);
00166 MUTEX_UNLOCK(current_agent->lock);
00167 message = message_New();
00168 if (
00169 message_InitializeFromAgent
00170 (
00171 ams->mc_platform,
00172 message,
00173 current_agent
00174 )
00175 )
00176 {
00177 fprintf(stderr, "Error initializing message from agent. %s:%d\n", __FILE__, __LINE__);
00178 message_Destroy(message);
00179 message = NULL;
00180 } else {
00181
00182
00183 current_agent->name = (char*)realloc(
00184 current_agent->name,
00185 sizeof(char) * (strlen(current_agent->name) + 10)
00186 );
00187 strcat(current_agent->name, "_SENDING");
00188 message_queue_Add(
00189 ams->mc_platform->message_queue,
00190 message
00191 );
00192 }
00193 break;
00194 case MC_AGENT_NEUTRAL :
00195 MUTEX_UNLOCK(current_agent->lock);
00196 break;
00197 case MC_WAIT_FINISHED :
00198 MUTEX_UNLOCK(current_agent->lock);
00199 agent_queue_RemoveIndex(alist, index);
00200
00201
00202
00203 index--;
00204 break;
00205 default :
00206 printf("ERROR IN AGENT FORMAT");
00207 printf("Agent Format %d not recognized.",
00208 current_agent->agent_status);
00209
00210 current_agent->agent_status = MC_WAIT_FINISHED;
00211 MUTEX_UNLOCK(current_agent->lock);
00212 }
00213 } else {
00214 MUTEX_UNLOCK( alist->lock );
00215 }
00216 MUTEX_LOCK( alist->lock );
00217 }
00218 MUTEX_UNLOCK( alist->lock );
00219 return 0 ;
00220 }
00221
00222 void
00223 ams_Start(mc_platform_p mc_platform)
00224 {
00225 ams_p ams = mc_platform->ams;
00226 #ifndef _WIN32
00227 pthread_attr_t attr;
00228 pthread_attr_init(&attr);
00229 if(mc_platform->stack_size[MC_THREAD_AMS] != -1) {
00230 pthread_attr_setstacksize
00231 (
00232 &attr,
00233 mc_platform->stack_size[MC_THREAD_AMS]
00234 );
00235 }
00236 #else
00237 int stack_size;
00238 if (mc_platform->stack_size[MC_THREAD_AMS] < 1) {
00239
00240 stack_size = mc_platform->stack_size[MC_THREAD_AMS]+1;
00241 } else {
00242 stack_size = mc_platform->stack_size[MC_THREAD_AMS];
00243 }
00244 #endif
00245 THREAD_CREATE
00246 (
00247 &ams->thread,
00248 ams_Thread,
00249 mc_platform
00250 );
00251 }
00252 #ifndef _WIN32
00253 void*
00254 ams_Thread(void* arg)
00255 #else
00256 DWORD WINAPI
00257 ams_Thread( LPVOID arg )
00258 #endif
00259 {
00260 mc_platform_p mc_platform = (mc_platform_p)arg;
00261 ams_p ams = mc_platform->ams;
00262 int ams_thread_count = 0;
00263 while(1) {
00264 MUTEX_LOCK(ams->runflag_lock);
00265 MUTEX_LOCK(mc_platform->quit_lock);
00266 while(ams->run == 0 && !mc_platform->quit) {
00267 MUTEX_UNLOCK(mc_platform->quit_lock);
00268
00269 MUTEX_LOCK(ams->waiting_lock);
00270 ams->waiting = 1;
00271 COND_BROADCAST(ams->waiting_cond);
00272 MUTEX_UNLOCK(ams->waiting_lock);
00273
00274 COND_WAIT
00275 (
00276 ams->runflag_cond,
00277 ams->runflag_lock
00278 );
00279 MUTEX_LOCK(mc_platform->quit_lock);
00280 }
00281
00282 MUTEX_LOCK(ams->waiting_lock);
00283 ams->waiting = 0;
00284 COND_BROADCAST(ams->waiting_cond);
00285 MUTEX_UNLOCK(ams->waiting_lock);
00286 if (ams->run == 0 && mc_platform->quit) {
00287 MUTEX_UNLOCK(mc_platform->quit_lock);
00288 MUTEX_UNLOCK(ams->runflag_lock);
00289 ams_ManageAgentList(ams);
00290 THREAD_EXIT();
00291 }
00292 ams->run = 0;
00293 MUTEX_UNLOCK(mc_platform->quit_lock);
00294 MUTEX_UNLOCK(ams->runflag_lock);
00295 ams_ManageAgentList(ams);
00296 ams_thread_count++;
00297 }
00298 THREAD_EXIT();
00299 }