00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "Messenger.h"
00022
00023 namespace cmlabs {
00024
00025 Messenger::Messenger(const JString& name, MessageSender* sender, const JString& wakeupMessageType, const JString& phaseChangeType) {
00026 perfStat = NULL;
00027 signalHandler = NULL;
00028 messageSender = sender;
00029 this->wakeupMessageType = wakeupMessageType;
00030 this->phaseChangeType = phaseChangeType;
00031 parameters.noDelete();
00032 streams.noDelete();
00033 totalTriggerCount = 0;
00034 shouldContinue = true;
00035 this->name = name;
00036 currentMessage = NULL;
00037 currentPhaseSpec = NULL;
00038 startProcessTime.setInvalid();
00039 verbose = 0;
00040 priority = 0;
00041 inQueue.objectTable.sorting |= SORTREVERSE;
00042
00043 inQueue.noDelete();
00044 recentMessages = NULL;
00045 }
00046
00047 Messenger::~Messenger() {
00048 if (currentMessage != NULL)
00049 delete(currentMessage);
00050 if (phaseMutex.enterMutex(1000)) {
00051 if (currentPhaseSpec != NULL)
00052 delete(currentPhaseSpec);
00053 currentPhaseSpec = NULL;
00054 phaseMutex.leaveMutex();
00055 }
00056 }
00057
00058
00059
00060 JString Messenger::getName() {
00061 return name;
00062 }
00063
00064
00065 JString Messenger::getTriggerAlias() {
00066 JString alias;
00067 if (phaseMutex.enterMutex(1000)) {
00068 PhaseSpec* spec = getPhaseSpec();
00069 if (spec != NULL)
00070 alias = spec->lastTriggerName;
00071 phaseMutex.leaveMutex();
00072 }
00073 return alias;
00074 }
00075
00076 Collection Messenger::getDestinations() {
00077 Collection destinations;
00078
00079 if (messageSender == NULL)
00080 return destinations;
00081
00082 if (!phaseMutex.enterMutex(500))
00083 return destinations;
00084 PhaseSpec* spec = getPhaseSpec();
00085 if (spec == NULL)
00086 return destinations;
00087
00088 PostSpec* post;
00089
00090 for (int n=0; n<spec->posts->getCount(); n++) {
00091 if ( (post = (PostSpec*) spec->posts->get(n)) != NULL) {
00092 destinations.add(post->to);
00093 }
00094 }
00095 phaseMutex.leaveMutex();
00096 return destinations;
00097 }
00098
00099 bool Messenger::isWakeupMessage(Message* msg) {
00100 if ((msg == NULL) || (msg->getObject() == NULL))
00101 return false;
00102 return msg->type.equalsIgnoreCase(wakeupMessageType);
00103 }
00104
00105 int Messenger::getInputMessageCount() {
00106 return inQueue.getCount();
00107 }
00108
00109 int Messenger::getOutputMessageCount() {
00110 return outputMessages.getCount();
00111 }
00112
00113 int Messenger::getTriggerCount() {
00114 return totalTriggerCount;
00115 }
00116
00117 bool Messenger::shouldContinueRunning() {
00118 if (!mutex.EnterMutex(1000))
00119 return true;
00120 bool res = shouldContinue;
00121 mutex.LeaveMutex();
00122 return res;
00123 }
00124
00125
00126 bool Messenger::processFirstMessage() {
00127 if (!shouldContinue)
00128 return false;
00129
00130 Message* msg = NULL;
00131 if (inQueueMutex.EnterMutex(100)) {
00132 msg = (Message*) inQueue.getFirst();
00133 inQueueMutex.LeaveMutex();
00134 }
00135
00136 if (msg == NULL)
00137 return false;
00138
00139 PhaseSpec* newSpec = NULL;
00140
00141 ObjectDictionary* dict = (ObjectDictionary*) msg->getObject();
00142 if (dict != NULL)
00143 newSpec = (PhaseSpec*) dict->get("PhaseSpec");
00144
00145 if (newSpec == NULL)
00146 return false;
00147
00148 checkNewPhaseSpec((PhaseSpec*)newSpec->clone(), msg->getTimestamp());
00149 return true;
00150 }
00151
00152 Message* Messenger::waitForNewMessage(int timeout) {
00153 if (!shouldContinue)
00154 return NULL;
00155
00156
00157 Message* msg = getNextMessage(timeout);
00158
00159 if (currentMessage != NULL) {
00160 if ((perfStat != NULL) && (startProcessTime.isValid())) {
00161 perfStat->msgProcessed++;
00162 JTime endProcessTime;
00163 perfStat->deltaRunSeconds = ((double)endProcessTime.microDifference(startProcessTime)) / 1000000.0;
00164 perfStat->totalRunSeconds += perfStat->deltaRunSeconds;
00165 perfStat->runCount++;
00166 }
00167 }
00168
00169 if ( (perfStat != NULL) && (perfStat->lastUpdated.getAge() > 1000)) {
00170 JTime now;
00171 ThreadStat cur = aThread.getCallingThreadStatistics();
00172 perfStat->currentCPUUsage = cur.percentCPU / 100.0;
00173 perfStat->deltaCPUSeconds = ((double)cur.cpuUsage) / 1000000.0;
00174 perfStat->totalCPUSeconds += perfStat->deltaCPUSeconds;
00175 perfStat->deltaSeconds = now.microDifference(perfStat->lastUpdated) / 1000000.0;
00176 perfStat->lastUpdated = now;
00177
00178 if (perfStat->deltaSeconds > 0)
00179 perfStat->deltaCPUSeconds = perfStat->deltaCPUSeconds / perfStat->deltaSeconds;
00180 aThread.resetCallingThreadStatistics();
00181 }
00182
00183 if (msg == NULL)
00184 return NULL;
00185
00186 PhaseSpec* newSpec;
00187 if (msg->getType().equalsIgnoreCase(wakeupMessageType)) {
00188
00189
00190
00191
00192
00193 ObjectDictionary* dict = (ObjectDictionary*) msg->getObject();
00194 if (dict == NULL) {
00195 newSpec = NULL;
00196 }
00197 else {
00198 newSpec = (PhaseSpec*) dict->get("PhaseSpec");
00199 dict->removeNoDelete("PhaseSpec");
00200 }
00201
00202 if (!checkNewPhaseSpec(newSpec, msg->getTimestamp())) {
00203
00204 delete(msg);
00205 return NULL;
00206 }
00207
00208 if (currentMessage != NULL) {
00209 delete(currentMessage);
00210 currentMessage = NULL;
00211 }
00212 currentMessage = msg;
00213 msg = getTriggerMessage();
00214
00215 if ( (msg != NULL) && (recentMessages != NULL) && (!recentMessages->containsKey(msg)) ) {
00216 recentMessages->add(msg->clone(), new ObjectDictionary());
00217 if (recentMessages->getCount() > 10)
00218 recentMessages->removeFirst();
00219 if (dict != NULL) {
00220 ObjectCollection* retrieves = new ObjectCollection();
00221 dict->put("Retrieves", retrieves);
00222 ObjectCollection* rets = this->getAllRetrievedMessages();
00223 if (rets != NULL) {
00224 for (Message* mm = (Message*) rets->getFirst(); mm!=NULL; mm = (Message*) rets->getNext())
00225 retrieves->add(new Reference(mm));
00226 }
00227 }
00228 }
00229
00230 }
00231 else {
00232
00233
00234
00235
00236
00237
00238 if (currentMessage != NULL) {
00239 delete(currentMessage);
00240 currentMessage = NULL;
00241 }
00242 }
00243
00244 if ((perfStat != NULL) && (!startProcessTime.isValid())) {
00245 aThread.resetCallingThreadStatistics();
00246 }
00247 startProcessTime = JTime();
00248 return msg;
00249 }
00250
00251 bool Messenger::processPhaseSpec(PhaseSpec* newSpec, JTime t) {
00252 return checkNewPhaseSpec(newSpec, t);
00253 }
00254
00255
00256 bool Messenger::checkNewPhaseSpec(PhaseSpec* newSpec, JTime msgtime) {
00257
00258 if (!phaseMutex.enterMutex(500))
00259 return false;
00260
00261 if ( (newSpec != NULL) && (currentPhaseSpec != NULL) ) {
00262 if (newSpec->equals(currentPhaseSpec)) {
00263
00264 delete(currentPhaseSpec);
00265 currentPhaseSpec = newSpec;
00266
00267 phaseMutex.leaveMutex();
00268 return true;
00269 }
00270 else {
00271
00272 if (newSpec->isOlderThan(currentPhaseSpec)) {
00273
00274
00275
00276 phaseMutex.leaveMutex();
00277 return false;
00278 }
00279 else {
00280
00281 delete(currentPhaseSpec);
00282 currentPhaseSpec = newSpec;
00283 clearOldMessagesFromQueue(currentPhaseSpec->activationTime);
00284 phaseMutex.leaveMutex();
00285 return true;
00286 }
00287 }
00288 }
00289 else if (newSpec != NULL) {
00290
00291 currentPhaseSpec = newSpec;
00292 clearOldMessagesFromQueue(currentPhaseSpec->activationTime);
00293 phaseMutex.leaveMutex();
00294 return true;
00295 }
00296 else {
00297
00298 delete(currentPhaseSpec);
00299 currentPhaseSpec = NULL;
00300 clearOldMessagesFromQueue(msgtime);
00301 phaseMutex.leaveMutex();
00302 return true;
00303 }
00304 }
00305
00306
00307 bool Messenger::log(int level, char *format, ...) {
00308
00309 va_list args;
00310 va_start(args, format);
00311
00312 char* str = new char[10000];
00313 #ifdef WIN32
00314 #ifdef CYGWIN
00315 vsnprintf(str, 10000, format, args);
00316 #else
00317 _vsnprintf(str, 10000, format, args);
00318 #endif
00319 #else
00320 vsnprintf(str, 10000, format, args);
00321 #endif // WIN32
00322 va_end(args);
00323
00324 bool ret = messageSender->log(level, str);
00325 delete [] str;
00326
00327 return ret;
00328 }
00329
00330 bool Messenger::setStatus(const JString& name, const JString& value) {
00331 return messageSender->setStatus(name, value);
00332 }
00333
00334 JString Messenger::getStatus(const JString& name) {
00335 return messageSender->getStatus(name);
00336 }
00337
00338 Message* Messenger::getWakeupMessage() {
00339 if ((!shouldContinue) || (currentMessage == NULL))
00340 return NULL;
00341 if (!isWakeupMessage(currentMessage)) {
00342 return NULL;
00343 }
00344 return currentMessage;
00345 }
00346
00347 Message* Messenger::getTriggerMessage() {
00348 if ((!shouldContinue) || (currentMessage == NULL))
00349 return NULL;
00350
00351 if (!isWakeupMessage(currentMessage)) {
00352 return currentMessage;
00353 }
00354
00355 ObjectDictionary* dict = (ObjectDictionary*) currentMessage->getObject();
00356 if (dict == NULL) {
00357 return NULL;
00358 }
00359
00360 Message* hmsg;
00361 if ( (hmsg = (Message*) dict->get("TriggerMessage")) == NULL ) {
00362 return NULL;
00363 }
00364 return hmsg;
00365 }
00366
00367 int Messenger::getRetrievedMessageCount() {
00368 ObjectCollection* retrievedMessages = getAllRetrievedMessages();
00369 if (retrievedMessages == NULL)
00370 return 0;
00371 else
00372 return retrievedMessages->getCount();
00373 }
00374
00375 ObjectCollection* Messenger::getAllRetrievedMessages() {
00376 if ((!shouldContinue) || (currentMessage == NULL) || (!isWakeupMessage(currentMessage)))
00377 return NULL;
00378
00379 ObjectDictionary* dict = (ObjectDictionary*) currentMessage->getObject();
00380 if (dict == NULL) {
00381 return NULL;
00382 }
00383
00384 ObjectCollection* retrievedMessages = (ObjectCollection*) dict->get("RetrievedMessages");
00385 if (retrievedMessages == NULL) {
00386 return NULL;
00387 }
00388 return retrievedMessages;
00389 }
00390
00391 Message* Messenger::getRetrievedMessage(int pos) {
00392 ObjectCollection* retrievedMessages = getAllRetrievedMessages();
00393 if (retrievedMessages != NULL) {
00394 return (Message*) retrievedMessages->get(pos);
00395 }
00396 else
00397 return NULL;
00398 }
00399
00400 PhaseSpec* Messenger::getPhaseSpec() {
00401
00402
00403 if (!shouldContinue)
00404 return NULL;
00405
00406 return currentPhaseSpec;
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417 }
00418
00419 ObjectCollection* Messenger::retrieveMessages(const JString& retrieveFrom, const JString& retrieveSpecXML) {
00420 ObjectCollection* col = NULL;
00421 Message* msg;
00422 if (retrieveSpecXML.startsWithIgnoreCase("<retrieves"))
00423 msg = new Message(name, retrieveFrom, "AIR.Retrieves", retrieveSpecXML, "xml");
00424 else
00425 msg = new Message(name, retrieveFrom, "AIR.Retrieve", retrieveSpecXML, "xml");
00426 Message* rmsg = messageSender->sendReceiveMsgTo(retrieveFrom, msg);
00427 if ( (rmsg != NULL) && (rmsg->getType().containsIgnoreCase(".Success")) ) {
00428 col = (ObjectCollection*) rmsg->takeObject();
00429 }
00430 delete(rmsg);
00431
00432 if (col == NULL) {
00433 this->messageSender->log(1, "retrieve from '%s' failed...\n", (char*) retrieveFrom);
00434 }
00435 else {
00436
00437 this->messageSender->log(5, "retrieve from '%s' returned %d messages...\n", (char*) retrieveFrom, col->getCount());
00438
00439
00440 }
00441 return col;
00442 }
00443
00444 ObjectCollection* Messenger::retrieveMessages(const JString& retrieveFrom, RetrieveSpec* retrieveSpec) {
00445 ObjectCollection* col = NULL;
00446 Message* msg = new Message(name, retrieveFrom, "AIR.Retrieve", retrieveSpec);
00447 Message* rmsg = messageSender->sendReceiveMsgTo(retrieveFrom, msg);
00448 if ( (rmsg != NULL) && (rmsg->getType().equalsIgnoreCase("AIR.Retrieve.Success")) ) {
00449 col = (ObjectCollection*) rmsg->takeObject();
00450 }
00451 delete(rmsg);
00452 if (col == NULL) {
00453 this->messageSender->log(1, "retrieve from '%s' failed...\n", (char*) retrieveFrom);
00454 }
00455 else {
00456
00457 this->messageSender->log(5, "retrieve from '%s' returned %d messages...\n", (char*) retrieveFrom, col->getCount());
00458
00459
00460 }
00461 return col;
00462 }
00463
00464 ObjectCollection* Messenger::retrieveMessages(const JString& retrieveFrom, const ObjectCollection& retrieveSpecs) {
00465 ObjectCollection* col = NULL;
00466 Message* msg = new Message(name, retrieveFrom, "AIR.Retrieves", retrieveSpecs.clone());
00467 Message* rmsg = messageSender->sendReceiveMsgTo(retrieveFrom, msg);
00468 if ( (rmsg != NULL) && (rmsg->getType().equalsIgnoreCase("AIR.Retrieves.Success")) ) {
00469 col = (ObjectCollection*) rmsg->takeObject();
00470 }
00471 delete(rmsg);
00472 if (col == NULL) {
00473 this->messageSender->log(1, "retrieve from '%s' failed...\n", (char*) retrieveFrom);
00474 }
00475 else {
00476
00477 this->messageSender->log(5, "retrieve from '%s' returned %d messages...\n", (char*) retrieveFrom, col->getCount());
00478
00479
00480 }
00481 return col;
00482 }
00483
00484
00485 Collection Messenger::getParameterNames() {
00486 return parameters.getAllKeys();
00487 }
00488
00489 Collection Messenger::getStreamNames() {
00490 return streams.getAllKeys();
00491 }
00492
00493 MediaConnection* Messenger::getStreamConnection(const JString& name) {
00494 return (MediaConnection*) streams.get(name);
00495 }
00496
00497 Parameter* Messenger::getParameter(const JString& name) {
00498 return (Parameter*) parameters.get(name);
00499 }
00500
00501
00502
00503
00504
00505
00506 bool Messenger::addOutputMessage(Message* msg) {
00507 return outputMessages.add(msg);
00508 }
00509
00510 bool Messenger::addOutputMessage(Message* msg, JString destination) {
00511
00512 if (!phaseMutex.enterMutex(500))
00513 return false;
00514
00515 PhaseSpec* spec = getPhaseSpec();
00516 if (spec == NULL) {
00517 phaseMutex.leaveMutex();
00518 return outputMessages.add(msg);
00519 }
00520
00521 PostSpec* post;
00522 for (int n=0; n<spec->posts->getCount(); n++) {
00523 if ( (post = (PostSpec*) spec->posts->get(n)) != NULL) {
00524 if (post->alias.equalsIgnoreCase(destination)) {
00525 msg->setType(post->type);
00526 phaseMutex.leaveMutex();
00527 return outputMessages.add(msg);
00528 }
00529 }
00530 }
00531
00532 msg->setType(destination);
00533 phaseMutex.leaveMutex();
00534 return outputMessages.add(msg);
00535 }
00536
00537 bool Messenger::sendOutputMessages() {
00538 if (messageSender == NULL)
00539 return false;
00540
00541 if (!phaseMutex.enterMutex(500))
00542 return false;
00543
00544 PhaseSpec* spec = getPhaseSpec();
00545 if (spec == NULL) {
00546 phaseMutex.leaveMutex();
00547 return false;
00548 }
00549
00550 Message* triggerMsg = getTriggerMessage();
00551
00552
00553
00554 Message* msg;
00555 Message* msgCopy;
00556 ObjectCollection destinations;
00557 destinations.noDelete();
00558 PostSpec* post;
00559 Reference* ref;
00560 ObjectDictionary* dict;
00561 ObjectCollection* col;
00562 JString str;
00563 JString type, posttype, phaseChange;
00564 int n;
00565
00566 while ( (msg = (Message*) outputMessages.getFirst()) != NULL) {
00567 outputMessages.removeNoDelete(0);
00568 destinations.removeAll();
00569 if (msg->getTo().length() > 0) {
00570 msg->setFrom(getName());
00571 msg->setPostedTime(JTime());
00572 if (msg->priority == 0)
00573 msg->priority = priority;
00574 if (perfStat != NULL) {
00575 perfStat->msgPosted++;
00576 if (msg->stored.length() == 0)
00577 msg->stored = msg->to;
00578 perfStat->setLastMessagePost(msg);
00579 }
00580
00581 str = JString::format("message from '%s' to '%s' type '%s'", (char*) msg->getFrom(), (char*) msg->getTo(), (char*) msg->getType());
00582 if (!messageSender->sendMsgTo(msg->getTo(), msg)) {
00583 messageSender->log(1, "Posting of %s failed.\n", (char*) str);
00584 }
00585 else {
00586
00587 messageSender->log(2, "Posted %s...\n", (char*) str);
00588
00589
00590 }
00591 }
00592 else {
00593 type = msg->getType();
00594 for (n=0; n<spec->posts->getCount(); n++) {
00595 if ( (post = (PostSpec*) spec->posts->get(n)) != NULL) {
00596 if ( (type.length() > 0) &&
00597 (post->type.print().length() > 0) &&
00598 (!post->type.isPartOf(type)) && (!type.equalsIgnoreCase(post->alias)) ) {
00599
00600 }
00601 else {
00602 destinations.add(post);
00603 if ( (post->phasechange.length() > 0) && (phaseChange.length() == 0) )
00604 phaseChange = post->to;
00605 if (type.equalsIgnoreCase(post->alias))
00606 msg->setType(post->type.print());
00607 }
00608 }
00609 }
00610 if (destinations.getCount() == 0) {
00611 for (n=0; n<spec->posts->getCount(); n++) {
00612 if ( (post = (PostSpec*) spec->posts->get(n)) != NULL) {
00613 posttype = post->type.print();
00614 if ( (type.length() > 0) && posttype.containsIgnoreCase(type) ) {
00615 destinations.add(post);
00616 if ( (post->phasechange.length() > 0) && (phaseChange.length() == 0) )
00617 phaseChange = post->to;
00618 msg->setType(posttype);
00619 if (msg->priority == 0)
00620 msg->priority = post->priority;
00621 break;
00622 }
00623 }
00624 }
00625 }
00626 if (destinations.getCount() == 0) {
00627 delete(msg);
00628 }
00629 else {
00630 if (phaseChange.length() > 0)
00631 sendPhaseChange(phaseChange);
00632 for (int k=0; k < destinations.getCount(); k++) {
00633 if ( (post = (PostSpec*) destinations.get(k)) != NULL ) {
00634 if (k < destinations.getCount() - 1)
00635 msgCopy = (Message*) msg->clone();
00636 else
00637 msgCopy = msg;
00638
00639
00640
00641
00642
00643 if (post->from.length() > 0)
00644 msgCopy->setFrom(post->from);
00645 else
00646 msgCopy->setFrom(getName());
00647 if (msgCopy->getType().length() == 0)
00648 msgCopy->setType(post->type);
00649
00650 if (post->object != NULL)
00651 msgCopy->setObject(post->object->clone());
00652 else if (post->content.length() > 0)
00653 msgCopy->setContentNoParse(post->content, post->language);
00654 msgCopy->setTo(post->to);
00655 if (post->cc.length() > 0)
00656 msgCopy->setCC(post->cc);
00657 msgCopy->setPostedTime(JTime());
00658 if (msg->priority == 0)
00659 msg->priority = post->priority;
00660
00661 if (msgCopy->inReplyTo == NULL) {
00662 if ( (triggerMsg != NULL) && (post->useInReplyTo) && (triggerMsg->inReplyTo != NULL))
00663 msgCopy->setInReplyTo((Reference*)triggerMsg->inReplyTo->clone());
00664 else if ( (triggerMsg != NULL) && (post->useInReplyTo) && (triggerMsg->inReplyTo == NULL))
00665 msgCopy->setInReplyTo(new Reference(triggerMsg));
00666 }
00667
00668 if (perfStat != NULL) {
00669 perfStat->msgPosted++;
00670 if (msgCopy->stored.length() == 0)
00671 msgCopy->stored = msgCopy->to;
00672 perfStat->setLastMessagePost(msg);
00673 }
00674
00675 str = JString::format("message from '%s' to '%s' type '%s'", (char*) msg->getFrom(), (char*) msgCopy->getTo(), (char*) msgCopy->getType());
00676 ref = new Reference(msgCopy);
00677 if (post->noreply)
00678 msgCopy->noreply = "Yes";
00679 if (!messageSender->sendMsgTo(msgCopy->getTo(), msgCopy)) {
00680 delete(ref);
00681 messageSender->log(1, "Posting of %s failed.\n", (char*) str);
00682 }
00683 else {
00684 messageSender->log(2, "Posted %s...\n", (char*) str);
00685 if (recentMessages != NULL) {
00686 if ( (dict = (ObjectDictionary*) recentMessages->getLast()) == NULL)
00687 recentMessages->add(new Message(), (dict = new ObjectDictionary()));
00688 if ( (col = (ObjectCollection*) dict->get("Posted")) == NULL )
00689 dict->put("Posted", (col = new ObjectCollection()));
00690 col->add(ref);
00691 if (col->getCount() > 10)
00692 col->removeFirst();
00693 }
00694 else {
00695 delete(ref);
00696 }
00697 }
00698 }
00699 }
00700 }
00701 }
00702 }
00703 phaseMutex.leaveMutex();
00704 return true;
00705 }
00706
00707 bool Messenger::sendPhaseChange(const JString& dest) {
00708
00709 if (messageSender == NULL)
00710 return false;
00711
00712 if (!phaseMutex.enterMutex(500))
00713 return false;
00714
00715 PhaseSpec* spec = getPhaseSpec();
00716 if (spec == NULL) {
00717 phaseMutex.leaveMutex();
00718 return false;
00719 }
00720
00721
00722 PostSpec* post;
00723 if (dest.length() > 0) {
00724
00725 for (post = (PostSpec*) spec->posts->getFirst(); post != NULL; post = (PostSpec*) spec->posts->getNext()) {
00726 if (post->to.equalsIgnoreCase(dest))
00727 break;
00728 }
00729 }
00730 else {
00731
00732 for (post = (PostSpec*) spec->posts->getFirst(); post != NULL; post = (PostSpec*) spec->posts->getNext()) {
00733 if (post->phasechange.equalsIgnoreCase("yes"))
00734 break;
00735 }
00736
00737 if (post == NULL)
00738 post = (PostSpec*) spec->posts->getFirst();
00739 }
00740
00741 if (post == NULL) {
00742 phaseMutex.leaveMutex();
00743 return false;
00744 }
00745
00746
00747 JString to = post->to;
00748
00749 Message* msg = new Message(getName(), to, phaseChangeType);
00750 msg->setPostedTime(JTime());
00751
00752 if (perfStat != NULL) {
00753 perfStat->msgPosted++;
00754 if (msg->stored.length() == 0)
00755 msg->stored = to;
00756 perfStat->setLastMessagePost(msg);
00757 }
00758 JString str = JString::format("message from '%s' to '%s' type '%s'", (char*) msg->getFrom(), (char*) to, (char*) msg->getType());
00759
00760 Reference* ref = new Reference(msg);
00761 ObjectDictionary* dict;
00762 ObjectCollection* col;
00763
00764 if (!messageSender->sendMsgTo(to, msg)) {
00765 messageSender->log(1, "Posting of %s failed.\n", (char*) str);
00766
00767 phaseMutex.leaveMutex();
00768 delete(ref);
00769 return false;
00770 }
00771 else {
00772 messageSender->log(2, "Posted %s...\n", (char*) str);
00773 if (recentMessages != NULL) {
00774 if ( (dict = (ObjectDictionary*) recentMessages->getLast()) == NULL)
00775 recentMessages->add(new Message(), (dict = new ObjectDictionary()));
00776 if ( (col = (ObjectCollection*) dict->get("Posted")) == NULL )
00777 dict->put("Posted", (col = new ObjectCollection()));
00778 col->add(ref);
00779 if (col->getCount() > 10)
00780 col->removeFirst();
00781 }
00782 else
00783 delete(ref);
00784 }
00785
00786
00787 phaseMutex.leaveMutex();
00788 return true;
00789 }
00790
00791 JTime Messenger::getCurrentContextStart() {
00792 return getContextStart(getCurrentContextName());
00793 }
00794
00795 JTime Messenger::getContextStart(const JString& context) {
00796 JTime t;
00797 t.setInvalid();
00798 PhaseSpec* spec = getPhaseSpec();
00799 if (spec == NULL)
00800 return t;
00801 if (spec->currentContexts == NULL)
00802 return t;
00803 JTime* tp = (JTime*) spec->currentContexts->get(context);
00804 if (tp != NULL)
00805 return *tp;
00806 else
00807 return t;
00808 }
00809
00810 JTime Messenger::getCurrentPhaseStart() {
00811 JTime t;
00812 t.setInvalid();
00813 PhaseSpec* spec = getPhaseSpec();
00814 if (spec == NULL)
00815 return t;
00816 return spec->activationTime;
00817 }
00818
00819
00820 JString Messenger::getCurrentContextName() {
00821 PhaseSpec* spec = getPhaseSpec();
00822 if (spec == NULL)
00823 return "";
00824 if (spec->currentContexts == NULL)
00825 return spec->context;
00826 JString root = spec->context.split(".").getFirst();
00827 if (!root.equals(spec->context))
00828 root += ".";
00829 JString key;
00830 for (key = spec->currentContexts->getFirstKey(); key.length() > 0; key = spec->currentContexts->getNextKey()) {
00831 if (key.startsWithIgnoreCase(root))
00832 return key;
00833 }
00834 return spec->context;
00835 }
00836
00837
00838 JString Messenger::getCurrentPhaseName() {
00839 PhaseSpec* spec = getPhaseSpec();
00840 if (spec == NULL)
00841 return "";
00842 return spec->name;
00843 }
00844
00845 bool Messenger::clearOldMessagesFromQueue(JTime stamp) {
00846 if (!stamp.isValid())
00847 return false;
00848 JString id;
00849 ObjectDictionary* dict;
00850 PhaseSpec* spec;
00851 Message* hmsg = NULL;
00852 if (inQueue.getCount() > 0) {
00853 if (inQueueMutex.EnterMutex(100)) {
00854 for (hmsg = (Message*) inQueue.getFirst(); hmsg != NULL; hmsg = (Message*) inQueue.getNext()) {
00855 dict = (ObjectDictionary*) hmsg->getObject();
00856 if (dict != NULL) {
00857 spec = (PhaseSpec*) dict->get("PhaseSpec");
00858 if (spec != NULL) {
00859 if ( (!spec->equals(currentPhaseSpec)) && (spec->activationTime.isValid()) && (spec->activationTime < stamp) ) {
00860 inQueue.remove(hmsg);
00861 delete(hmsg);
00862 }
00863 }
00864 }
00865 }
00866 inQueueMutex.LeaveMutex();
00867 }
00868 else
00869 return false;
00870 }
00871 return true;
00872 }
00873
00874 Message* Messenger::getNextMessage(long ms) {
00875 JString id;
00876 Message* hmsg = NULL;
00877 if (inQueue.getCount() == 0) {
00878 JTime t;
00879 if (inQueueSemaphore.wait(ms)) {
00880 if (inQueueMutex.EnterMutex(ms)) {
00881 if ( (hmsg = (Message*) inQueue.getFirst()) != NULL)
00882 inQueue.removeFirst();
00883 inQueueMutex.LeaveMutex();
00884 }
00885 }
00886
00887
00888
00889
00890
00891 return hmsg;
00892 }
00893 else {
00894
00895 inQueueSemaphore.wait(0);
00896 if (inQueueMutex.EnterMutex(ms)) {
00897 if ( (hmsg = (Message*) inQueue.getFirst()) != NULL)
00898 inQueue.removeFirst();
00899 inQueueMutex.LeaveMutex();
00900 }
00901 return hmsg;
00902 }
00903 }
00904
00905
00906
00907 bool Messenger::addInputMessage(Message* msg) {
00908
00909 if (inQueueMutex.EnterMutex(3000)) {
00910 inQueue.add(msg);
00911 inQueueMutex.LeaveMutex();
00912 inQueueSemaphore.post();
00913 return true;
00914 }
00915 else {
00916 return false;
00917 }
00918 }
00919
00920 bool Messenger::addParameter(const JString& name, Parameter* parameter) {
00921 return parameters.put(name, parameter);
00922 }
00923
00924 bool Messenger::addStream(const JString& name, MediaConnection* media) {
00925 return streams.put(name, media);
00926 }
00927
00928 bool Messenger::clearStreams() {
00929 streams.removeAllNoDelete();
00930
00931
00932
00933
00934
00935 return true;
00936 }
00937
00938 bool Messenger::terminate() {
00939 if (!mutex.EnterMutex(1000))
00940 return false;
00941 shouldContinue = false;
00942 clearStreams();
00943 mutex.LeaveMutex();
00944 return true;
00945 }
00946
00947
00948
00949
00950 MediaConnection* Messenger::connectToMediaServer(JString mediaName) {
00951 TCPLocation loc = messageSender->resolve(mediaName);
00952 if (!loc.isValid()) {
00953
00954 return NULL;
00955 }
00956
00957 MediaConnection* mediacon = new MediaConnection(getName(), loc);
00958 if (!mediacon->init()) {
00959
00960 delete(mediacon);
00961 return NULL;
00962 }
00963
00964 return mediacon;
00965 }
00966
00967
00968 bool Messenger::reconnectToMediaServer(JString mediaName, MediaConnection* mediacon) {
00969 if (mediacon == NULL)
00970 return false;
00971
00972 TCPLocation loc = messageSender->resolve(mediaName);
00973 if (!loc.isValid())
00974 return false;
00975
00976 if (!mediacon->reinit(getName(), loc)) {
00977 return false;
00978 }
00979
00980 return true;
00981 }
00982
00983
00984 JString Messenger::getServerID() {
00985 JString id;
00986 Message* send = new Message(getName(), "AIRServer", "GET_ID");
00987 Message* recv = messageSender->sendReceiveMsgTo("CNS", send);
00988 if ( (recv != NULL) && (recv->type.equals("ID")) ) {
00989 id = recv->getContent();
00990 }
00991 return id;
00992 }
00993
00994
00995 JString Messenger::getServerName() {
00996 JString name;
00997 Message* send = new Message(getName(), "AIRServer", "GET_NAME");
00998 Message* recv = messageSender->sendReceiveMsgTo("CNS", send);
00999 if ( (recv != NULL) && (recv->type.equals("NAME")) ) {
01000 name = recv->getContent();
01001 }
01002 return name;
01003 }
01004
01005
01006
01007
01008
01009 bool Messenger::postOutputMessage(Message* msg) {
01010 addOutputMessage(msg);
01011 return sendOutputMessages();
01012 }
01013
01014 bool Messenger::postMessageToAllDestinations(Message* msg) {
01015 if (msg == NULL)
01016 return false;
01017
01018 if (msg->priority == 0)
01019 msg->priority = priority;
01020 Message* hmsg;
01021 Collection destinations = getDestinations();
01022 int count = destinations.getCount();
01023 for (int n=0; n<count; n++) {
01024 if (n == count - 1)
01025 hmsg = msg;
01026 else
01027 hmsg = (Message*) msg->clone();
01028 hmsg->setTo(destinations.get(n));
01029 messageSender->sendMsgTo(destinations.get(n), hmsg);
01030 }
01031 return true;
01032 }
01033
01034
01035 JString Messenger::getParameterString(JString module, JString parameter) {
01036 JString str = getParameter(module, parameter);
01037 if (str.startsWith("PARAM_ERROR"))
01038 return "";
01039 else
01040 return str;
01041 }
01042
01043 int Messenger::getParameterInteger(JString module, JString parameter) {
01044 JString str = getParameter(module, parameter);
01045 if (str.startsWith("PARAM_ERROR"))
01046 return PARAM_ERROR;
01047 else
01048 return str.toInt();
01049 }
01050
01051 double Messenger::getParameterDouble(JString module, JString parameter) {
01052 JString str = getParameter(module, parameter);
01053 if (str.startsWith("PARAM_ERROR"))
01054 return PARAM_ERROR;
01055 else
01056 return str.toDouble();
01057 }
01058
01059 bool Messenger::setParameterString(JString module, JString parameter, JString value) {
01060 return setParameter(module, parameter, value);
01061 }
01062
01063 bool Messenger::setParameterInteger(JString module, JString parameter, int value) {
01064 return setParameter(module, parameter, JString(value));
01065 }
01066
01067 bool Messenger::setParameterDouble(JString module, JString parameter, double value) {
01068 return setParameter(module, parameter, JString(value));
01069 }
01070
01071
01072 ObjectDictionary* Messenger::getParameterSpecs(JString module) {
01073 Message* msg = messageSender->sendReceiveMsgTo(module, new Message(getName(), module, "PARAM_GET_ALL_SPECS"));
01074 if ((msg != NULL) && (msg->getType().equals("PARAM_GET_ALL_SPECS_SUCCESS"))) {
01075 ObjectDictionary* content = (ObjectDictionary*) msg->takeObject();
01076 delete(msg);
01077 return content;
01078 }
01079 else {
01080 delete(msg);
01081 return NULL;
01082 }
01083 }
01084
01085 Parameter* Messenger::getParameterSpec(JString module, JString parameter) {
01086 Message* msg = messageSender->sendReceiveMsgTo(module, new Message(getName(), module, "PARAM_GET_SPEC", parameter, ""));
01087 if ((msg != NULL) && (msg->getType().equals("PARAM_GET_SPEC_SUCCESS"))) {
01088 Parameter* content = (Parameter*) msg->takeObject();
01089 delete(msg);
01090 return content;
01091 }
01092 else {
01093 delete(msg);
01094 return NULL;
01095 }
01096 }
01097
01098 JString Messenger::getParameter(JString module, JString parameter) {
01099 Message* msg = messageSender->sendReceiveMsgTo(module, new Message(getName(), module, "PARAM_GET", parameter, ""));
01100 if ((msg != NULL) && (msg->getType().equals("PARAM_GET_SUCCESS"))) {
01101 JString content = msg->getContent();
01102 delete(msg);
01103 return content;
01104 }
01105 else if (msg != NULL) {
01106 delete(msg);
01107 return JString::format("PARAM_ERROR: %s", (char*) msg->getType());
01108 }
01109 else {
01110 return "PARAM_ERROR: Unable to connect to parameter";
01111 }
01112 }
01113
01114 bool Messenger::setParameter(JString module, JString parameter, JString value) {
01115
01116
01117 Message* msg = messageSender->sendReceiveMsgTo(module, new Message(getName(), module, "PARAM_SET", JString::format("%s=%s", (char*)parameter, (char*)value), ""));
01118 if ((msg != NULL) && (msg->getType().equals("PARAM_SET_SUCCESS"))) {
01119 value = msg->getContent();
01120
01121
01122
01123
01124 delete(msg);
01125
01126 return true;
01127 }
01128 else {
01129 if (msg == NULL) {
01130 value = "No Response";
01131 }
01132 else {
01133 value = msg->getType();
01134 }
01135 delete(msg);
01136
01137
01138 return false;
01139 }
01140 }
01141
01142 bool Messenger::increaseParameter(JString module, JString parameter, int steps) {
01143 JString value = JString(steps);
01144
01145
01146 Message* msg = messageSender->sendReceiveMsgTo(module, new Message(getName(), module, "PARAM_INCREMENT", JString::format("%s=%s", (char*)parameter, (char*)value), ""));
01147 if ((msg != NULL) && (msg->getType().equals("PARAM_INCREMENT_SUCCESS"))) {
01148 value = msg->getContent();
01149
01150
01151 delete(msg);
01152
01153 return true;
01154 }
01155 else {
01156 if (msg == NULL) {
01157 value = "No Response";
01158 }
01159 else {
01160 value = msg->getType();
01161 }
01162 delete(msg);
01163
01164
01165 return false;
01166 }
01167 }
01168
01169 bool Messenger::decreaseParameter(JString module, JString parameter, int steps) {
01170 JString value = JString(steps);
01171
01172
01173 Message* msg = messageSender->sendReceiveMsgTo(module, new Message(getName(), module, "PARAM_DECREMENT", JString::format("%s=%s", (char*)parameter, (char*)value), ""));
01174 if ((msg != NULL) && (msg->getType().equals("PARAM_DECREMENT_SUCCESS"))) {
01175 value = msg->getContent();
01176
01177
01178 delete(msg);
01179
01180 return true;
01181 }
01182 else {
01183 if (msg == NULL) {
01184 value = "No Response";
01185 }
01186 else {
01187 value = msg->getType();
01188 }
01189 delete(msg);
01190
01191
01192 return false;
01193 }
01194 }
01195
01196 bool Messenger::addParameterItem(JString module, JString parameter, JString value) {
01197
01198
01199 Message* msg = messageSender->sendReceiveMsgTo(module, new Message(getName(), module, "PARAM_ADD_ITEM", JString::format("%s=%s", (char*)parameter, (char*)value), ""));
01200 if ((msg != NULL) && (msg->getType().equals("PARAM_ADD_ITEM_SUCCESS"))) {
01201 value = msg->getContent();
01202
01203
01204
01205
01206 delete(msg);
01207
01208 return true;
01209 }
01210 else {
01211 if (msg == NULL) {
01212 value = "No Response";
01213 }
01214 else {
01215 value = msg->getType();
01216 }
01217 delete(msg);
01218
01219
01220 return false;
01221 }
01222 }
01223
01224 bool Messenger::removeParameterItem(JString module, JString parameter, JString value) {
01225
01226
01227 Message* msg = messageSender->sendReceiveMsgTo(module, new Message(getName(), module, "PARAM_REMOVE_ITEM", JString::format("%s=%s", (char*)parameter, (char*)value), ""));
01228 if ((msg != NULL) && (msg->getType().equals("PARAM_REMOVE_ITEM_SUCCESS"))) {
01229 value = msg->getContent();
01230
01231
01232
01233
01234 delete(msg);
01235
01236 return true;
01237 }
01238 else {
01239 if (msg == NULL) {
01240 value = "No Response";
01241 }
01242 else {
01243 value = msg->getType();
01244 }
01245 delete(msg);
01246
01247
01248 return false;
01249 }
01250 }
01251
01252 bool Messenger::resetParameter(JString module, JString parameter) {
01253 JString value;
01254
01255
01256 Message* msg = messageSender->sendReceiveMsgTo(module, new Message(getName(), module, "PARAM_RESET", parameter, ""));
01257 if ((msg != NULL) && (msg->getType().equals("PARAM_RESET_SUCCESS"))) {
01258 value = msg->getContent();
01259
01260
01261 delete(msg);
01262
01263 return true;
01264 }
01265 else {
01266 if (msg == NULL) {
01267 value = "No Response";
01268 }
01269 else {
01270 value = msg->getType();
01271 }
01272 delete(msg);
01273
01274
01275 return false;
01276 }
01277 }
01278
01279 bool Messenger::ping(JString module) {
01280 Message* msg = messageSender->sendReceiveMsgTo(module, new Message(getName(), module, "PING"));
01281 JString type = msg->getType();
01282 delete(msg);
01283 if (type.equals("PING_SUCCESS"))
01284 return true;
01285 else
01286 return false;
01287 }
01288
01289 TCPLocation Messenger::resolve(const JString& name, bool forceCNSLookup) {
01290 return messageSender->resolve(name, forceCNSLookup);
01291 }
01292
01293 bool Messenger::addParameter(JString paramname, JString xml) {
01294 Parameter* parameter = new Parameter(xml);
01295 if ((parameter->name.length() == 0) || (!parameter->name.equals(paramname))) {
01296 delete(parameter);
01297 return false;
01298 }
01299 return parameters.put(parameter->name, parameter);
01300 }
01301
01302 bool Messenger::hasParameter(JString name) {
01303 return parameters.contains(name);
01304 }
01305
01306 JString Messenger::getParameterString(JString param) {
01307 Parameter* parameter = getParameter(param);
01308 if (parameter == NULL)
01309 return "";
01310 return parameter->getString();
01311 }
01312
01313 int Messenger::getParameterInteger(JString param) {
01314 Parameter* parameter = getParameter(param);
01315 if (parameter == NULL)
01316 return PARAM_ERROR;
01317 return parameter->getInteger();
01318 }
01319
01320 double Messenger::getParameterDouble(JString param) {
01321 Parameter* parameter = getParameter(param);
01322 if (parameter == NULL)
01323 return PARAM_ERROR;
01324 return parameter->getDouble();
01325 }
01326
01327 bool Messenger::setParameterString(JString param, JString value) {
01328 Parameter* parameter = getParameter(param);
01329 if (parameter == NULL)
01330 return false;
01331 return parameter->setString(value);
01332 }
01333
01334 bool Messenger::setParameterInteger(JString param, int value) {
01335 Parameter* parameter = getParameter(param);
01336 if (parameter == NULL)
01337 return false;
01338 return parameter->setInteger(value);
01339 }
01340
01341 bool Messenger::setParameterDouble(JString param, double value) {
01342 Parameter* parameter = getParameter(param);
01343 if (parameter == NULL)
01344 return false;
01345 return parameter->setDouble(value);
01346 }
01347
01348
01349 bool Messenger::addParameterItem(JString param, JString value) {
01350 Parameter* parameter = getParameter(param);
01351 if (parameter == NULL)
01352 return false;
01353 return parameter->addItem(value);
01354 }
01355
01356 bool Messenger::removeParameterItem(JString param, JString value) {
01357 Parameter* parameter = getParameter(param);
01358 if (parameter == NULL)
01359 return false;
01360 return parameter->removeItem(value);
01361 }
01362
01363 Collection Messenger::getParameterItems(JString param) {
01364 Parameter* parameter = getParameter(param);
01365 if (parameter == NULL)
01366 return Collection();
01367 return parameter->values;
01368 }
01369
01370 InfoItem* Messenger::queryCatalog(const JString& catalog, Message* query) {
01371
01372 if (query == NULL)
01373 return NULL;
01374
01375 if (query->from.length() == 0)
01376 query->from = getName();
01377 query->setTo(catalog);
01378 Message* rmsg = messageSender->sendReceiveMsgTo(catalog, query);
01379 InfoItem* answer = NULL;
01380 if (rmsg != NULL) {
01381 answer = (InfoItem*) rmsg->takeObject();
01382 }
01383 delete(rmsg);
01384 return answer;
01385 }
01386
01387 JString Messenger::getConfigXML(const JString& module) {
01388
01389
01390 JString entry = module;
01391 if (entry.length() == 0) entry = name;
01392
01393 Message* send = new Message(name, "Config", "GET_CONFIGXML", entry, "");
01394 Message* recv = messageSender->sendReceiveMsgTo("Config", send);
01395
01396 JString type = recv->getType();
01397 JString content = recv->getContent();
01398 delete(recv);
01399 if (type.equals("CONFIGXML"))
01400 return content;
01401 else
01402 return "";
01403 }
01404
01405 bool Messenger::setSignalHandler(bool (*handler)(const JString& module, const JString& signal, const JString& content)) {
01406 signalHandler = handler;
01407 return true;
01408 }
01409
01410 bool Messenger::handleSignal(const JString& module, const JString& signal, const JString& content) {
01411 if (signalHandler == NULL)
01412 return false;
01413 return signalHandler(module, signal, content);
01414 }
01415
01416 }