00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #if !defined(_MEDIACONNECTION_H_)
00028 #define _MEDIACONNECTION_H_
00029
00030 #include "MediaServer.h"
00031 #include "InfoItem.h"
00032 #include "ObjectQueue.h"
00033 #include "CommunicationRecorder.h"
00034
00035 namespace cmlabs {
00036
// PreFetchRequest: an Object subclass bundling the parameters of a single
// prefetch request -- a target (timestamp or string id), a command string and
// a timeout. Only the declaration is visible in this header; the member
// definitions live elsewhere.
// NOTE(review): all data members are public, so callers can read and modify
// them directly.
00037 class PreFetchRequest : public Object {
00038 public:
// Creates a request keyed by timestamp `t`. `command` defaults to "" and `ms`
// to 0.
// NOTE(review): `ms` presumably initialises `timeout` (milliseconds) and
// `command` presumably initialises `cmd` -- confirm in the implementation.
00039 PreFetchRequest(const JTime& t, const JString& command = "", int ms = 0);
// Same defaults as the overload above, but keyed by a string `id` instead of
// a timestamp.
00040 PreFetchRequest(const JString& id, const JString& command = "", int ms = 0);
// NOTE(review): clone()/equals() presumably override virtuals declared on
// Object; ownership of the pointer returned by clone() is not stated here --
// confirm against Object's contract.
00041 Object* clone() const;
00042 bool equals(const Object* obj) const;
// NOTE(review): presumably returns `timeout`. The method is not
// const-qualified, so it cannot be called through a const PreFetchRequest.
00043 int getTimeout();
// NOTE(review): two timestamps are stored. Presumably `requestTime` is when
// the request was issued and `time` is the sample time being asked for --
// confirm against the constructors.
00044 JTime requestTime;
00045 JTime time;
// String key; presumably only meaningful for requests built with the JString
// constructor overload.
00046 JString id;
// Command string associated with the request (see the `command` ctor argument).
00047 JString cmd;
// Timeout value; units presumably milliseconds, judging by the `ms` ctor
// argument -- TODO confirm.
00048 int timeout;
00049 };
00050
00051
// MediaConnection: declared as an Object that is also a JThread and a
// TCPReceiver. Judging by its members (a TCPLocation `server`, two
// NetworkConnection pointers, a MediaServer pointer and request bookkeeping
// collections) it is the client-side handle used to talk to a media server.
//
// Only declarations are visible in this header. Unless a note says otherwise,
// the descriptions below are inferred from names and signatures and must be
// confirmed against the implementation file before being relied upon.
// NOTE(review): many members return raw pointers (DataSample*,
// ObjectCollection*, InfoItem*, Message*, Object*); who owns and deletes them
// is not stated anywhere in this header.
00052 class MediaConnection : public Object, public JThread, public TCPReceiver
00053 {
00054 public:
// Construction / lifecycle.
// `hardBufferMaxSize` and `softBufferMaxSize` both default to -1.
// NOTE(review): -1 presumably means "no limit / use default" -- confirm.
00055 MediaConnection(const JString& name, const TCPLocation& serverlocation, long hardBufferMaxSize = -1, long softBufferMaxSize = -1);
00056 virtual ~MediaConnection();
00057 Object* clone() const;
00058
// init()/reinit() and their *WithTraining variants return bool, presumably a
// success flag. reinit* take the same arguments (and defaults) as the
// constructor.
// NOTE(review): what "training" involves is not visible here.
00059 bool init();
00060 bool initWithTraining();
00061 bool reinit(const JString& name, const TCPLocation& serverlocation, long hardBufferMaxSize = -1, long softBufferMaxSize = -1);
00062 bool reinitWithTraining(const JString& name, const TCPLocation& serverlocation, long hardBufferMaxSize = -1, long softBufferMaxSize = -1);
// run() is presumably the JThread entry point and netObjectReceive() the
// TCPReceiver callback -- confirm against those base classes.
00063 void run();
00064 JString getServerName();
00065 JString getName();
00066 Message* netObjectReceive(Message *msg, NetworkConnection *con);
00067 bool isConnected();
00068 bool reset();
00069
// Behaviour switches. Each takes the desired setting and returns bool.
// NOTE(review): allowMessageDropping() presumably sets `allowMessageDrop`.
00070 bool allowMessageDropping(bool allow);
00071 bool handleMultipleWriters(bool allow);
00072 bool isInSameExecutable(const TCPLocation& loc);
00073
00074
00075
// Statistics queries. Compare with the get*Buffer* variants further down;
// presumably these describe the server's store and the Buffer variants a
// local buffer -- TODO confirm.
00076 JTime getOldestTimestamp();
00077 JTime getNewestTimestamp();
00078 long getCount();
00079 long getSize();
00080 ObjectCollection* getDataSampleList();
00081
00082 InfoItem* getInfo();
00083 InfoItem* getInfoAllStats();
00084
00085
00086
// Direct sample retrieval by position, by timestamp or by time range.
00087 DataSample* getOldestDataSample();
00088 DataSample* getNewestDataSample();
00089
00090 DataSample* getDataSample(const JTime& time);
00091
00092 ObjectCollection* getDataSamples(const JTime& t1, const JTime& t2);
00093
00094
00095
// Prefetch requests; the bool presumably reports whether the request was
// accepted, not whether the data has arrived.
00096 bool prefetchDataSamples(const JTime& t1, const JTime& t2);
00097 bool prefetchDataSample(const JTime& t);
00098
// NOTE(review): "Ater" is a misspelling of "After" in both names below. They
// are part of the public interface, so they are left as-is; renaming would
// break existing callers.
00099 bool prefetchLastDataSampleAter(const JTime& t);
00100 bool prefetchFirstDataSampleAter(const JTime& t);
00101
// Request-state queries for a PreFetchRequest. Presumably backed by the
// `requests`, `pendingRequests`, `successRequests` and `failedRequests`
// members declared in the protected section.
00102 bool hasRequestBeenRequested(PreFetchRequest* req);
00103 bool isRequestPending(PreFetchRequest* req);
00104 bool hasRequestBeenExecuted(PreFetchRequest* req);
00105 bool didRequestSucceed(PreFetchRequest* req);
00106 bool didRequestFail(PreFetchRequest* req);
00107
// Waiting variants. `ms` is presumably a timeout in milliseconds.
// NOTE(review): the return value on timeout is not stated here (likely a null
// pointer) -- confirm before dereferencing.
00108 DataSample* waitForRequestToComplete(PreFetchRequest* req, long ms);
00109
00110 DataSample* waitForFirstSampleAfter(const JTime& time, long ms);
00111 DataSample* waitForLastSampleAfter(const JTime& time, long ms);
00112 DataSample* waitForDataSample(const JTime& time, long ms);
00113
// Overloads of the above keyed by a string id instead of a timestamp.
00114 DataSample* getDataSample(const JString& id);
00115 DataSample* waitForFirstSampleAfter(const JString& id, long ms);
00116 DataSample* waitForLastSampleAfter(const JString& id, long ms);
00117
00118
// Buffer statistics (see the note on the statistics queries above).
00119 JTime getOldestBufferTimestamp();
00120 JTime getNewestBufferTimestamp();
00121 long getBufferCount();
00122
00123
00124
// Adding samples.
// NOTE(review): whether the connection takes ownership of `sample` /
// `samples` is not visible here.
00125 bool addDataSample(DataSample* sample);
00126
00127 bool addDataSamples(ObjectCollection* samples);
00128
00129
// Request/response helpers addressed to the remote server. `timeout`
// defaults to -1 in every overload.
// NOTE(review): -1 presumably selects a default or unlimited wait; units are
// not stated -- confirm.
00130 ObjectCollection* askRemoteServerForDataCollection(const JString& question, Object* obj, int timeout = -1);
00131 DataSample* askRemoteServerForData(const JString& question, Object* obj, int timeout = -1);
00132
00133 bool sendSampleToRemoteServer(const JString& question, DataSample* sample);
00134 bool sendSamplesToRemoteServer(const JString& question, ObjectCollection* samples);
00135
00136 JString askRemoteServerForType(const JString& question, int timeout = -1);
00137 JString askRemoteServerForString(const JString& question, int timeout = -1);
00138 Object* askRemoteServerForObject(const JString& question, int timeout = -1);
00139 Message* askRemoteServer(Message* msg, int timeout = -1);
00140
// NOTE(review): the meaning and unit of the returned long (e.g. round-trip
// time, negative on failure) are not visible here.
00141 long pingServer();
00142
00143
00144
// Continuous background receive control; presumably uses `continuousCon`.
00145 bool startContinuousBackgroundReceive();
00146 bool stopContinuousBackgroundReceive();
00147 bool isInContinuousReceive();
00148
00149
// Both return a ConnectionProfile by value.
00150 ConnectionProfile getConnectionProfile();
00151 ConnectionProfile getServerConnectionProfile();
00152
00153
00154
// Channel management and subscriptions. val1/val2 presumably delimit a value
// range on the channel's field -- TODO confirm. Subscription events are
// presumably queued in `eventQueue`.
00155 bool createChannel(const JString& name, const JString& fieldname);
00156 bool destroyChannel(const JString& name);
00157 ObjectCollection* searchChannel(const JString& name, double val1, double val2);
00158 bool subscribeChannel(const JString& channel, double val1, double val2);
00159 bool unsubscribeChannel(const JString& channel, double val1, double val2);
00160 bool unsubscribeAllChannels();
00161 bool addSubscriptionEvent(DataSample* sample);
00162 DataSample* waitForSubscriptionEvent(int timeout);
00163 int getSubscriptionEventQueueSize();
00164
// NOTE(review): public raw pointer data member; who allocates and frees the
// recorder is not visible in this header.
00165 CommunicationRecorder* comRecorder;
00166
00167 protected:
00168
// Network connections: one for the server and one for continuous receive,
// judging by the names.
00169 NetworkConnection* serverCon;
00170 NetworkConnection* continuousCon;
00171
00172 JString myName;
00173 MediaServer* mediaServer;
00174
// NOTE(review): `shouldContinue` is a plain bool. If run() polls it from the
// JThread while another thread writes it, access would need synchronisation
// -- verify in the implementation.
00175 bool shouldContinue;
00176 MediaStream* mediaStream;
00177 TCPLocation server;
00178
// Synchronisation primitives; which state each one guards is not visible here.
00179 JSemaphore newDataAvailable;
00180 JMutex connectionAccess;
00181
// Prefetch request bookkeeping (see the request-state queries above).
00182 ObjectQueue requests;
00183 ObjectCollection pendingRequests;
00184 ObjectCollection successRequests;
00185 ObjectCollection failedRequests;
00186
// Subscription state. `eventQueue` is held by pointer, unlike `requests`.
00187 ObjectQueue* eventQueue;
00188 Collection subscriptions;
00189 bool allowMessageDrop;
00190 };
00191
00192 }
00193
00194 #endif //_MEDIACONNECTION_H_
00195