00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #ifndef _WIN32
00033 #include <sys/socket.h>
00034 #include <arpa/inet.h>
00035 #include <netinet/in.h>
00036 #include <netdb.h>
00037 #include <sys/un.h>
00038 #include <unistd.h>
00039 #include <sys/time.h>
00040 #include <netdb.h>
00041 #include <pthread.h>
00042 #else
00043 #include <winsock.h>
00044 #include <windows.h>
00045 #include <time.h>
00046 #endif
00047
00048 #include <stdlib.h>
00049 #include "include/acc.h"
00050 #include "include/connection.h"
00051 #include "include/data_structures.h"
00052 #include "include/macros.h"
00053 #include "include/mc_error.h"
00054 #include "include/mc_platform.h"
00055 #include "include/message.h"
00056 #include "include/mtp_http.h"
00057 #include "include/xml_parser.h"
00058 #include "include/fipa_acl_envelope.h"
00059
00060 #define BACKLOG 10
00061
00062 acc_p
00063 acc_Initialize(struct mc_platform_s* mc_platform)
00064 {
00065 acc_p acc;
00066 acc = (acc_p)malloc(sizeof(acc_t));
00067 acc->mc_platform = mc_platform;
00068
00069 acc->waiting = 0;
00070 acc->waiting_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00071 MUTEX_INIT(acc->waiting_lock);
00072 acc->waiting_cond = (COND_T*)malloc(sizeof(COND_T));
00073 COND_INIT(acc->waiting_cond);
00074
00075 return acc;
00076 }
00077
00078 int
00079 acc_Destroy(acc_p acc)
00080 {
00081 if(acc == NULL) {
00082 return MC_SUCCESS;
00083 }
00084 free(acc);
00085 acc = NULL;
00086 return MC_SUCCESS;
00087 }
00088
00089 #ifndef _WIN32
00090 void*
00091 acc_MessageHandlerThread(void* arg)
00092 #else
00093 DWORD WINAPI
00094 acc_MessageHandlerThread( LPVOID arg )
00095 #endif
00096 {
00097 mc_platform_p mc_platform = (mc_platform_p)arg;
00098 message_p message;
00099 agent_p agent;
00100 int mobile_agent_counter = 1;
00101 char* tmpstr;
00102 char* origname;
00103 int i;
00104
00105 while(1)
00106 {
00107 MUTEX_LOCK(mc_platform->message_queue->lock);
00108 MUTEX_LOCK(mc_platform->quit_lock);
00109 while(mc_platform->message_queue->size == 0 && !mc_platform->quit) {
00110 MUTEX_UNLOCK(mc_platform->quit_lock);
00111 COND_WAIT
00112 (
00113 mc_platform->message_queue->cond,
00114 mc_platform->message_queue->lock
00115 );
00116 MUTEX_LOCK(mc_platform->quit_lock);
00117 }
00118 if ( mc_platform->message_queue->size == 0 && mc_platform->quit )
00119 {
00120 MUTEX_UNLOCK(mc_platform->quit_lock);
00121 MUTEX_UNLOCK(mc_platform->message_queue->lock);
00122 return 0;
00123 }
00124
00125 MUTEX_UNLOCK(mc_platform->quit_lock);
00126 MUTEX_UNLOCK(mc_platform->message_queue->lock);
00127 message = message_queue_Pop(mc_platform->message_queue);
00128 if (message == NULL) {
00129 printf("POP ERROR\n");
00130 continue;
00131 }
00132
00133 MUTEX_LOCK(mc_platform->MC_signal_lock);
00134 mc_platform->MC_signal = MC_RECV_MESSAGE;
00135 COND_BROADCAST(mc_platform->MC_signal_cond);
00136 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00137 MUTEX_LOCK(mc_platform->giant_lock);
00138 while(mc_platform->giant == 0) {
00139 COND_WAIT
00140 (
00141 mc_platform->giant_cond,
00142 mc_platform->giant_lock
00143 );
00144 }
00145 MUTEX_UNLOCK(mc_platform->giant_lock);
00146
00147
00148 if( message->to_address == NULL) {
00149
00150
00151 switch(message->message_type) {
00152 case MOBILE_AGENT:
00153 agent = agent_Initialize
00154 (
00155 mc_platform,
00156 message,
00157 mobile_agent_counter
00158 );
00159 if (agent != NULL) {
00160
00161 i = 1;
00162 if( agent_queue_SearchName( mc_platform->agent_queue, agent->name ) ) {
00163 origname = agent->name;
00164 while( agent_queue_SearchName( mc_platform->agent_queue, agent->name ) ) {
00165
00166
00167 tmpstr = (char*)malloc(sizeof(char)* strlen(origname)+5);
00168 sprintf(tmpstr, "%s_%04d", origname, i);
00169 agent->name = tmpstr;
00170 i++;
00171 }
00172 fprintf(stderr, "Warning: Agent '%s' has been renamed to '%s'.\n",
00173 origname, agent->name);
00174 free(origname);
00175 }
00176 mobile_agent_counter++;
00177 agent_queue_Add
00178 (
00179 mc_platform->agent_queue,
00180 agent
00181 );
00182 }
00183 message_Destroy(message);
00184
00185 MUTEX_LOCK(mc_platform->MC_signal_lock);
00186 mc_platform->MC_signal = MC_RECV_AGENT;
00187 COND_BROADCAST(mc_platform->MC_signal_cond);
00188 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00189 MUTEX_LOCK(mc_platform->giant_lock);
00190 while(mc_platform->giant == 0) {
00191 COND_WAIT(
00192 mc_platform->giant_cond,
00193 mc_platform->giant_lock
00194 );
00195 }
00196 MUTEX_UNLOCK(mc_platform->giant_lock);
00197
00198 MUTEX_LOCK(mc_platform->ams->runflag_lock);
00199 mc_platform->ams->run = 1;
00200 COND_BROADCAST(mc_platform->ams->runflag_cond);
00201 MUTEX_UNLOCK(mc_platform->ams->runflag_lock);
00202 break;
00203 case FIPA_ACL:
00204
00205
00206 break;
00207 case RETURN_MSG:
00208
00209 agent = agent_Initialize
00210 (
00211 mc_platform,
00212 message,
00213 mobile_agent_counter
00214 );
00215 if (agent != NULL) {
00216 MUTEX_LOCK(agent->lock);
00217 agent->datastate->persistent = 1;
00218 agent->agent_status = MC_AGENT_NEUTRAL;
00219 MUTEX_UNLOCK(agent->lock);
00220 mobile_agent_counter++;
00221 agent_queue_Add
00222 (
00223 mc_platform->agent_queue,
00224 agent
00225 );
00226 }
00227 message_Destroy(message);
00228
00229 MUTEX_LOCK(mc_platform->MC_signal_lock);
00230 mc_platform->MC_signal = MC_RECV_RETURN;
00231 COND_BROADCAST(mc_platform->MC_signal_cond);
00232 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00233 MUTEX_LOCK(mc_platform->giant_lock);
00234 while(mc_platform->giant == 0) {
00235 COND_WAIT(
00236 mc_platform->giant_cond,
00237 mc_platform->giant_lock
00238 );
00239 }
00240 MUTEX_UNLOCK(mc_platform->giant_lock);
00241
00242 MUTEX_LOCK(mc_platform->ams->runflag_lock);
00243 mc_platform->ams->run = 1;
00244 COND_BROADCAST(mc_platform->ams->runflag_cond);
00245 MUTEX_UNLOCK(mc_platform->ams->runflag_lock);
00246 break;
00247
00248 #ifdef MC_SECURITY
00249 case ENCRYPTED_DATA:
00250 if (mc_platform->enable_security) {
00251 message_queue_Add
00252 (
00253 mc_platform->asm_message_queue,
00254 message
00255 );
00256 } else {
00257 WARN("mc_security not enabled. Discarding ENCRYPTED_DATA message.");
00258 message_Destroy(message);
00259 }
00260 break;
00261 case ENCRYPTION_INITIALIZE:
00262 if (mc_platform->enable_security) {
00263 asm_queue_Add
00264 (
00265 mc_platform->asm_queue,
00266 asm_node_Initialize(message, mc_platform->security_manager)
00267 );
00268
00269 MUTEX_LOCK(mc_platform->asm_message_queue->lock);
00270 COND_SIGNAL(mc_platform->asm_message_queue->cond);
00271 MUTEX_UNLOCK(mc_platform->asm_message_queue->lock);
00272 message_Destroy(message);
00273 } else {
00274 WARN("mc_security not enabled. Discarding ENCRYPTION_INITIALIZE message.");
00275 message_Destroy(message);
00276 }
00277 break;
00278 case REQUEST_ENCRYPTION_INITIALIZE:
00279 if (mc_platform->enable_security) {
00280 asm_SendEncryptionData(
00281 mc_platform->security_manager,
00282 message->from_address
00283 );
00284 message_Destroy(message);
00285 } else {
00286 WARN("mc_security not enabled. Discarding REQUEST_ENCRYPTION_INITIALIZE message.");
00287 message_Destroy(message);
00288 }
00289 break;
00290 #endif
00291 case RELAY:
00292 case REQUEST:
00293 case SUBSCRIBE:
00294 case CANCEL:
00295 case N_UNDRSTD:
00296 case QUER_IF:
00297 case QUER_REF:
00298 case AGENT_UPDATE:
00299 fprintf(stderr, "FIXME: Message type %d not processable.%s:%d\n",
00300 message->message_type, __FILE__, __LINE__ );
00301 message_Destroy(message);
00302 break;
00303 default:
00304 fprintf(stderr, "Unknown message type:%d %s:%d\n",
00305 message->message_type, __FILE__, __LINE__);
00306 message_Destroy(message);
00307 }
00308 } else {
00309 #ifdef MC_SECURITY
00310 if (mc_platform->enable_security) {
00311 if (
00312 (message->message_type != ENCRYPTED_DATA) &&
00313 (message->message_type != ENCRYPTION_INITIALIZE) &&
00314 (message->message_type != REQUEST_ENCRYPTION_INITIALIZE)
00315 )
00316 {
00317 message_queue_Add
00318 (
00319 mc_platform->asm_message_queue,
00320 message
00321 );
00322 continue;
00323 }
00324 }
00325 #endif
00326 message_Send
00327 (
00328 message
00329 );
00330 message_Destroy
00331 (
00332 message
00333 );
00334 }
00335 }
00336 return 0;
00337 }
00338
00339 #ifndef _WIN32
00340 void*
00341 acc_Thread(void* arg)
00342 #else
00343 DWORD WINAPI
00344 acc_Thread( LPVOID arg )
00345 #endif
00346 {
00347 connection_p connection;
00348 message_p message;
00349 mtp_http_p mtp_http;
00350 mc_platform_p mc_platform = (mc_platform_p)arg;
00351 fipa_acl_envelope_p fipa_envelope;
00352 fipa_acl_message_p fipa_message;
00353 fipa_message_string_p fipa_message_string;
00354 int err;
00355 int i, j;
00356 agent_t* agent;
00357
00358
00359 while(1) {
00360 connection = NULL;
00361 message = NULL;
00362 mtp_http = NULL;
00363 MUTEX_LOCK(mc_platform->connection_queue->lock);
00364 MUTEX_LOCK(mc_platform->quit_lock);
00365 while (mc_platform->connection_queue->size == 0 && !mc_platform->quit) {
00366 MUTEX_UNLOCK(mc_platform->quit_lock);
00367 COND_WAIT(
00368 mc_platform->connection_queue->cond,
00369 mc_platform->connection_queue->lock
00370 );
00371 MUTEX_LOCK(mc_platform->quit_lock);
00372 }
00373 if
00374 (
00375 mc_platform->connection_queue->size == 0 &&
00376 mc_platform->quit
00377 )
00378 {
00379 MUTEX_UNLOCK(mc_platform->quit_lock);
00380 MUTEX_UNLOCK(mc_platform->connection_queue->lock);
00381 return 0;
00382 }
00383 MUTEX_UNLOCK(mc_platform->quit_lock);
00384 MUTEX_UNLOCK(mc_platform->connection_queue->lock);
00385
00386 MUTEX_LOCK(mc_platform->MC_signal_lock);
00387 mc_platform->MC_signal = MC_RECV_CONNECTION;
00388 COND_BROADCAST(mc_platform->MC_signal_cond);
00389 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00390
00391
00392 MUTEX_LOCK(mc_platform->giant_lock);
00393 while (mc_platform->giant == 0) {
00394 COND_WAIT(
00395 mc_platform->giant_cond,
00396 mc_platform->giant_lock
00397 );
00398 }
00399 MUTEX_UNLOCK(mc_platform->giant_lock);
00400
00401
00402 connection = connection_queue_Pop(mc_platform->connection_queue);
00403 mtp_http = mtp_http_New();
00404 if ( mtp_http_InitializeFromConnection(mtp_http, connection ) )
00405 {
00406 connection_Destroy(connection);
00407 mtp_http_Destroy(mtp_http);
00408 continue;
00409 }
00410
00411 switch(mtp_http->http_performative)
00412 {
00413 case HTTP_POST:
00414 case HTTP_PUT:
00415
00416
00417 if(
00418 !strcmp(mtp_http->target, "/ams") ||
00419 !strcmp( strrchr(mtp_http->target, (int)'/'), "/ams" )
00420 ) {
00421 message = message_New();
00422
00423 message->message_body = (char*)malloc
00424 (
00425 sizeof(char) *
00426 (strlen((char*)mtp_http->content->data)+1)
00427 );
00428 strcpy(message->message_body, (char*)mtp_http->content->data);
00429 message->xml_root = mxmlLoadString
00430 (
00431 NULL,
00432 message->message_body,
00433 MXML_NO_CALLBACK
00434 );
00435 if(message_xml_parse(message)) {
00436 fprintf(stderr, "Error parsing message. %s:%d\n",
00437 __FILE__,__LINE__);
00438 message_Destroy(message);
00439 mtp_http_Destroy(mtp_http);
00440 continue;
00441 }
00442 mtp_http_Destroy(mtp_http);
00443 break;
00444 } else if
00445 (
00446 !strcmp(mtp_http->target, "/acc") ||
00447 !strcmp( strrchr(mtp_http->target, (int)'/'), "/acc")
00448 ) {
00449
00450
00451 if (mtp_http->message_parts != 2) {
00452 fprintf(stderr, "Error parsing message. %s:%d\n",
00453 __FILE__,__LINE__);
00454 mtp_http_Destroy(mtp_http);
00455 continue;
00456 }
00457
00458 fipa_envelope = fipa_acl_envelope_New();
00459 err = fipa_envelope_Parse(fipa_envelope, (char*)mtp_http->content[0].data);
00460 if (err) {
00461 fprintf(stderr, "Error parsing message. %s:%d\n",
00462 __FILE__, __LINE__);
00463 fipa_acl_envelope_Destroy(fipa_envelope);
00464 mtp_http_Destroy(mtp_http);
00465 continue;
00466 }
00467
00468
00469 for(i = 0; i < fipa_envelope->num_params; i++) {
00470 for(j = 0; j < fipa_envelope->params[i]->to->num; j++) {
00471 agent = agent_queue_SearchName(
00472 mc_platform->agent_queue,
00473 fipa_envelope->params[i]->to->fipa_agent_identifiers[j]->name
00474 );
00475 if (agent != NULL) {
00476
00477 fipa_message_string = fipa_message_string_New();
00478 fipa_message_string->message = strdup((char*)mtp_http->content[1].data);
00479 fipa_message_string->parse = fipa_message_string->message;
00480 fipa_message = fipa_acl_message_New();
00481 err = fipa_acl_Parse(fipa_message, fipa_message_string);
00482 if (err) {
00483 fipa_message_string_Destroy(fipa_message_string);
00484 fipa_acl_message_Destroy(fipa_message);
00485 fipa_acl_envelope_Destroy(fipa_envelope);
00486 mtp_http_Destroy(mtp_http);
00487 continue;
00488 }
00489 agent_mailbox_Post( agent->mailbox, fipa_message);
00490 fipa_message_string_Destroy(fipa_message_string);
00491 }
00492 }
00493 }
00494 fipa_acl_envelope_Destroy(fipa_envelope);
00495 mtp_http_Destroy(mtp_http);
00496 continue;
00497 }
00498 else {
00499
00500 fprintf(stderr, "Unsupported. %s:%d\n", __FILE__, __LINE__);
00501 mtp_http_Destroy(mtp_http);
00502 }
00503 default:
00504 fprintf(stderr, "unsupported http performative. %s:%d\n",
00505 __FILE__, __LINE__);
00506 }
00507
00508
00509 connection_Destroy(connection);
00510 switch(message->message_type) {
00511 case RELAY:
00512 case REQUEST:
00513 case SUBSCRIBE:
00514 case CANCEL:
00515 case N_UNDRSTD:
00516 case MOBILE_AGENT:
00517 case QUER_IF:
00518 case QUER_REF:
00519 case AGENT_UPDATE:
00520 case RETURN_MSG:
00521 case FIPA_ACL:
00522 message_queue_Add(mc_platform->message_queue, message);
00523 break;
00524 #ifdef MC_SECURITY
00525 case ENCRYPTED_DATA:
00526 case ENCRYPTION_INITIALIZE:
00527 if (mc_platform->enable_security) {
00528 message_queue_Add(mc_platform->asm_message_queue, message);
00529 } else {
00530 WARN("MC Security not enabled. Discarding message...");
00531 message_Destroy(message);
00532 }
00533 break;
00534 case REQUEST_ENCRYPTION_INITIALIZE:
00535 if (mc_platform->enable_security) {
00536 message_queue_Add(mc_platform->message_queue, message);
00537 } else {
00538 WARN("MC Security not enabled. Discarding message...");
00539 message_Destroy(message);
00540 }
00541 break;
00542 #endif
00543 default:
00544 fprintf(stderr, "Unknown message type:%d. Rejecting message.%s:%d\n",
00545 message->message_type,
00546 __FILE__, __LINE__ );
00547 free(message);
00548 break;
00549 }
00550 }
00551 }
00552
00553 void
00554 acc_Start(mc_platform_p mc_platform)
00555 {
00556 acc_p acc = mc_platform->acc;
00557 #ifndef _WIN32
00558 pthread_attr_t attr;
00559 pthread_attr_init(&attr);
00560 if(mc_platform->stack_size[MC_THREAD_ACC] != -1) {
00561 pthread_attr_setstacksize
00562 (
00563 &attr,
00564 mc_platform->stack_size[MC_THREAD_ACC]
00565 );
00566 }
00567 #else
00568 int stack_size;
00569 if (mc_platform->stack_size[MC_THREAD_ACC] < 1) {
00570
00571 stack_size = mc_platform->stack_size[MC_THREAD_ACC]+1;
00572 } else {
00573 stack_size = mc_platform->stack_size[MC_THREAD_ACC];
00574 }
00575 #endif
00576 THREAD_CREATE
00577 (
00578 &acc->thread,
00579 acc_Thread,
00580 mc_platform
00581 );
00582 THREAD_CREATE
00583 (
00584 &acc->message_handler_thread,
00585 acc_MessageHandlerThread,
00586 mc_platform
00587 );
00588 THREAD_CREATE
00589 (
00590 &acc->listen_thread,
00591 listen_Thread,
00592 mc_platform
00593 );
00594 }
00595
00596 #ifndef _WIN32
00597 void*
00598 listen_Thread(void* arg)
00599 #else
00600 DWORD WINAPI
00601 listen_Thread( LPVOID arg )
00602 #endif
00603 {
00604 #ifndef _WIN32
00605 int connectionsockfd;
00606 int sockfd;
00607 struct sockaddr_in sktin;
00608 struct sockaddr_in peer_addr;
00609 #else
00610 SOCKET connectionsockfd;
00611 SOCKET sockfd;
00612 struct sockaddr_in sktin;
00613 struct sockaddr_in peer_addr;
00614 #endif
00615
00616 connection_p connection;
00617 u_long connection_number;
00618 int connectionlen;
00619 mc_platform_p mc_platform = (mc_platform_p)arg;
00620
00621
00622 connection_number = 0;
00623
00624 connectionlen = sizeof(struct sockaddr_in);
00625
00626
00627 sockfd = socket(PF_INET, SOCK_STREAM, 0);
00628 sktin.sin_family = AF_INET;
00629 sktin.sin_port = htons(mc_platform->port);
00630 sktin.sin_addr.s_addr = INADDR_ANY;
00631 memset(sktin.sin_zero, '\0', sizeof sktin.sin_zero);
00632 if (bind(sockfd, (struct sockaddr *)&sktin, sizeof(struct sockaddr))
00633 == -1) {
00634 fprintf(stderr, "bind() error. %s:%d\n",
00635 __FILE__, __LINE__ );
00636 exit(1);
00637 }
00638 listen(sockfd, BACKLOG);
00639
00640
00641 while(1)
00642 {
00643
00644 MUTEX_LOCK(mc_platform->acc->waiting_lock);
00645 mc_platform->acc->waiting = 1;
00646 COND_BROADCAST(mc_platform->acc->waiting_cond);
00647 MUTEX_UNLOCK(mc_platform->acc->waiting_lock);
00648 #ifndef _WIN32
00649 if((connectionsockfd = accept(sockfd,
00650 (struct sockaddr *)&peer_addr,
00651 (socklen_t *)&connectionlen)) < 0)
00652 #else
00653 if((connectionsockfd = accept(sockfd,
00654 (struct sockaddr *)&peer_addr,
00655 (int*)&connectionlen)) == INVALID_SOCKET)
00656 #endif
00657 {
00658 fprintf(stderr, "ListenThread: accept error \n");
00659 #ifdef _WIN32
00660 printf("Error number: %d\n", WSAGetLastError() );
00661 continue;
00662 #endif
00663 continue;
00664 }
00665 else
00666 {
00667
00668 MUTEX_LOCK(mc_platform->acc->waiting_lock);
00669 mc_platform->acc->waiting = 0;
00670 COND_BROADCAST(mc_platform->acc->waiting_cond);
00671 MUTEX_UNLOCK(mc_platform->acc->waiting_lock);
00672
00673
00674 connection = (connection_p)malloc(sizeof(connection_t));
00675 CHECK_NULL(connection, exit(0););
00676 connection->connect_id = rand();
00677 connection->remote_hostname = NULL;
00678 connection->addr = peer_addr;
00679 connection->serverfd = sockfd;
00680 connection->clientfd = connectionsockfd;
00681
00682
00683 connection_queue_Add(mc_platform->connection_queue, connection);
00684 }
00685 }
00686
00687
00688 #ifndef _WIN32
00689 pthread_exit(0);
00690 #else
00691 ExitThread(0);
00692 #endif
00693
00694 return 0;
00695 }