00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "MediaConnection.h"
00022
00023
00024
00025 namespace cmlabs {
00026
00027 PreFetchRequest::PreFetchRequest(const JTime& t, const JString& command, int ms) {
00028 time = t;
00029 cmd = command;
00030 timeout=ms;
00031 }
00032
00033 PreFetchRequest::PreFetchRequest(const JString& id, const JString& command, int ms) {
00034 this->id = id;
00035 cmd = command;
00036 timeout=ms;
00037 }
00038
00039 Object* PreFetchRequest::clone() const {
00040 PreFetchRequest* req = new PreFetchRequest(time, cmd, timeout);
00041 req->requestTime = requestTime;
00042 return req;
00043 }
00044
00045 bool PreFetchRequest::equals(const Object* obj) const {
00046 if (Object::equals(obj))
00047 return true;
00048 if ((obj == NULL) || (!isSameClass(obj)))
00049 return 0;
00050
00051
00052 PreFetchRequest* req = (PreFetchRequest*) obj;
00053 return ( (req->time.equals(&time)) && (req->cmd.equals(cmd)) );
00054 }
00055
00056 int PreFetchRequest::getTimeout() {
00057 JTime now;
00058 int ms = timeout - (now-requestTime);
00059 if (ms < 0)
00060 ms = 50;
00061 return ms;
00062 }
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077 MediaConnection::MediaConnection(const JString& name, const TCPLocation& serverlocation, long hardBufferMaxSize, long softBufferMaxSize) : JThread() {
00078 eventQueue = NULL;
00079 continuousCon = NULL;
00080 serverCon = NULL;
00081 myName = name;
00082 allowMessageDrop = false;
00083
00084
00085 shouldContinue = true;
00086 server = serverlocation;
00087
00088
00089
00090 mediaServer = NULL;
00091 if (isInSameExecutable(server)) {
00092 mediaServer = (MediaServer*) server.pCommunicator;
00093 }
00094 if (mediaServer == NULL) {
00095 mediaStream = new MediaStream(server.name, softBufferMaxSize, hardBufferMaxSize);
00096 }
00097 else {
00098 mediaStream = NULL;
00099 }
00100 comRecorder = NULL;
00101 }
00102
00103 MediaConnection::~MediaConnection() {
00104
00105 comRecorder = NULL;
00106
00107 if (isInContinuousReceive())
00108 stopContinuousBackgroundReceive();
00109
00110 unsubscribeAllChannels();
00111
00112 if (continuousCon != NULL) {
00113 continuousCon->reset();
00114 delete(continuousCon);
00115 continuousCon = NULL;
00116 }
00117
00118 if ((shouldContinue) && (mediaStream != NULL)) {
00119 shouldContinue = false;
00120 wait(100);
00121 }
00122
00123 delete(mediaStream);
00124 delete(serverCon);
00125
00126 delete(eventQueue);
00127 eventQueue = NULL;
00128
00129
00130
00131
00132
00133 }
00134
00135 bool MediaConnection::reinit(const JString& name, const TCPLocation& serverlocation, long hardBufferMaxSize, long softBufferMaxSize) {
00136
00137 comRecorder = NULL;
00138
00139
00140 if (isInContinuousReceive())
00141 stopContinuousBackgroundReceive();
00142
00143 unsubscribeAllChannels();
00144
00145 if (continuousCon != NULL) {
00146 continuousCon->reset();
00147 delete(continuousCon);
00148 continuousCon = NULL;
00149 }
00150
00151 if ((shouldContinue) && (mediaStream != NULL)) {
00152 shouldContinue = false;
00153 wait(100);
00154 }
00155
00156 delete(mediaStream);
00157 delete(serverCon);
00158 delete(eventQueue);
00159
00160
00161 eventQueue = NULL;
00162 continuousCon = NULL;
00163 serverCon = NULL;
00164 myName = name;
00165 shouldContinue = true;
00166 server = serverlocation;
00167
00168 mediaServer = NULL;
00169 if (isInSameExecutable(server)) {
00170 mediaServer = (MediaServer*) server.pCommunicator;
00171 }
00172 if (mediaServer == NULL) {
00173 mediaStream = new MediaStream(server.name, softBufferMaxSize, hardBufferMaxSize);
00174 }
00175 else {
00176 mediaStream = NULL;
00177 }
00178
00179 return init();
00180 }
00181
00182 bool MediaConnection::reinitWithTraining(const JString& name, const TCPLocation& serverlocation, long hardBufferMaxSize, long softBufferMaxSize) {
00183
00184 comRecorder = NULL;
00185
00186
00187 if (isInContinuousReceive())
00188 stopContinuousBackgroundReceive();
00189
00190 unsubscribeAllChannels();
00191
00192 if (continuousCon != NULL) {
00193 continuousCon->reset();
00194 delete(continuousCon);
00195 continuousCon = NULL;
00196 }
00197
00198 if ((shouldContinue) && (mediaStream != NULL)) {
00199 shouldContinue = false;
00200 wait(100);
00201 }
00202
00203 delete(mediaStream);
00204 delete(serverCon);
00205 delete(eventQueue);
00206
00207
00208 eventQueue = NULL;
00209 continuousCon = NULL;
00210 serverCon = NULL;
00211 myName = name;
00212 shouldContinue = true;
00213 server = serverlocation;
00214
00215 mediaServer = NULL;
00216 if (isInSameExecutable(server)) {
00217 mediaServer = (MediaServer*) server.pCommunicator;
00218 }
00219 if (mediaServer == NULL) {
00220 mediaStream = new MediaStream(server.name, softBufferMaxSize, hardBufferMaxSize);
00221 }
00222 else {
00223 mediaStream = NULL;
00224 }
00225
00226 return initWithTraining();
00227 }
00228
00229 bool MediaConnection::init() {
00230
00231 if (pingServer() < 0) {
00232
00233 return false;
00234 }
00235
00236 if (isInSameExecutable(server)) {
00237
00238 return true;
00239 }
00240
00241 if (serverCon == NULL) {
00242
00243 return false;
00244 }
00245
00246 return true;
00247
00248 }
00249
00250 bool MediaConnection::initWithTraining() {
00251
00252 if (pingServer() < 0) {
00253
00254 return false;
00255 }
00256
00257 if (isInSameExecutable(server)) {
00258
00259 return true;
00260 }
00261
00262 if (serverCon == NULL) {
00263
00264 return false;
00265 }
00266
00267
00268 return serverCon->trainNetworkProfile();
00269 }
00270
00271 bool MediaConnection::isConnected() {
00272 if (isInSameExecutable(server))
00273 return true;
00274 if (serverCon == NULL)
00275 return false;
00276 return serverCon->isConnected();
00277 }
00278
00279
00280 void MediaConnection::run() {
00281
00282
00283 PreFetchRequest* req;
00284 JTime t;
00285
00286 DataSample* sample;
00287 JString str;
00288 int timeout;
00289
00290 int waittime = 50;
00291
00292
00293
00294 while ((shouldContinue) && (mediaStream != NULL)) {
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311 if ( (req = (PreFetchRequest*) requests.waitForNewEntry(waittime)) != NULL ) {
00312
00313 if (req->cmd.length() == 0) {
00314 sample = this->getDataSample(req->time);
00315 if (sample != NULL) {
00316 mediaStream->addSample(sample);
00317 successRequests.add(req);
00318 }
00319 else
00320 failedRequests.add(req);
00321 }
00322 else if (req->cmd.equalsIgnoreCase("FirstAfter")) {
00323 timeout = req->getTimeout();
00324 str = JString::format("WAIT_FOR_FIRST_SAMPLE %d", timeout);
00325 sample = askRemoteServerForData(str, req->time.clone(), timeout);
00326 if (sample != NULL) {
00327 mediaStream->addSample(sample);
00328 successRequests.add(req);
00329 }
00330 else
00331 failedRequests.add(req);
00332 }
00333 else if (req->cmd.equalsIgnoreCase("FirstAfterID")) {
00334 timeout = req->getTimeout();
00335 str = JString::format("WAIT_FOR_FIRST_AFTERID %d", timeout);
00336 sample = askRemoteServerForData(str, req->id.clone(), timeout);
00337 if (sample != NULL) {
00338 mediaStream->addSample(sample);
00339 successRequests.add(req);
00340 }
00341 else
00342 failedRequests.add(req);
00343 }
00344 else if (req->cmd.equalsIgnoreCase("LastAfter")) {
00345 timeout = req->getTimeout();
00346 str = JString::format("WAIT_FOR_LAST_SAMPLE %d", timeout);
00347 sample = askRemoteServerForData(str, req->time.clone(), timeout);
00348 if (sample != NULL) {
00349 mediaStream->addSample(sample);
00350 successRequests.add(req);
00351 }
00352 else
00353 failedRequests.add(req);
00354 }
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369 else
00370 failedRequests.add(req);
00371
00372 pendingRequests.remove(pendingRequests.getPos(req));
00373 while (successRequests.getCount() > 100)
00374 successRequests.removeFirst();
00375 while (failedRequests.getCount() > 100)
00376 failedRequests.removeFirst();
00377
00378 newDataAvailable.post();
00379 }
00380
00381 }
00382 }
00383
00384
00385
00386
00387
00388
00389 bool MediaConnection::reset() {
00390 comRecorder = NULL;
00391
00392 this->stopContinuousBackgroundReceive();
00393 requests.removeAll();
00394 unsubscribeAllChannels();
00395
00396 return true;
00397 }
00398
00399
00400 Object* MediaConnection::clone() const {
00401 return NULL;
00402 }
00403
00404 Message* MediaConnection::netObjectReceive(Message *inMsg, NetworkConnection *con) {
00405
00406 bool noreply = (inMsg->noreply.length() > 0);
00407
00408 Message* outMsg = NULL;
00409 if (!shouldContinue) {
00410 if (!noreply)
00411 outMsg = new Message(myName, inMsg->getFrom(), "RECEIVE_ACCEPT");
00412 delete(inMsg);
00413 return outMsg;
00414 }
00415
00416 DataSample* sample;
00417 if ( (inMsg->getType().equalsIgnoreCase("BinaryOnly")) &&
00418 ( (sample = (DataSample*) inMsg->takeObject()) != NULL ) ) {
00419 if (sample->params.get("SubscriptionEventMatch").length() > 0)
00420 addSubscriptionEvent(sample);
00421 else
00422 mediaStream->addSample(sample);
00423 }
00424 else {
00425
00426 }
00427
00428 if (!noreply)
00429 outMsg = new Message(myName, inMsg->getFrom(), "RECEIVE_ACCEPT");
00430 delete(inMsg);
00431 return outMsg;
00432 }
00433
00434 int MediaConnection::getSubscriptionEventQueueSize() {
00435 if (eventQueue == NULL)
00436 return -1;
00437 return eventQueue->getCount();
00438 }
00439
00440 bool MediaConnection::addSubscriptionEvent(DataSample* sample) {
00441 if (eventQueue == NULL)
00442 eventQueue = new ObjectQueue();
00443
00444 eventQueue->add(sample);
00445 if (eventQueue->getCount() > 50) {
00446 Object* obj = eventQueue->retrieveEntry(0);
00447 delete(obj);
00448 }
00449 return true;
00450 }
00451
00452 DataSample* MediaConnection::waitForSubscriptionEvent(int timeout) {
00453 if (eventQueue == NULL)
00454 eventQueue = new ObjectQueue();
00455 DataSample* sample = (DataSample*) eventQueue->waitForNewEntry(timeout);
00456 if ((sample != NULL) && (mediaStream != NULL))
00457 mediaStream->addSample(sample);
00458 return sample;
00459 }
00460
00461 JString MediaConnection::getName() {
00462 return myName;
00463 }
00464
00465 bool MediaConnection::allowMessageDropping(bool allow) {
00466 allowMessageDrop = allow;
00467 if (serverCon != NULL)
00468 serverCon->allowMessageDropping(allow);
00469 return true;
00470 }
00471
00472 bool MediaConnection::handleMultipleWriters(bool allow) {
00473 if (mediaServer != NULL) {
00474 return mediaServer->handleMultipleWriters(allow);
00475 }
00476 else {
00477 if (allow)
00478 return (askRemoteServerForType("HANDLE_MULTIPLE_WRITERS").equalsIgnoreCase("HANDLE_MULTIPLE_WRITERS_SUCCESS"));
00479 else
00480 return (askRemoteServerForType("HANDLE_SINGLE_WRITER").equalsIgnoreCase("HANDLE_SINGLE_WRITER_SUCCESS"));
00481 }
00482 }
00483
00484
00485 JString MediaConnection::getServerName() {
00486 if (mediaServer != NULL) {
00487 return mediaServer->getName();
00488 }
00489 else {
00490 return askRemoteServerForString("GET_NAME");
00491 }
00492 }
00493
00494 InfoItem* MediaConnection::getInfo() {
00495 if (mediaServer != NULL) {
00496 return mediaServer->getInfo();
00497 }
00498 else {
00499 return (InfoItem*) askRemoteServerForObject("GET_INFO");
00500 }
00501 }
00502
00503 InfoItem* MediaConnection::getInfoAllStats() {
00504 if (mediaServer != NULL) {
00505 return mediaServer->getInfoAllStats();
00506 }
00507 else {
00508 return (InfoItem*) askRemoteServerForObject("GET_INFO_ALL_STATS");
00509 }
00510 }
00511
00512
00513 bool MediaConnection::isInSameExecutable(const TCPLocation& loc) {
00514
00515 JString addr;
00516 if (serverCon != NULL)
00517 addr = serverCon->getLocalIPAddress();
00518 else {
00519 JSocket* sock = new JSocket("", 80);
00520 addr = sock->getLocalIPAddress();
00521 delete(sock);
00522 }
00523 if ((addr.length() > 0) && (loc.addr.length() > 0)) {
00524 if (!addr.equalsIgnoreCase(loc.addr))
00525 return false;
00526 }
00527
00528 JString procid = getProcID();
00529 if (!procid.equalsIgnoreCase(loc.procid))
00530 return false;
00531
00532 return true;
00533 }
00534
00535
00536
00537
00538
00539
00540 JTime MediaConnection::getOldestTimestamp() {
00541
00542 JTime t;
00543
00544 if (mediaServer != NULL) {
00545 return mediaServer->getOldestTimestamp();
00546 }
00547 else {
00548 JString xml = askRemoteServerForString("GET_OLDEST_TIMESTAMP");
00549 if (xml.looksLikeXML())
00550 t = JTime(xml);
00551 else
00552 t.setInvalid();
00553 }
00554 return t;
00555 }
00556
00557 JTime MediaConnection::getNewestTimestamp() {
00558
00559 JTime t;
00560
00561 if (mediaServer != NULL) {
00562 return mediaServer->getNewestTimestamp();
00563 }
00564 else {
00565 JString xml = askRemoteServerForString("GET_NEWEST_TIMESTAMP");
00566 if (xml.looksLikeXML())
00567 t = JTime(xml);
00568 else
00569 t.setInvalid();
00570 }
00571 return t;
00572 }
00573
00574 DataSample* MediaConnection::getOldestDataSample() {
00575
00576 DataSample* sample = NULL;
00577
00578 if (mediaServer != NULL) {
00579 return mediaServer->getOldestSample(myName);
00580 }
00581 else {
00582 sample = askRemoteServerForData("GET_OLDEST_DATASAMPLE", mediaStream->getTimestampList());
00583 if (sample == NULL) {
00584
00585 return mediaStream->getOldestSample();
00586 }
00587 }
00588 return sample;
00589 }
00590
00591 DataSample* MediaConnection::getNewestDataSample() {
00592
00593 DataSample* sample = NULL;
00594
00595 if (mediaServer != NULL) {
00596 return mediaServer->getNewestSample(myName);
00597 }
00598 else {
00599 sample = askRemoteServerForData("GET_NEWEST_DATASAMPLE", mediaStream->getTimestampList());
00600 if (sample == NULL) {
00601
00602 return mediaStream->getNewestSample();
00603 }
00604 }
00605 return sample;
00606 }
00607
00608 long MediaConnection::getCount() {
00609
00610 if (mediaServer != NULL) {
00611 return mediaServer->getCount();
00612 }
00613 else {
00614 JString str = askRemoteServerForString("GET_COUNT");
00615 return str.toLong();
00616 }
00617 }
00618
00619 long MediaConnection::getSize() {
00620
00621 if (mediaServer != NULL) {
00622 return mediaServer->getTotalDataSize();
00623 }
00624 else {
00625 JString str = askRemoteServerForString("GET_SIZE");
00626 return str.toLong();
00627 }
00628 }
00629
00630 ObjectCollection* MediaConnection::getDataSampleList() {
00631
00632 if (mediaServer != NULL) {
00633 return mediaServer->getDataSampleList();
00634 }
00635 else {
00636 ObjectCollection* col = (ObjectCollection*) askRemoteServerForObject("GET_SAMPLE_LIST");
00637 return col;
00638 }
00639 }
00640
00641
00642
00643
00644
00645
00646 DataSample* MediaConnection::getDataSample(const JString& id) {
00647 if (mediaServer != NULL) {
00648 return mediaServer->getDataSample(myName, id);
00649 }
00650 else {
00651 return askRemoteServerForData("GET_SAMPLE_BY_ID", id.clone());
00652 }
00653 }
00654
00655
00656
00657 DataSample* MediaConnection::getDataSample(const JTime& time) {
00658 if (mediaServer != NULL) {
00659 return mediaServer->getDataSample(myName, time);
00660 }
00661 else {
00662 DataSample* sample = this->mediaStream->getSampleAt(time);
00663 if (sample != NULL)
00664 return sample;
00665 sample = askRemoteServerForData("GET_SAMPLE_BY_TIME", time.clone());
00666 if (sample != NULL) {
00667 mediaStream->addSample(sample);
00668 return sample;
00669 }
00670 else
00671 return NULL;
00672 }
00673 }
00674
00675
00676 ObjectCollection* MediaConnection::getDataSamples(const JTime& t1, const JTime& t2) {
00677
00678 if (mediaServer != NULL) {
00679 ObjectCollection* col = mediaServer->getDataSamples(myName, t1, t2);
00680 if (col != NULL)
00681 col->noDelete();
00682 return col;
00683 }
00684 else {
00685 ObjectCollection* col = new ObjectCollection();
00686 col->add(t1.clone());
00687 col->add(t2.clone());
00688 col = askRemoteServerForDataCollection("GET_SAMPLES_BY_TIME", col);
00689 if (col != NULL) {
00690 mediaStream->addSamples(col, false);
00691 col->noDelete();
00692 }
00693 return col;
00694 }
00695 }
00696
00697
00698
00699
00700
00701
00702
00703 bool MediaConnection::addDataSample(DataSample* sample) {
00704
00705 if ((comRecorder != NULL) && (!comRecorder->isPaused()))
00706 comRecorder->addNewSample((DataSample*)sample->clone());
00707
00708 if (mediaServer != NULL) {
00709 return mediaServer->addDataSample(myName, sample);
00710 }
00711 else {
00712 return sendSampleToRemoteServer("ADD_SAMPLE", sample);
00713 }
00714 }
00715
00716
00717 bool MediaConnection::addDataSamples(ObjectCollection* samples) {
00718
00719 if ((comRecorder != NULL) && (samples != NULL) && (!comRecorder->isPaused())) {
00720 for (DataSample* sample = (DataSample*) samples->getFirst(); sample != NULL; sample = (DataSample*) samples->getNext()) {
00721 comRecorder->addNewSample((DataSample*)sample->clone());
00722 }
00723 }
00724
00725 if (mediaServer != NULL) {
00726 return mediaServer->addDataSamples(myName, samples);
00727 }
00728 else {
00729 return sendSamplesToRemoteServer("ADD_SAMPLES", samples);
00730 }
00731 }
00732
00733
00734
00735
00736
00737
00738
00739 JTime MediaConnection::getOldestBufferTimestamp() {
00740
00741 JTime t;
00742 if (mediaServer != NULL) {
00743 return mediaServer->getOldestTimestamp();
00744 }
00745 else if (mediaStream != NULL) {
00746 return mediaStream->getOldestSampleTime();
00747 }
00748 else {
00749 t.setInvalid();
00750 return t;
00751 }
00752 }
00753
00754 JTime MediaConnection::getNewestBufferTimestamp() {
00755
00756 JTime t;
00757 if (mediaServer != NULL) {
00758 return mediaServer->getNewestTimestamp();
00759 }
00760 else if (mediaStream != NULL) {
00761 return mediaStream->getNewestSampleTime();
00762 }
00763 else {
00764 t.setInvalid();
00765 return t;
00766 }
00767 }
00768
00769 long MediaConnection::getBufferCount() {
00770
00771 JTime t;
00772 if (mediaServer != NULL) {
00773 return mediaServer->getCount();
00774 }
00775 else if (mediaStream != NULL) {
00776 return mediaStream->getCount();
00777 }
00778 else {
00779 return 0;
00780 }
00781 }
00782
00783
00784
00785
00786
00787
00788
00789
00790
00791
00792 DataSample* MediaConnection::waitForFirstSampleAfter(const JString& id, long ms) {
00793 if (mediaServer != NULL) {
00794 return mediaServer->waitForFirstSampleAfter(myName, id, ms);
00795 }
00796 else {
00797 DataSample* sample = NULL;
00798 if (isInContinuousReceive()) {
00799 return mediaStream->waitForFirstSampleAfter(id, ms);
00800 }
00801 else {
00802 PreFetchRequest* req = new PreFetchRequest(id, "FirstAfterID", ms);
00803 if (this->isRequestPending(req)) {
00804 sample = waitForRequestToComplete(req, ms);
00805 }
00806 else if (this->didRequestSucceed(req)) {
00807 sample = mediaStream->waitForFirstSampleAfter(id, ms);
00808 }
00809 else {
00810 JString str = JString::format("WAIT_FOR_FIRST_AFTERID %d", ms);
00811 sample = askRemoteServerForData(str, id.clone(), ms);
00812 }
00813 delete(req);
00814 if (sample != NULL)
00815 mediaStream->addSample(sample);
00816 return sample;
00817 }
00818 }
00819 return NULL;
00820 }
00821
00822 DataSample* MediaConnection::waitForLastSampleAfter(const JString& id, long ms) {
00823 if (mediaServer != NULL) {
00824 return mediaServer->waitForLastSampleAfter(myName, id, ms);
00825 }
00826 else {
00827 DataSample* sample = NULL;
00828 if (isInContinuousReceive()) {
00829 return mediaStream->waitForLastSampleAfter(id, ms);
00830 }
00831 else {
00832 PreFetchRequest* req = new PreFetchRequest(id, "LastAfterID", ms);
00833 if (this->isRequestPending(req)) {
00834 sample = waitForRequestToComplete(req, ms);
00835 }
00836 else if (this->didRequestSucceed(req)) {
00837
00838 sample = mediaStream->waitForLastSampleAfter(id, ms);
00839 }
00840 else {
00841 JString str = JString::format("WAIT_FOR_LAST_AFTERID %d", ms);
00842 sample = askRemoteServerForData(str, id.clone(), ms);
00843 }
00844 delete(req);
00845 if (sample != NULL)
00846 mediaStream->addSample(sample);
00847 return sample;
00848 }
00849 }
00850 return NULL;
00851 }
00852
00853
00854 DataSample* MediaConnection::waitForFirstSampleAfter(const JTime& time, long ms) {
00855
00856 if (mediaServer != NULL) {
00857 return mediaServer->waitForFirstSampleAfter(myName, time, ms);
00858 }
00859 else {
00860 DataSample* sample = NULL;
00861 if (isInContinuousReceive()) {
00862 return mediaStream->waitForFirstSampleAfter(time, ms);
00863 }
00864 else {
00865 PreFetchRequest* req = new PreFetchRequest(time, "FirstAfter", ms);
00866 if (this->isRequestPending(req)) {
00867 sample = waitForRequestToComplete(req, ms);
00868 }
00869 else if (this->didRequestSucceed(req)) {
00870 sample = mediaStream->waitForFirstSampleAfter(time, ms);
00871 }
00872 else {
00873 JString str = JString::format("WAIT_FOR_FIRST_SAMPLE %d", ms);
00874 sample = askRemoteServerForData(str, time.clone(), ms);
00875 }
00876 delete(req);
00877 if (sample != NULL)
00878 mediaStream->addSample(sample);
00879 return sample;
00880 }
00881 }
00882 return NULL;
00883 }
00884
00885 DataSample* MediaConnection::waitForLastSampleAfter(const JTime& time, long ms) {
00886
00887 if (mediaServer != NULL) {
00888 return mediaServer->waitForLastSampleAfter(myName, time, ms);
00889 }
00890 else {
00891 DataSample* sample = NULL;
00892 if (isInContinuousReceive()) {
00893 return mediaStream->waitForLastSampleAfter(time, ms);
00894 }
00895 else {
00896 PreFetchRequest* req = new PreFetchRequest(time, "LastAfter", ms);
00897 if (this->isRequestPending(req)) {
00898 sample = waitForRequestToComplete(req, ms);
00899 }
00900 else if (this->didRequestSucceed(req)) {
00901
00902 sample = mediaStream->waitForLastSampleAfter(time, ms);
00903 }
00904 else {
00905 JString str = JString::format("WAIT_FOR_LAST_SAMPLE %d", ms);
00906 sample = askRemoteServerForData(str, time.clone(), ms);
00907 }
00908 delete(req);
00909 if (sample != NULL)
00910 mediaStream->addSample(sample);
00911 return sample;
00912 }
00913 }
00914 return NULL;
00915 }
00916
00917 DataSample* MediaConnection::waitForDataSample(const JTime& time, long ms) {
00918
00919 if (mediaServer != NULL) {
00920 return mediaServer->waitForSampleAt(myName, time, ms);
00921 }
00922
00923 else {
00924 DataSample* sample = NULL;
00925 if (isInContinuousReceive()) {
00926 if ( (sample = mediaStream->getSampleAt(time)) != NULL)
00927 return sample;
00928 JTime start = JTime();
00929 long msgone;
00930 while ( (msgone = start.getAge()) < ms) {
00931 newDataAvailable.wait(ms - msgone);
00932 if ( (sample = mediaStream->getSampleAt(time)) != NULL)
00933 return sample;
00934 }
00935 return mediaStream->getSampleAt(time);
00936 }
00937 else {
00938 PreFetchRequest* req = new PreFetchRequest(time);
00939 if (this->isRequestPending(req)) {
00940 sample = waitForRequestToComplete(req, ms);
00941 }
00942 else if (this->didRequestSucceed(req)) {
00943 sample = mediaStream->getSampleAt(time);
00944 }
00945 else {
00946 if (prefetchDataSample(time))
00947 sample = waitForDataSample(time, ms);
00948 }
00949 delete(req);
00950 if (sample != NULL)
00951 mediaStream->addSample(sample);
00952 return sample;
00953 }
00954 }
00955 return NULL;
00956 }
00957
00958 DataSample* MediaConnection::waitForRequestToComplete(PreFetchRequest* req, long ms) {
00959
00960 DataSample* sample = NULL;
00961
00962 if (this->isRequestPending(req)) {
00963 JTime start;
00964 while (start.getAge() < ms) {
00965 newDataAvailable.wait(ms);
00966 if (didRequestSucceed(req))
00967 break;
00968
00969 }
00970 }
00971
00972 if (this->didRequestSucceed(req)) {
00973 sample = mediaStream->getSampleAt(req->time);
00974 }
00975 else if (this->didRequestFail(req)) {
00976 }
00977
00978 return sample;
00979 }
00980
00981
00982
00983
00984
00985
00986
00987
00988 bool MediaConnection::prefetchLastDataSampleAter(const JTime& t) {
00989
00990 if (mediaServer != NULL) {
00991 return true;
00992 }
00993 else {
00994 PreFetchRequest* req = new PreFetchRequest(t, "LastAfter");
00995 if (isRequestPending(req)) {
00996 delete(req);
00997 return true;
00998 }
00999 requests.add(req->clone());
01000 pendingRequests.add(req);
01001 return true;
01002 }
01003 }
01004
01005 bool MediaConnection::prefetchFirstDataSampleAter(const JTime& t) {
01006
01007 if (mediaServer != NULL) {
01008 return true;
01009 }
01010 else {
01011 PreFetchRequest* req = new PreFetchRequest(t, "FirstAfter");
01012 if (isRequestPending(req)) {
01013 delete(req);
01014 return true;
01015 }
01016 requests.add(req->clone());
01017 pendingRequests.add(req);
01018 return true;
01019 }
01020 }
01021
01022
01023
01024
01025
01026
01027 bool MediaConnection::prefetchDataSamples(const JTime& t1, const JTime& t2) {
01028
01029 if (mediaServer != NULL) {
01030 return true;
01031 }
01032 else {
01033 ObjectCollection* samples = this->getDataSampleList();
01034 DataSample* sample;
01035 for (int n=0; n<samples->getCount(); n++) {
01036 if ( (sample = (DataSample*) samples->get(n)) != NULL ) {
01037 prefetchDataSample(sample->getTimestamp());
01038 }
01039 }
01040 delete(samples);
01041 return true;
01042 }
01043 }
01044
01045
01046 bool MediaConnection::prefetchDataSample(const JTime& t) {
01047
01048 if (mediaServer != NULL) {
01049 return true;
01050 }
01051 else {
01052 PreFetchRequest* req = new PreFetchRequest(t);
01053 if (isRequestPending(req)) {
01054 delete(req);
01055 return true;
01056 }
01057 requests.add(req->clone());
01058 pendingRequests.add(req);
01059 return true;
01060 }
01061 }
01062
01063
01064
01065
01066
01067
01068 bool MediaConnection::hasRequestBeenRequested(PreFetchRequest* req) {
01069 if (pendingRequests.contains(req))
01070 return true;
01071 else if (successRequests.contains(req))
01072 return true;
01073 else if (failedRequests.contains(req))
01074 return true;
01075 else
01076 return false;
01077 }
01078
01079 bool MediaConnection::isRequestPending(PreFetchRequest* req) {
01080 return (pendingRequests.contains(req));
01081 }
01082
01083 bool MediaConnection::hasRequestBeenExecuted(PreFetchRequest* req) {
01084 if (successRequests.contains(req))
01085 return true;
01086 else if (failedRequests.contains(req))
01087 return true;
01088 else
01089 return false;
01090 }
01091
01092 bool MediaConnection::didRequestSucceed(PreFetchRequest* req) {
01093 return (successRequests.contains(req));
01094 }
01095
01096 bool MediaConnection::didRequestFail(PreFetchRequest* req) {
01097 return (successRequests.contains(req));
01098 }
01099
01100
01101
01102
01103
01104
01105
01106
01107
01108
01109
01110 bool MediaConnection::startContinuousBackgroundReceive() {
01111
01112 if (mediaServer != NULL)
01113 return true;
01114
01115 if (isInContinuousReceive())
01116 return true;
01117
01118 if (continuousCon != NULL) {
01119 continuousCon->reset();
01120 delete(continuousCon);
01121 }
01122
01123 continuousCon = new NetworkConnection(server, new NetMessageProtocol(), this);
01124 if (!continuousCon->initializeAsReceiver(this)) {
01125 delete(continuousCon);
01126 continuousCon = NULL;
01127 return false;
01128 }
01129 if (!continuousCon->isConnected()) {
01130 continuousCon->reset();
01131 delete(continuousCon);
01132 continuousCon = NULL;
01133 return false;
01134 }
01135 return true;
01136
01137
01138
01139
01140
01141
01142
01143
01144
01145
01146 }
01147
01148 bool MediaConnection::stopContinuousBackgroundReceive() {
01149
01150 if (mediaServer != NULL)
01151 return true;
01152
01153 if (!isInContinuousReceive())
01154 return true;
01155
01156 continuousCon->reset();
01157 delete(continuousCon);
01158 continuousCon = NULL;
01159 return true;
01160
01161
01162
01163
01164
01165
01166
01167
01168
01169
01170 }
01171
01172
01173
01174
01175
01176
01177
01178
01179
01180
01181
01182
01183
01184
01185
01186
01187
01188 ObjectCollection* MediaConnection::askRemoteServerForDataCollection(const JString& question, Object* obj, int timeout) {
01189 Message* msg = new Message(myName, server.name, question, obj);
01190 Message* rmsg = askRemoteServer(msg, timeout);
01191
01192 if (rmsg == NULL)
01193 return NULL;
01194
01195 ObjectCollection* answer = (ObjectCollection*) rmsg->takeObject();
01196
01197 if (answer != NULL) {
01198
01199 if ( (answer->getCount() < 0) || (answer->getCount() > 1000000) ) {
01200 delete(answer);
01201 answer = NULL;
01202 }
01203
01204
01205
01206
01207
01208 }
01209
01210 delete(rmsg);
01211 return answer;
01212 }
01213
01214 DataSample* MediaConnection::askRemoteServerForData(const JString& question, Object* obj, int timeout) {
01215 Message* msg = new Message(myName, server.name, question, obj);
01216 Message* rmsg = askRemoteServer(msg, timeout);
01217
01218 if (rmsg == NULL)
01219 return NULL;
01220
01221 DataSample* answer = (DataSample*) rmsg->takeObject();
01222
01223 if (answer != NULL) {
01224
01225
01226
01227
01228
01229
01230
01231
01232
01233
01234 }
01235
01236 delete(rmsg);
01237 return answer;
01238 }
01239
01240 bool MediaConnection::sendSampleToRemoteServer(const JString& question, DataSample* sample) {
01241 Message* msg = new Message(myName, server.name, question, sample);
01242 msg->noreply = "Yes";
01243 Message* rmsg = askRemoteServer(msg);
01244
01245 if (rmsg == NULL)
01246 return false;
01247
01248 JString type = rmsg->getType();
01249 delete(rmsg);
01250 return((type.equalsIgnoreCase("DATA_ACCEPTED")) || (type.equalsIgnoreCase("RECEIVE_ACCEPT")));
01251 }
01252
01253 bool MediaConnection::sendSamplesToRemoteServer(const JString& question, ObjectCollection* samples) {
01254 Message* msg = new Message(myName, server.name, question, samples);
01255 msg->noreply = "Yes";
01256 Message* rmsg = askRemoteServer(msg);
01257
01258 if (rmsg == NULL)
01259 return false;
01260
01261 JString type = rmsg->getType();
01262 delete(rmsg);
01263 return((type.equalsIgnoreCase("DATA_ACCEPTED")) || (type.equalsIgnoreCase("RECEIVE_ACCEPT")));
01264 }
01265
01266
01267
01268
01269 Object* MediaConnection::askRemoteServerForObject(const JString& question, int timeout) {
01270 Message* msg = new Message(myName, server.name, question);
01271 Message* rmsg = askRemoteServer(msg, timeout);
01272
01273 if (rmsg == NULL)
01274 return NULL;
01275
01276 Object* answer = rmsg->takeObject();
01277
01278 delete(rmsg);
01279 return answer;
01280 }
01281
01282 JString MediaConnection::askRemoteServerForString(const JString& question, int timeout) {
01283
01284 Message* msg = new Message(myName, server.name, question);
01285 Message* rmsg = askRemoteServer(msg, timeout);
01286
01287 if (rmsg == NULL)
01288 return "";
01289
01290 JString answer = rmsg->getContent();
01291
01292 delete(rmsg);
01293 return answer;
01294 }
01295
01296 JString MediaConnection::askRemoteServerForType(const JString& question, int timeout) {
01297
01298 Message* msg = new Message(myName, server.name, question);
01299 Message* rmsg = askRemoteServer(msg, timeout);
01300
01301 if (rmsg == NULL)
01302 return "";
01303
01304 JString answer = rmsg->type;
01305
01306 delete(rmsg);
01307 return answer;
01308 }
01309
01310 Message* MediaConnection::askRemoteServer(Message* msg, int timeout) {
01311
01312
01313
01314
01315
01316 if (msg == NULL) return NULL;
01317
01318 if (timeout < 0)
01319 timeout = MSGTIMEOUT;
01320 else
01321 timeout += MSGTIMEOUT;
01322
01323
01324 Message* rmsg = NULL;
01325
01326
01327 if (connectionAccess.EnterMutex(timeout)) {
01328 if (serverCon == NULL) {
01329 serverCon = new NetworkConnection(server, new NetMessageProtocol(), NULL);
01330 serverCon->allowMessageDropping(allowMessageDrop);
01331 }
01332 if (!serverCon->isConnected()) {
01333 serverCon->reset();
01334 serverCon->restart(server, new NetMessageProtocol(), NULL);
01335 serverCon->allowMessageDropping(allowMessageDrop);
01336 }
01337
01338 if (msg->noreply.length() > 0) {
01339
01340 if (!serverCon->sendObjectWhenReady(msg))
01341 delete(msg);
01342 rmsg = new Message("", "", "RECEIVE_ACCEPT");
01343 }
01344 else {
01345 rmsg = serverCon->sendReceiveObject(msg, timeout);
01346 if (rmsg == NULL) {
01347
01348 delete(msg);
01349 }
01350 else
01351 delete(msg);
01352 }
01353
01354 connectionAccess.LeaveMutex();
01355 }
01356 else {
01357
01358
01359 }
01360
01361
01362
01363
01364
01365
01366
01367 return rmsg;
01368 }
01369
01370
01371 ConnectionProfile MediaConnection::getServerConnectionProfile() {
01372
01373 if (mediaServer != NULL) {
01374 return mediaServer->getConnectionProfile(myName);
01375 }
01376
01377 else {
01378 ConnectionProfile* prof = (ConnectionProfile*) askRemoteServerForObject("GET_CONNECTION_PROFILE", MSGTIMEOUT);
01379 ConnectionProfile profile;
01380 if (prof == NULL)
01381 return profile;
01382 profile = *prof;
01383 delete(prof);
01384 return profile;
01385 }
01386 }
01387
01388
01389 ConnectionProfile MediaConnection::getConnectionProfile() {
01390 ConnectionProfile profile;
01391 if (isInSameExecutable(server)) {
01392 profile.isDirectlyConnected = true;
01393 }
01394 else if (serverCon != NULL) {
01395 return serverCon->localProfile;
01396 }
01397 return profile;
01398 }
01399
01400
01401 long MediaConnection::pingServer() {
01402
01403
01404
01405
01406 if (isInSameExecutable(server))
01407 return 0;
01408
01409 Message* msg = new Message(myName, server.name, "PING");
01410 JTime start;
01411 Message* rmsg = askRemoteServer(msg);
01412 if (rmsg == NULL) {
01413
01414 return -1;
01415 }
01416 JTime end;
01417 long res = -1;
01418 if (rmsg->getType().equalsIgnoreCase("PING_SUCCESS"))
01419 res = end.microDifference(start);
01420
01421
01422 delete(rmsg);
01423 return res;
01424 }
01425
01426
01427
01428
01429
01430
01431
01432
01433
01434
01435
01436
01437
01438
01439
01440
01441
01442
01443
01444
01445
01446
01447
01448
01449
01450
01451
01452
01453
01454
01455
01456
01457
01458
01459
01460
01461
01462
01463
01464
01465
01466
01467
01468
01469
01470
01471
01472
01473 bool MediaConnection::isInContinuousReceive() {
01474 if (mediaServer != NULL)
01475 return true;
01476 return ((continuousCon != NULL) && (continuousCon->isConnected()));
01477 }
01478
01479
01480
01481
01482
01483
01484
01485
01486
01487
01488
01489
01490
01491
01492
01493
01494
01495
01496
01497
01498
01499
01500
01501
01502
01503
01504
01505
01506
01507
01508
01509
01510
01511
01512
01513
01514
01515
01516
01517
01518
01519
01520
01521
01522
01523
01524
01525
01526
01527
01528
01529
01530
01531
01532
01533
01534
01535
01536
01537
01538
01539
01540
01541
01542
01543
01544
01545
01546
01547
01548
01549
01550
01551
01552
01553
01554
01555
01556
01557
01558
01559
01560
01561
01562
01563
01564
01565
01566
01567
01568
01569
01570
01571
01572
01573
01574
01575
01576
01577
01578
01579
01580
01581
01582 bool MediaConnection::createChannel(const JString& name, const JString& fieldname) {
01583 if (mediaServer != NULL) {
01584 return mediaServer->createChannel(name, fieldname);
01585 }
01586 else {
01587 Message* msg = new Message(myName, server.name, "CREATE_CHANNEL",
01588 JString::format("%s::%s", (char*) name, (char*) fieldname), "");
01589 Message* rmsg = askRemoteServer(msg);
01590 if ((rmsg == NULL) || (!rmsg->type.equalsIgnoreCase("CREATE_CHANNEL_SUCCESS"))) {
01591 delete(rmsg);
01592 return false;
01593 }
01594 delete(rmsg);
01595 return true;
01596 }
01597 }
01598
01599 bool MediaConnection::destroyChannel(const JString& name) {
01600 if (mediaServer != NULL) {
01601 return mediaServer->destroyChannel(name);
01602 }
01603 else {
01604 Message* msg = new Message(myName, server.name, "DESTROY_CHANNEL", name, "");
01605 Message* rmsg = askRemoteServer(msg);
01606 if ((rmsg == NULL) || (!rmsg->type.equalsIgnoreCase("CREATE_CHANNEL_SUCCESS"))) {
01607 delete(rmsg);
01608 return false;
01609 }
01610 delete(rmsg);
01611 return true;
01612 }
01613 }
01614
01615 ObjectCollection* MediaConnection::searchChannel(const JString& name, double val1, double val2) {
01616 if (mediaServer != NULL) {
01617 ObjectCollection* col = mediaServer->searchChannel(name, val1, val2);
01618 if (col != NULL) col->noDelete();
01619 return col;
01620 }
01621 else {
01622 ObjectCollection* col = new ObjectCollection();
01623 col->add(new JString(name));
01624 col->add(new JString(val1));
01625 col->add(new JString(val2));
01626 return askRemoteServerForDataCollection("GET_SAMPLES_BY_CHANNEL", col);
01627 }
01628 }
01629
01630 bool MediaConnection::subscribeChannel(const JString& channel, double val1, double val2) {
01631 JString subscription = JString::format("%s::%f::%f", (char*) channel, val1, val2);
01632 if (mediaServer != NULL) {
01633 subscriptions.add(subscription);
01634 if (!mediaServer->directSubscribers.contains(myName))
01635 mediaServer->directSubscribers.put(myName, this);
01636 return mediaServer->subscribeChannel(myName, channel, val1, val2);
01637 }
01638 else {
01639 Message* msg = new Message(myName, server.name, "SUBSCRIBE_CHANNEL", subscription, "");
01640 Message* rmsg = askRemoteServer(msg);
01641 if ((rmsg == NULL) || (!rmsg->type.equalsIgnoreCase("SUBSCRIBE_CHANNEL_SUCCESS"))) {
01642 delete(rmsg);
01643 return false;
01644 }
01645 delete(rmsg);
01646 subscriptions.add(subscription);
01647 if (eventQueue == NULL)
01648 eventQueue = new ObjectQueue();
01649 return startContinuousBackgroundReceive();
01650 }
01651 }
01652
01653 bool MediaConnection::unsubscribeChannel(const JString& channel, double val1, double val2) {
01654 JString subscription = JString::format("%s::%f::%f", (char*) channel, val1, val2);
01655 if (mediaServer != NULL) {
01656 subscriptions.remove(subscription);
01657 if (subscriptions.getCount() == 0)
01658 mediaServer->directSubscribers.remove(myName);
01659 return mediaServer->unsubscribeChannel(myName, channel, val1, val2);
01660 }
01661 else {
01662 Message* msg = new Message(myName, server.name, "UNSUBSCRIBE_CHANNEL", subscription, "");
01663 Message* rmsg = askRemoteServer(msg);
01664 if ((rmsg == NULL) || (!rmsg->type.equalsIgnoreCase("UNSUBSCRIBE_CHANNEL_SUCCESS"))) {
01665 delete(rmsg);
01666 return false;
01667 }
01668 delete(rmsg);
01669 subscriptions.remove(subscription);
01670 if (subscriptions.getCount() == 0)
01671 stopContinuousBackgroundReceive();
01672 if (eventQueue != NULL)
01673 eventQueue->removeAll();
01674 return true;
01675 }
01676 }
01677
01678 bool MediaConnection::unsubscribeAllChannels() {
01679 if (mediaServer != NULL) {
01680 subscriptions.removeAll();
01681 mediaServer->directSubscribers.remove(myName);
01682 return mediaServer->unsubscribeAllChannels(myName);
01683 }
01684 else {
01685 Message* msg = new Message(myName, server.name, "UNSUBSCRIBE_ALL_CHANNELS");
01686 Message* rmsg = askRemoteServer(msg);
01687 if ((rmsg == NULL) || (!rmsg->type.equalsIgnoreCase("UNSUBSCRIBE_ALL_CHANNELS_SUCCESS"))) {
01688 delete(rmsg);
01689 return false;
01690 }
01691 delete(rmsg);
01692 subscriptions.removeAll();
01693 stopContinuousBackgroundReceive();
01694 if (eventQueue != NULL)
01695 eventQueue->removeAll();
01696 return true;
01697 }
01698 }
01699
01700
01701
01702
01703
01704
01705
01706
01707
01708
01709
01710
01711
01712
01713
01714
01715
01716
01717
01718
01719 }