00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "Message.h"
00024 #include "Specs.h"
00025 #include "MediaConnection.h"
00026 #include "InfoItem.h"
00027 #include <stdarg.h>
00028
00029 #ifndef _MESSENGER_H_
00030 #define _MESSENGER_H_
00031
00032 namespace cmlabs {
00033
00034 class Messenger : public Object {
00035 public:
00036 Messenger(const JString& name, MessageSender* sender, const JString& wakeupMessageType, const JString& phaseChangeType);
00037 ~Messenger();
00038 Object* clone() const {return NULL;}
00039
00040
00041
00042 JString getName();
00043
00044 int getTriggerCount();
00045
00046 JString getTriggerAlias();
00047
00048 bool shouldContinueRunning();
00049
00050 Collection getDestinations();
00051
00052 bool isWakeupMessage(Message* msg);
00053
00054 bool log(int level, char *format, ...);
00055
00056 bool setStatus(const JString& name, const JString& value);
00057
00058 JString getStatus(const JString& name);
00059
00060
00061 bool processFirstMessage();
00062
00063 Message* waitForNewMessage(int timeout);
00064
00065 int getInputMessageCount();
00066
00067 int getOutputMessageCount();
00068
00069 Message* getWakeupMessage();
00070
00071 Message* getTriggerMessage();
00072
00073 int getRetrievedMessageCount();
00074
00075 ObjectCollection* getAllRetrievedMessages();
00076
00077 Message* getRetrievedMessage(int pos);
00078
00079 PhaseSpec* getPhaseSpec();
00080
00081 ObjectCollection* retrieveMessages(const JString& retrieveFrom, const JString& retrieveSpecXML);
00082
00083 ObjectCollection* retrieveMessages(const JString& retrieveFrom, RetrieveSpec* retrieveSpec);
00084
00085 ObjectCollection* retrieveMessages(const JString& retrieveFrom, const ObjectCollection& retrieveSpecs);
00086
00087
00088 bool addOutputMessage(Message* msg);
00089
00090 bool addOutputMessage(Message* msg, JString destination);
00091
00092 bool sendOutputMessages();
00093
00094 bool postMessageToAllDestinations(Message* msg);
00095
00096 bool postOutputMessage(Message* msg);
00097
00098 bool sendPhaseChange(const JString& dest = "");
00099
00100
00101 JString getConfigXML(const JString& entry = "");
00102
00103
00104 JString getCurrentContextName();
00105
00106 JString getCurrentPhaseName();
00107
00108 JTime getCurrentContextStart();
00109 JTime getContextStart(const JString& context);
00110 JTime getCurrentPhaseStart();
00111
00112 ObjectDictionary* getParameterSpecs(JString module);
00113 Parameter* getParameterSpec(JString module, JString parameter);
00114
00115 JString getParameterString(JString module, JString parameter);
00116 int getParameterInteger(JString module, JString parameter);
00117 double getParameterDouble(JString module, JString parameter);
00118 JString getParameter(JString module, JString parameter);
00119
00120 bool setParameterString(JString module, JString parameter, JString value);
00121 bool setParameterInteger(JString module, JString parameter, int value);
00122 bool setParameterDouble(JString module, JString parameter, double value);
00123 bool setParameter(JString module, JString parameter, JString value);
00124
00125 bool increaseParameter(JString module, JString parameter, int steps);
00126 bool decreaseParameter(JString module, JString parameter, int steps);
00127 bool resetParameter(JString module, JString parameter);
00128
00129 bool addParameterItem(JString module, JString parameter, JString value);
00130 bool removeParameterItem(JString module, JString parameter, JString value);
00131
00132 bool addParameterItem(JString param, JString value);
00133 bool removeParameterItem(JString param, JString value);
00134 Collection getParameterItems(JString param);
00135
00136
00137 InfoItem* queryCatalog(const JString& catalog, Message* query);
00138
00139
00140 MediaConnection* connectToMediaServer(JString mediaName);
00141
00142 bool reconnectToMediaServer(JString mediaName, MediaConnection* mediacon);
00143
00144 JString getServerID();
00145
00146 JString getServerName();
00147
00148
00149 Collection getParameterNames();
00150
00151 Parameter* getParameter(const JString& name);
00152
00153 Collection getStreamNames();
00154
00155 MediaConnection* getStreamConnection(const JString& name);
00156
00157 bool addParameter(JString paramname, JString xml);
00158 bool hasParameter(JString name);
00159 JString getParameterString(JString param);
00160 int getParameterInteger(JString param);
00161 double getParameterDouble(JString param);
00162
00163 bool setParameterString(JString param, JString value);
00164 bool setParameterInteger(JString param, int value);
00165 bool setParameterDouble(JString param, double value);
00166
00167 bool ping(JString module);
00168 TCPLocation resolve(const JString& name, bool forceCNSLookup = false);
00169
00170
00171 bool addInputMessage(Message* msg);
00172 bool addParameter(const JString& name, Parameter* parameter);
00173 bool addStream(const JString& name, MediaConnection* media);
00174 bool clearStreams();
00175 bool terminate();
00176
00177
00178 bool setSignalHandler(bool (*handler)(const JString& module, const JString& signal, const JString& content));
00179 bool handleSignal(const JString& module, const JString& signal, const JString& content);
00180 bool (*signalHandler)(const JString& module, const JString& signal, const JString& content);
00181
00182 bool processPhaseSpec(PhaseSpec* newSpec, JTime t);
00183
00184 ObjectTable* recentMessages;
00185 PerfStat* perfStat;
00186 int verbose;
00187 double priority;
00188
00189 private:
00190 MessageSender* messageSender;
00191 JString name;
00192 JString wakeupMessageType;
00193 JString phaseChangeType;
00194
00195
00196 ObjectCollection outputMessages;
00197 ObjectDictionary parameters;
00198 ObjectDictionary streams;
00199 Message* currentMessage;
00200 PhaseSpec* currentPhaseSpec;
00201
00202 int totalTriggerCount;
00203 bool isStillRunning;
00204 bool shouldContinue;
00205
00206 JTime startCrankTime;
00207 JTime startProcessTime;
00208 JThread aThread;
00209 JMutex mutex;
00210 JMutex phaseMutex;
00211
00212 SortedObjectCollection inQueue;
00213 JMutex inQueueMutex;
00214 JSemaphore inQueueSemaphore;
00215 Message* getNextMessage(long ms);
00216
00217 bool checkNewPhaseSpec(PhaseSpec* newSpec, JTime msgtime);
00218 bool clearOldMessagesFromQueue(JTime stamp);
00219 };
00220
00221 }
00222
00223 #endif // _MESSENGER_H_
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237