00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #ifndef _WIN32
00035 #include "config.h"
00036 #else
00037 #include "winconfig.h"
00038 #endif
00039
00040 #include "include/ams.h"
00041 #include "include/agent.h"
00042 #include "include/data_structures.h"
00043 #include "include/mc_platform.h"
00044
00045 int
00046 ams_Destroy(ams_p ams)
00047 {
00048 MUTEX_DESTROY(ams->runflag_lock);
00049 free(ams->runflag_lock);
00050 COND_DESTROY(ams->runflag_cond);
00051 free(ams->runflag_cond);
00052 MUTEX_DESTROY(ams->waiting_lock);
00053 free(ams->waiting_lock);
00054 COND_DESTROY(ams->waiting_cond);
00055 free(ams->waiting_cond);
00056 free(ams);
00057 return MC_SUCCESS;
00058 }
00059
00060 ams_p
00061 ams_Initialize(mc_platform_p mc_platform)
00062 {
00063 ams_p ams;
00064 ams = (ams_p)malloc(sizeof(ams_t));
00065 CHECK_NULL(ams, exit(0););
00066 ams->mc_platform = mc_platform;
00067
00068 ams->runflag_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00069 CHECK_NULL(ams->runflag_lock, exit(0););
00070 MUTEX_INIT(ams->runflag_lock);
00071
00072 ams->runflag_cond = (COND_T*)malloc(sizeof(COND_T));
00073 CHECK_NULL(ams->runflag_cond, exit(0););
00074 COND_INIT(ams->runflag_cond);
00075
00076 ams->run = 0;
00077
00078 ams->waiting = 0;
00079 ams->waiting_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00080 MUTEX_INIT(ams->waiting_lock);
00081 ams->waiting_cond = (COND_T*)malloc(sizeof(COND_T));
00082 COND_INIT(ams->waiting_cond);
00083
00084 return ams;
00085 }
00086
00087 void
00088 ams_Print(ams_p ams)
00089 {
00090 int i;
00091 MCAgent_t agent;
00092 agent_queue_p alist;
00093
00094 alist = ams->mc_platform->agent_queue;
00095
00096 MUTEX_LOCK(alist->lock);
00097
00098 if(alist->size == 0)
00099 {
00100 MUTEX_UNLOCK(alist->lock);
00101 return;
00102 }
00103
00104
00105 printf("%d total agents on board.\n", alist->size);
00106 for(i=0; i<alist->size; i++)
00107 {
00108 agent = (MCAgent_t)ListSearch(alist->list, i);
00109 printf("Agent id: %lu, Connect id: %lu, status: %u\n",
00110 agent->id,
00111 agent->connect_id,
00112 agent->agent_status);
00113 }
00114
00115 MUTEX_UNLOCK(alist->lock);
00116 return;
00117 }
00118
00119 extern int
00120 ams_ManageAgentList(ams_p ams)
00121 {
00122
00123 MCAgent_t current_agent;
00124 int index = 0;
00125 agent_queue_p alist;
00126 mc_platform_p global;
00127 message_p message;
00128
00129 alist = ams->mc_platform->agent_queue;
00130 global = ams->mc_platform;
00131
00132
00133
00134 MUTEX_LOCK(alist->lock);
00135 for(index=0; index<alist->size; index++)
00136 {
00137 if((current_agent = (MCAgent_t)ListSearch(alist->list, index)))
00138 {
00139 if(current_agent->binary) {continue;}
00140 MUTEX_UNLOCK(alist->lock);
00141 MUTEX_LOCK(current_agent->lock);
00142 current_agent->orphan = 0;
00143 MUTEX_LOCK(global->quit_lock);
00144 if(global->quit && current_agent->agent_status != MC_WAIT_MESSGSEND) {
00145 MUTEX_UNLOCK(global->quit_lock);
00146 MUTEX_UNLOCK(current_agent->lock);
00147 MC_TerminateAgent(current_agent);
00148
00149 MUTEX_LOCK(current_agent->run_lock);
00150 MUTEX_UNLOCK(current_agent->run_lock);
00151 continue;
00152 }
00153 MUTEX_UNLOCK(global->quit_lock);
00154 switch(current_agent->agent_status)
00155 {
00156 case MC_WAIT_CH :
00157 MUTEX_UNLOCK(current_agent->lock);
00158 agent_RunChScript(current_agent, global);
00159 break;
00160 case MC_AGENT_ACTIVE :
00161 MUTEX_UNLOCK(current_agent->lock);
00162
00163 break;
00164 case MC_WAIT_MESSGSEND :
00165 current_agent->agent_status = MC_WAIT_FINISHED;
00166 MUTEX_UNLOCK(current_agent->lock);
00167 MUTEX_LOCK(ams->runflag_lock);
00168 ams->run = 1;
00169 MUTEX_UNLOCK(ams->runflag_lock);
00170 MUTEX_UNLOCK(current_agent->lock);
00171 message = message_New();
00172 if (
00173 message_InitializeFromAgent
00174 (
00175 ams->mc_platform,
00176 message,
00177 current_agent
00178 )
00179 )
00180 {
00181 fprintf(stderr, "Error initializing message from agent. %s:%d\n", __FILE__, __LINE__);
00182 message_Destroy(message);
00183 message = NULL;
00184 } else {
00185
00186
00187 current_agent->name = (char*)realloc(
00188 current_agent->name,
00189 sizeof(char) * (strlen(current_agent->name) + 10)
00190 );
00191 strcat(current_agent->name, "_SENDING");
00192 message_queue_Add(
00193 ams->mc_platform->message_queue,
00194 message
00195 );
00196 }
00197 break;
00198 case MC_AGENT_NEUTRAL :
00199 MUTEX_UNLOCK(current_agent->lock);
00200 break;
00201 case MC_WAIT_FINISHED :
00202 MUTEX_UNLOCK(current_agent->lock);
00203 agent_queue_RemoveIndex(alist, index);
00204
00205
00206
00207 index--;
00208 break;
00209 default :
00210 printf("ERROR IN AGENT FORMAT");
00211 printf("Agent Format %d not recognized.",
00212 current_agent->agent_status);
00213
00214 current_agent->agent_status = MC_WAIT_FINISHED;
00215 MUTEX_UNLOCK(current_agent->lock);
00216 }
00217 } else {
00218 MUTEX_UNLOCK( alist->lock );
00219 }
00220 MUTEX_LOCK( alist->lock );
00221 }
00222 MUTEX_UNLOCK( alist->lock );
00223 return 0 ;
00224 }
00225
00226 void
00227 ams_Start(mc_platform_p mc_platform)
00228 {
00229 ams_p ams = mc_platform->ams;
00230 #ifndef _WIN32
00231 pthread_attr_t attr;
00232 pthread_attr_init(&attr);
00233 if(mc_platform->stack_size[MC_THREAD_AMS] != -1) {
00234 pthread_attr_setstacksize
00235 (
00236 &attr,
00237 mc_platform->stack_size[MC_THREAD_AMS]
00238 );
00239 }
00240 #else
00241 int stack_size;
00242 if (mc_platform->stack_size[MC_THREAD_AMS] < 1) {
00243
00244 stack_size = mc_platform->stack_size[MC_THREAD_AMS]+1;
00245 } else {
00246 stack_size = mc_platform->stack_size[MC_THREAD_AMS];
00247 }
00248 #endif
00249 THREAD_CREATE
00250 (
00251 &ams->thread,
00252 ams_Thread,
00253 mc_platform
00254 );
00255 }
00256 #ifndef _WIN32
00257 void*
00258 ams_Thread(void* arg)
00259 #else
00260 DWORD WINAPI
00261 ams_Thread( LPVOID arg )
00262 #endif
00263 {
00264 mc_platform_p mc_platform = (mc_platform_p)arg;
00265 ams_p ams = mc_platform->ams;
00266 int ams_thread_count = 0;
00267 while(1) {
00268 MUTEX_LOCK(ams->runflag_lock);
00269 MUTEX_LOCK(mc_platform->quit_lock);
00270 while(ams->run == 0 && !mc_platform->quit) {
00271 MUTEX_UNLOCK(mc_platform->quit_lock);
00272
00273 MUTEX_LOCK(ams->waiting_lock);
00274 ams->waiting = 1;
00275 COND_BROADCAST(ams->waiting_cond);
00276 MUTEX_UNLOCK(ams->waiting_lock);
00277
00278 COND_WAIT
00279 (
00280 ams->runflag_cond,
00281 ams->runflag_lock
00282 );
00283 MUTEX_LOCK(mc_platform->quit_lock);
00284 }
00285
00286 MUTEX_LOCK(ams->waiting_lock);
00287 ams->waiting = 0;
00288 COND_BROADCAST(ams->waiting_cond);
00289 MUTEX_UNLOCK(ams->waiting_lock);
00290 if (ams->run == 0 && mc_platform->quit) {
00291 MUTEX_UNLOCK(mc_platform->quit_lock);
00292 MUTEX_UNLOCK(ams->runflag_lock);
00293 ams_ManageAgentList(ams);
00294 THREAD_EXIT();
00295 }
00296 ams->run = 0;
00297 MUTEX_UNLOCK(mc_platform->quit_lock);
00298 MUTEX_UNLOCK(ams->runflag_lock);
00299 ams_ManageAgentList(ams);
00300 ams_thread_count++;
00301 }
00302 THREAD_EXIT();
00303 }