00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef _WIN32
00036 #include <sys/socket.h>
00037 #include <arpa/inet.h>
00038 #include <netinet/in.h>
00039 #include <netdb.h>
00040 #include <sys/un.h>
00041 #include <unistd.h>
00042 #include <sys/time.h>
00043 #include <netdb.h>
00044 #include <pthread.h>
00045 #else
00046 #include <winsock.h>
00047 #include <windows.h>
00048 #include <time.h>
00049 #endif
00050
00051 #include <stdlib.h>
00052 #include "include/acc.h"
00053 #include "include/connection.h"
00054 #include "include/data_structures.h"
00055 #include "include/macros.h"
00056 #include "include/mc_error.h"
00057 #include "include/mc_platform.h"
00058 #include "include/message.h"
00059 #include "include/mtp_http.h"
00060 #include "include/xml_parser.h"
00061 #include "include/fipa_acl_envelope.h"
00062
00063 #define BACKLOG 10
00064
00065 acc_p
00066 acc_Initialize(struct mc_platform_s* mc_platform)
00067 {
00068 acc_p acc;
00069 acc = (acc_p)malloc(sizeof(acc_t));
00070 acc->mc_platform = mc_platform;
00071
00072 acc->waiting = 0;
00073 acc->waiting_lock = (MUTEX_T*)malloc(sizeof(MUTEX_T));
00074 MUTEX_INIT(acc->waiting_lock);
00075 acc->waiting_cond = (COND_T*)malloc(sizeof(COND_T));
00076 COND_INIT(acc->waiting_cond);
00077
00078 return acc;
00079 }
00080
00081 int
00082 acc_Destroy(acc_p acc)
00083 {
00084 if(acc == NULL) {
00085 return MC_SUCCESS;
00086 }
00087 free(acc);
00088 acc = NULL;
00089 return MC_SUCCESS;
00090 }
00091
00092 #ifndef _WIN32
00093 void*
00094 acc_MessageHandlerThread(void* arg)
00095 #else
00096 DWORD WINAPI
00097 acc_MessageHandlerThread(LPVOID arg)
00098 #endif
00099 {
00100 mc_platform_p mc_platform = (mc_platform_p)arg;
00101 message_p message;
00102 agent_p agent;
00103 int mobile_agent_counter = 1;
00104 char* tmpstr;
00105 char* origname;
00106 int i;
00107
00108 while(1)
00109 {
00110 MUTEX_LOCK(mc_platform->message_queue->lock);
00111 MUTEX_LOCK(mc_platform->quit_lock);
00112 while(mc_platform->message_queue->size == 0 && !mc_platform->quit) {
00113 MUTEX_UNLOCK(mc_platform->quit_lock);
00114 COND_WAIT(
00115 mc_platform->message_queue->cond,
00116 mc_platform->message_queue->lock );
00117 MUTEX_LOCK(mc_platform->quit_lock);
00118 }
00119 if (mc_platform->message_queue->size == 0 && mc_platform->quit)
00120 {
00121 MUTEX_UNLOCK(mc_platform->quit_lock);
00122 MUTEX_UNLOCK(mc_platform->message_queue->lock);
00123 return 0;
00124 }
00125
00126 MUTEX_UNLOCK(mc_platform->quit_lock);
00127 MUTEX_UNLOCK(mc_platform->message_queue->lock);
00128 message = message_queue_Pop(mc_platform->message_queue);
00129 if (message == NULL) {
00130 printf("POP ERROR\n");
00131 continue;
00132 }
00133
00134 MUTEX_LOCK(mc_platform->MC_signal_lock);
00135 mc_platform->MC_signal = MC_RECV_MESSAGE;
00136 COND_BROADCAST(mc_platform->MC_signal_cond);
00137 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00138 MUTEX_LOCK(mc_platform->giant_lock);
00139 while(mc_platform->giant == 0) {
00140 COND_WAIT (
00141 mc_platform->giant_cond,
00142 mc_platform->giant_lock);
00143 }
00144 MUTEX_UNLOCK(mc_platform->giant_lock);
00145
00146
00147 if(message->to_address == NULL) {
00148
00149
00150 switch(message->message_type) {
00151 case MOBILE_AGENT:
00152 agent = agent_Initialize(
00153 mc_platform,
00154 message,
00155 mobile_agent_counter);
00156 if (agent != NULL) {
00157
00158 i = 1;
00159 if(agent_queue_SearchName(mc_platform->agent_queue, agent->name)) {
00160 origname = agent->name;
00161 while(agent_queue_SearchName(mc_platform->agent_queue, agent->name)) {
00162
00163
00164 tmpstr = (char*)malloc(sizeof(char) * strlen(origname) + 7);
00165 sprintf(tmpstr, "%s_%04d", origname, i);
00166 agent->name = tmpstr;
00167 i++;
00168 }
00169 fprintf(stderr, "Warning: Agent '%s' has been renamed to '%s'.\n",
00170 origname, agent->name);
00171 free(origname);
00172 }
00173 mobile_agent_counter++;
00174 agent_queue_Add(
00175 mc_platform->agent_queue,
00176 agent);
00177 }
00178 message_Destroy(message);
00179
00180 MUTEX_LOCK(mc_platform->MC_signal_lock);
00181 mc_platform->MC_signal = MC_RECV_AGENT;
00182 COND_BROADCAST(mc_platform->MC_signal_cond);
00183 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00184 MUTEX_LOCK(mc_platform->giant_lock);
00185 while(mc_platform->giant == 0) {
00186 COND_WAIT(mc_platform->giant_cond,
00187 mc_platform->giant_lock);
00188 }
00189 MUTEX_UNLOCK(mc_platform->giant_lock);
00190
00191 MUTEX_LOCK(mc_platform->ams->runflag_lock);
00192 mc_platform->ams->run = 1;
00193 COND_BROADCAST(mc_platform->ams->runflag_cond);
00194 MUTEX_UNLOCK(mc_platform->ams->runflag_lock);
00195 break;
00196 case FIPA_ACL:
00197
00198
00199 break;
00200 case RETURN_MSG:
00201
00202 agent = agent_Initialize(
00203 mc_platform,
00204 message,
00205 mobile_agent_counter);
00206 if (agent != NULL) {
00207 MUTEX_LOCK(agent->lock);
00208 agent->datastate->persistent = 1;
00209 agent->agent_status = MC_AGENT_NEUTRAL;
00210 MUTEX_UNLOCK(agent->lock);
00211 mobile_agent_counter++;
00212 agent_queue_Add(
00213 mc_platform->agent_queue,
00214 agent);
00215 }
00216 message_Destroy(message);
00217
00218 MUTEX_LOCK(mc_platform->MC_signal_lock);
00219 mc_platform->MC_signal = MC_RECV_RETURN;
00220 COND_BROADCAST(mc_platform->MC_signal_cond);
00221 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00222 MUTEX_LOCK(mc_platform->giant_lock);
00223 while(mc_platform->giant == 0) {
00224 COND_WAIT(
00225 mc_platform->giant_cond,
00226 mc_platform->giant_lock);
00227 }
00228 MUTEX_UNLOCK(mc_platform->giant_lock);
00229
00230 MUTEX_LOCK(mc_platform->ams->runflag_lock);
00231 mc_platform->ams->run = 1;
00232 COND_BROADCAST(mc_platform->ams->runflag_cond);
00233 MUTEX_UNLOCK(mc_platform->ams->runflag_lock);
00234 break;
00235
00236 #ifdef MC_SECURITY
00237 case ENCRYPTED_DATA:
00238 if (mc_platform->enable_security) {
00239 message_queue_Add
00240 (
00241 mc_platform->asm_message_queue,
00242 message
00243 );
00244 } else {
00245 WARN("mc_security not enabled. Discarding ENCRYPTED_DATA message.");
00246 message_Destroy(message);
00247 }
00248 break;
00249 case ENCRYPTION_INITIALIZE:
00250 if (mc_platform->enable_security) {
00251 asm_queue_Add
00252 (
00253 mc_platform->asm_queue,
00254 asm_node_Initialize(message, mc_platform->security_manager)
00255 );
00256
00257 MUTEX_LOCK(mc_platform->asm_message_queue->lock);
00258 COND_SIGNAL(mc_platform->asm_message_queue->cond);
00259 MUTEX_UNLOCK(mc_platform->asm_message_queue->lock);
00260 message_Destroy(message);
00261 } else {
00262 WARN("mc_security not enabled. Discarding ENCRYPTION_INITIALIZE message.");
00263 message_Destroy(message);
00264 }
00265 break;
00266 case REQUEST_ENCRYPTION_INITIALIZE:
00267 if (mc_platform->enable_security) {
00268 asm_SendEncryptionData(
00269 mc_platform->security_manager,
00270 message->from_address
00271 );
00272 message_Destroy(message);
00273 } else {
00274 WARN("mc_security not enabled. Discarding REQUEST_ENCRYPTION_INITIALIZE message.");
00275 message_Destroy(message);
00276 }
00277 break;
00278 #endif
00279 case RELAY:
00280 case REQUEST:
00281 case SUBSCRIBE:
00282 case CANCEL:
00283 case N_UNDRSTD:
00284 case QUER_IF:
00285 case QUER_REF:
00286 case AGENT_UPDATE:
00287 fprintf(stderr, "FIXME: Message type %d not processable.%s:%d\n",
00288 message->message_type, __FILE__, __LINE__ );
00289 message_Destroy(message);
00290 break;
00291 default:
00292 fprintf(stderr, "Unknown message type:%d %s:%d\n",
00293 message->message_type, __FILE__, __LINE__);
00294 message_Destroy(message);
00295 }
00296 } else {
00297 #ifdef MC_SECURITY
00298 if (mc_platform->enable_security) {
00299 if (
00300 (message->message_type != ENCRYPTED_DATA) &&
00301 (message->message_type != ENCRYPTION_INITIALIZE) &&
00302 (message->message_type != REQUEST_ENCRYPTION_INITIALIZE)
00303 )
00304 {
00305 message_queue_Add
00306 (
00307 mc_platform->asm_message_queue,
00308 message
00309 );
00310 continue;
00311 }
00312 }
00313 #endif
00314 message_Send
00315 (
00316 message
00317 );
00318 message_Destroy
00319 (
00320 message
00321 );
00322 }
00323 }
00324 return 0;
00325 }
00326
00327 #ifndef _WIN32
00328 void*
00329 acc_Thread(void* arg)
00330 #else
00331 DWORD WINAPI
00332 acc_Thread( LPVOID arg )
00333 #endif
00334 {
00335 connection_p connection;
00336 message_p message;
00337 mtp_http_p mtp_http;
00338 mc_platform_p mc_platform = (mc_platform_p)arg;
00339 fipa_acl_envelope_p fipa_envelope;
00340 fipa_acl_message_p fipa_message;
00341 fipa_message_string_p fipa_message_string;
00342 int err;
00343 int i, j;
00344 agent_t* agent;
00345
00346
00347 while(1) {
00348 connection = NULL;
00349 message = NULL;
00350 mtp_http = NULL;
00351 MUTEX_LOCK(mc_platform->connection_queue->lock);
00352 MUTEX_LOCK(mc_platform->quit_lock);
00353 while (mc_platform->connection_queue->size == 0 && !mc_platform->quit) {
00354 MUTEX_UNLOCK(mc_platform->quit_lock);
00355 COND_WAIT(
00356 mc_platform->connection_queue->cond,
00357 mc_platform->connection_queue->lock
00358 );
00359 MUTEX_LOCK(mc_platform->quit_lock);
00360 }
00361 if
00362 (
00363 mc_platform->connection_queue->size == 0 &&
00364 mc_platform->quit
00365 )
00366 {
00367 MUTEX_UNLOCK(mc_platform->quit_lock);
00368 MUTEX_UNLOCK(mc_platform->connection_queue->lock);
00369 return 0;
00370 }
00371 MUTEX_UNLOCK(mc_platform->quit_lock);
00372 MUTEX_UNLOCK(mc_platform->connection_queue->lock);
00373
00374 MUTEX_LOCK(mc_platform->MC_signal_lock);
00375 mc_platform->MC_signal = MC_RECV_CONNECTION;
00376 COND_BROADCAST(mc_platform->MC_signal_cond);
00377 MUTEX_UNLOCK(mc_platform->MC_signal_lock);
00378
00379
00380 MUTEX_LOCK(mc_platform->giant_lock);
00381 while (mc_platform->giant == 0) {
00382 COND_WAIT(
00383 mc_platform->giant_cond,
00384 mc_platform->giant_lock
00385 );
00386 }
00387 MUTEX_UNLOCK(mc_platform->giant_lock);
00388
00389
00390 connection = connection_queue_Pop(mc_platform->connection_queue);
00391 mtp_http = mtp_http_New();
00392 if ( mtp_http_InitializeFromConnection(mtp_http, connection ) )
00393 {
00394 connection_Destroy(connection);
00395 mtp_http_Destroy(mtp_http);
00396 continue;
00397 }
00398
00399 switch(mtp_http->http_performative)
00400 {
00401 case HTTP_POST:
00402 case HTTP_PUT:
00403
00404
00405 if(
00406 !strcmp(mtp_http->target, "/ams") ||
00407 !strcmp( strrchr(mtp_http->target, (int)'/'), "/ams" )
00408 ) {
00409 message = message_New();
00410
00411 message->message_body = (char*)malloc
00412 (
00413 sizeof(char) *
00414 (strlen((char*)mtp_http->content->data)+1)
00415 );
00416 strcpy(message->message_body, (char*)mtp_http->content->data);
00417 message->xml_root = mxmlLoadString
00418 (
00419 NULL,
00420 message->message_body,
00421 MXML_NO_CALLBACK
00422 );
00423 if(message_xml_parse(message)) {
00424 fprintf(stderr, "Error parsing message. %s:%d\n",
00425 __FILE__,__LINE__);
00426 message_Destroy(message);
00427 mtp_http_Destroy(mtp_http);
00428 continue;
00429 }
00430 mtp_http_Destroy(mtp_http);
00431 break;
00432 } else if
00433 (
00434 !strcmp(mtp_http->target, "/acc") ||
00435 !strcmp( strrchr(mtp_http->target, (int)'/'), "/acc")
00436 ) {
00437
00438
00439 if (mtp_http->message_parts != 2) {
00440 fprintf(stderr, "Error parsing message. %s:%d\n",
00441 __FILE__,__LINE__);
00442 mtp_http_Destroy(mtp_http);
00443 continue;
00444 }
00445
00446 fipa_envelope = fipa_acl_envelope_New();
00447 err = fipa_envelope_Parse(fipa_envelope, (char*)mtp_http->content[0].data);
00448 if (err) {
00449 fprintf(stderr, "Error parsing message. %s:%d\n",
00450 __FILE__, __LINE__);
00451 fipa_acl_envelope_Destroy(fipa_envelope);
00452 mtp_http_Destroy(mtp_http);
00453 continue;
00454 }
00455
00456
00457 for(i = 0; i < fipa_envelope->num_params; i++) {
00458 for(j = 0; j < fipa_envelope->params[i]->to->num; j++) {
00459 agent = agent_queue_SearchName(
00460 mc_platform->agent_queue,
00461 fipa_envelope->params[i]->to->fipa_agent_identifiers[j]->name
00462 );
00463 if (agent != NULL) {
00464
00465 fipa_message_string = fipa_message_string_New();
00466 fipa_message_string->message = strdup((char*)mtp_http->content[1].data);
00467 fipa_message_string->parse = fipa_message_string->message;
00468 fipa_message = fipa_acl_message_New();
00469 err = fipa_acl_Parse(fipa_message, fipa_message_string);
00470 if (err) {
00471 fipa_message_string_Destroy(fipa_message_string);
00472 fipa_acl_message_Destroy(fipa_message);
00473 fipa_acl_envelope_Destroy(fipa_envelope);
00474 mtp_http_Destroy(mtp_http);
00475 continue;
00476 }
00477 agent_mailbox_Post( agent->mailbox, fipa_message);
00478 fipa_message_string_Destroy(fipa_message_string);
00479 }
00480 }
00481 }
00482 fipa_acl_envelope_Destroy(fipa_envelope);
00483 mtp_http_Destroy(mtp_http);
00484 continue;
00485 }
00486 else {
00487
00488 fprintf(stderr, "Unsupported. %s:%d\n", __FILE__, __LINE__);
00489 mtp_http_Destroy(mtp_http);
00490 }
00491 default:
00492 fprintf(stderr, "unsupported http performative. %s:%d\n",
00493 __FILE__, __LINE__);
00494 }
00495
00496
00497 connection_Destroy(connection);
00498 switch(message->message_type) {
00499 case RELAY:
00500 case REQUEST:
00501 case SUBSCRIBE:
00502 case CANCEL:
00503 case N_UNDRSTD:
00504 case MOBILE_AGENT:
00505 case QUER_IF:
00506 case QUER_REF:
00507 case AGENT_UPDATE:
00508 case RETURN_MSG:
00509 case FIPA_ACL:
00510 message_queue_Add(mc_platform->message_queue, message);
00511 break;
00512 #ifdef MC_SECURITY
00513 case ENCRYPTED_DATA:
00514 case ENCRYPTION_INITIALIZE:
00515 if (mc_platform->enable_security) {
00516 message_queue_Add(mc_platform->asm_message_queue, message);
00517 } else {
00518 WARN("MC Security not enabled. Discarding message...");
00519 message_Destroy(message);
00520 }
00521 break;
00522 case REQUEST_ENCRYPTION_INITIALIZE:
00523 if (mc_platform->enable_security) {
00524 message_queue_Add(mc_platform->message_queue, message);
00525 } else {
00526 WARN("MC Security not enabled. Discarding message...");
00527 message_Destroy(message);
00528 }
00529 break;
00530 #endif
00531 default:
00532 fprintf(stderr, "Unknown message type:%d. Rejecting message.%s:%d\n",
00533 message->message_type,
00534 __FILE__, __LINE__ );
00535 free(message);
00536 break;
00537 }
00538 }
00539 }
00540
00541 void
00542 acc_Start(mc_platform_p mc_platform)
00543 {
00544 acc_p acc = mc_platform->acc;
00545 #ifndef _WIN32
00546 pthread_attr_t attr;
00547 pthread_attr_init(&attr);
00548 if(mc_platform->stack_size[MC_THREAD_ACC] != -1) {
00549 pthread_attr_setstacksize
00550 (
00551 &attr,
00552 mc_platform->stack_size[MC_THREAD_ACC]
00553 );
00554 }
00555 #else
00556 int stack_size;
00557 if (mc_platform->stack_size[MC_THREAD_ACC] < 1) {
00558
00559 stack_size = mc_platform->stack_size[MC_THREAD_ACC]+1;
00560 } else {
00561 stack_size = mc_platform->stack_size[MC_THREAD_ACC];
00562 }
00563 #endif
00564 THREAD_CREATE
00565 (
00566 &acc->thread,
00567 acc_Thread,
00568 mc_platform
00569 );
00570 THREAD_CREATE
00571 (
00572 &acc->message_handler_thread,
00573 acc_MessageHandlerThread,
00574 mc_platform
00575 );
00576 THREAD_CREATE
00577 (
00578 &acc->listen_thread,
00579 listen_Thread,
00580 mc_platform
00581 );
00582 }
00583
00584 #ifndef _WIN32
00585 void*
00586 listen_Thread(void* arg)
00587 #else
00588 DWORD WINAPI
00589 listen_Thread( LPVOID arg )
00590 #endif
00591 {
00592 #ifndef _WIN32
00593 int connectionsockfd;
00594 int sockfd;
00595 struct sockaddr_in sktin;
00596 struct sockaddr_in peer_addr;
00597 #else
00598 SOCKET connectionsockfd;
00599 SOCKET sockfd;
00600 struct sockaddr_in sktin;
00601 struct sockaddr_in peer_addr;
00602 #endif
00603
00604 connection_p connection;
00605 u_long connection_number;
00606 int connectionlen;
00607 mc_platform_p mc_platform = (mc_platform_p)arg;
00608
00609
00610 connection_number = 0;
00611
00612 connectionlen = sizeof(struct sockaddr_in);
00613
00614
00615 sockfd = socket(PF_INET, SOCK_STREAM, 0);
00616 sktin.sin_family = AF_INET;
00617 sktin.sin_port = htons(mc_platform->port);
00618 sktin.sin_addr.s_addr = INADDR_ANY;
00619 memset(sktin.sin_zero, '\0', sizeof sktin.sin_zero);
00620 if (bind(sockfd, (struct sockaddr *)&sktin, sizeof(struct sockaddr))
00621 == -1) {
00622 fprintf(stderr, "bind() error. %s:%d\n",
00623 __FILE__, __LINE__ );
00624 exit(1);
00625 }
00626 listen(sockfd, BACKLOG);
00627
00628
00629 while(1)
00630 {
00631
00632 MUTEX_LOCK(mc_platform->acc->waiting_lock);
00633 mc_platform->acc->waiting = 1;
00634 COND_BROADCAST(mc_platform->acc->waiting_cond);
00635 MUTEX_UNLOCK(mc_platform->acc->waiting_lock);
00636 #ifndef _WIN32
00637 if((connectionsockfd = accept(sockfd,
00638 (struct sockaddr *)&peer_addr,
00639 (socklen_t *)&connectionlen)) < 0)
00640 #else
00641 if((connectionsockfd = accept(sockfd,
00642 (struct sockaddr *)&peer_addr,
00643 (int*)&connectionlen)) == INVALID_SOCKET)
00644 #endif
00645 {
00646 fprintf(stderr, "ListenThread: accept error \n");
00647 #ifdef _WIN32
00648 printf("Error number: %d\n", WSAGetLastError() );
00649 continue;
00650 #endif
00651 continue;
00652 }
00653 else
00654 {
00655
00656 MUTEX_LOCK(mc_platform->acc->waiting_lock);
00657 mc_platform->acc->waiting = 0;
00658 COND_BROADCAST(mc_platform->acc->waiting_cond);
00659 MUTEX_UNLOCK(mc_platform->acc->waiting_lock);
00660
00661
00662 connection = (connection_p)malloc(sizeof(connection_t));
00663 CHECK_NULL(connection, exit(0););
00664 connection->connect_id = rand();
00665 connection->remote_hostname = NULL;
00666 connection->addr = peer_addr;
00667 connection->serverfd = sockfd;
00668 connection->clientfd = connectionsockfd;
00669
00670
00671 connection_queue_Add(mc_platform->connection_queue, connection);
00672 }
00673 }
00674
00675
00676 #ifndef _WIN32
00677 pthread_exit(0);
00678 #else
00679 ExitThread(0);
00680 #endif
00681
00682 return 0;
00683 }