00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "MediaServer.h"
00022 #include "MediaConnection.h"
00023
00024 namespace cmlabs {
00025
00026 MediaServer::MediaServer(const JString& streamname, long hardMaxSize, long softMaxSize, long maxCount, long maxBackupCount) : JThread() {
00027
00028
00029
00030 directSubscribers.noDelete();
00031 lastContinuousSend.setInvalid();
00032 continuousCons.noDelete();
00033 shouldContinue = true;
00034 name = streamname;
00035 if (name.length() == 0)
00036 name = createUniqueID("stream");
00037
00038 mediaStream = new MediaStream(name, softMaxSize, hardMaxSize, maxCount, maxBackupCount);
00039
00040 network = new Network(this);
00041 network->name = JString::format("MediaServer_%s_Net", (char*) name);
00042 network->addProtocol(new NetMessageProtocol());
00043
00044 network->addProtocol(new NetHTTPProtocol());
00045 network->addProtocol(new NetTelnetProtocol(JString("Welcome to MediaServer"), "MediaServer > "));
00046 network->setPreferredNetProtocol(new NetMessageProtocol());
00047 }
00048
00049 MediaServer::~MediaServer() {
00050 if (shouldContinue) {
00051 shouldContinue = false;
00052 wait(100);
00053 }
00054
00055 delete(network);
00056 delete(mediaStream);
00057 }
00058
00059 Object* MediaServer::clone() const {
00060 return NULL;
00061 }
00062
00063 bool MediaServer::init(int port) {
00064 if (network->init(port))
00065 network->start();
00066 else
00067 return false;
00068 return true;
00069 }
00070
00071 JString MediaServer::getName() {
00072 return name;
00073 }
00074
00075 int MediaServer::getConnectionCount() {
00076
00077 return serverProfiles.getCount();
00078 }
00079
00080 ConnectionProfile MediaServer::getConnectionProfile(const JString& client) {
00081
00082 ConnectionProfile* profile = (ConnectionProfile*) serverProfiles.get(client);
00083
00084
00085 if (profile == NULL)
00086 return ConnectionProfile();
00087 return *profile;
00088 }
00089
00090 bool MediaServer::handleMultipleWriters(bool allow) {
00091 return mediaStream->handleMultipleWriters(allow);
00092 }
00093
00094 long MediaServer::getTotalDataSize() {
00095 return mediaStream->getTotalDataSize();
00096 }
00097
00098 long MediaServer::getTotalDataMemUse() {
00099 return mediaStream->getTotalDataMemUse();
00100 }
00101
00102 int MediaServer::getCount() {
00103 return mediaStream->getCount();
00104 }
00105
00106 double MediaServer::getTotalDataEverHandled() {
00107 return mediaStream->getTotalDataEverHandled();
00108 }
00109
00110 double MediaServer::getInputDataRate() {
00111 return mediaStream->getInputDataRate();
00112 }
00113
00114 double MediaServer::getOutputDataRate() {
00115 return mediaStream->getOutputDataRate();
00116 }
00117
00118 InfoItem* MediaServer::getInfo() {
00119 InfoItem* info = mediaStream->getInfo();
00120 info->setEntry("CurrentConnections", JString(getConnectionCount()));
00121 return info;
00122 }
00123
00124 InfoItem* MediaServer::getInfoAllStats() {
00125 ObjectDictionary* profiles = (ObjectDictionary*) serverProfiles.clone();
00126 InfoItem* info = getInfo();
00127 info->setObject(profiles);
00128 return info;
00129 }
00130
00131
00132 Message* MediaServer::netObjectReceive(Message *inMsg, NetworkConnection *con) {
00133
00134
00135
00136 Message* outMsg = NULL;
00137 JString from = inMsg->getFrom();
00138 JString type = inMsg->getType();
00139 JString content = inMsg->getContent();
00140
00141
00142
00143 JTime now;
00144 Collection names;
00145 bool includeInStats = false;
00146
00147 if ((type.equals("HTTP")) && (inMsg->getObject() != NULL)) {
00148 HTMLPage* page = serveHTTPRequest(inMsg->getOrigin(), (HTTPRequest*) inMsg->getObject());
00149 outMsg = new Message("MediaServer", from, type);
00150 outMsg->setObject(page);
00151 }
00152 else if ((type.equals("HTTPForward")) && (inMsg->getObject() != NULL)) {
00153 HTMLPage* page = serveHTTPRequest(inMsg->get("httpOrigin"), (HTTPRequest*) inMsg->getObject());
00154 outMsg = new Message("MediaServer", from, type);
00155 outMsg->setObject(page);
00156 }
00157 else if (type.equals("TelnetInput")) {
00158 outMsg = serveTelnetInput(from, inMsg->getOrigin(), content);
00159 }
00160 else if (type.equalsIgnoreCase("PING")) {
00161 outMsg = new Message("MediaServer", from, "PING_SUCCESS");
00162 }
00163 else {
00164 JTime t, t2;
00165 DataSample* sample;
00166 ObjectCollection* samples;
00167
00168 outMsg = new Message(name, inMsg->getFrom(), "RECEIVE_ACCEPT");
00169 if (type.equalsIgnoreCase("GET_COUNT")) {
00170 outMsg->setContent(JString(this->getCount()), "");
00171 }
00172 else if (type.equalsIgnoreCase("GET_SIZE")) {
00173
00174 outMsg->setContent(JString(this->getTotalDataSize()), "");
00175 }
00176 else if (type.equalsIgnoreCase("GET_NEWEST_TIMESTAMP")) {
00177 t = this->getNewestTimestamp();
00178 if (t.isValid())
00179 outMsg->setObject(t.clone());
00180 }
00181 else if (type.equalsIgnoreCase("GET_OLDEST_TIMESTAMP")) {
00182 t = this->getOldestTimestamp();
00183 if (t.isValid())
00184 outMsg->setObject(t.clone());
00185 }
00186 else if (type.equalsIgnoreCase("GET_NAME")) {
00187 outMsg->setContent(this->getName(), "");
00188 }
00189 else if (type.equalsIgnoreCase("GET_SAMPLE_LIST")) {
00190 includeInStats = true;
00191 ObjectCollection* col = this->getDataSampleList();
00192 if (col != NULL)
00193 outMsg->setObject(col);
00194 }
00195 else if (type.equalsIgnoreCase("GET_NEWEST_DATASAMPLE")) {
00196 includeInStats = true;
00197 sample = this->getNewestSample(from);
00198 ObjectCollection* timestamps = (ObjectCollection*)inMsg->getObject();
00199 if (sample != NULL) {
00200 t = sample->getTimestamp();
00201 if ((timestamps != NULL) && (timestamps->contains(&t))) {
00202
00203 }
00204 else {
00205 outMsg->setObject(sample->clone());
00206 }
00207 }
00208 }
00209 else if (type.equalsIgnoreCase("GET_OLDEST_DATASAMPLE")) {
00210 includeInStats = true;
00211 sample = this->getOldestSample(from);
00212 ObjectCollection* timestamps = (ObjectCollection*)inMsg->getObject();
00213 if (sample != NULL) {
00214 t = sample->getTimestamp();
00215 if ((timestamps != NULL) && (timestamps->contains(&t))) {
00216
00217 }
00218 else {
00219 outMsg->setObject(sample->clone());
00220 }
00221 }
00222 }
00223 else if (type.equalsIgnoreCase("GET_SAMPLE_BY_TIME")) {
00224 includeInStats = true;
00225
00226 JTime* tme = (JTime*)inMsg->getObject();
00227 if ((tme != NULL) && (tme->isValid())) {
00228 sample = this->getDataSample(from, *tme);
00229 if (sample != NULL) {
00230
00231 outMsg->setObject(sample->clone());
00232 if (DEBUGLEVEL(DEBUG))
00233 printf("MediaServer found sample at time: %s...\n%s\n", (char*) tme->printTimeMS(), (char*)this->mediaStream->print());
00234 }
00235 else
00236 if (DEBUGLEVEL(DEBUG))
00237 printf("MediaServer didn't find sample at time: %s...\n%s\n", (char*) tme->printTimeMS(), (char*)this->mediaStream->print());
00238 }
00239 else if (tme != NULL) {
00240 if (DEBUGLEVEL(DEBUG))
00241 printf("MediaServer didn't find sample at invalid time: %s...\n%s\n", (char*) tme->printTimeMS(), (char*)this->mediaStream->print());
00242 }
00243 else {
00244 if (DEBUGLEVEL(DEBUG))
00245 printf("MediaServer didn't find sample at invalid time: NULL...\n%s\n", (char*)this->mediaStream->print());
00246 }
00247 }
00248 else if (type.equalsIgnoreCase("GET_SAMPLE_BY_ID")) {
00249 includeInStats = true;
00250 JString* ss = (JString*)inMsg->getObject();
00251 if ((ss != NULL) && (ss->length() > 0)) {
00252 sample = this->getDataSample(from, *ss);
00253 if (sample != NULL) {
00254
00255 outMsg->setObject(sample->clone());
00256 }
00257 }
00258 }
00259 else if (type.equalsIgnoreCase("GET_SAMPLES_BY_TIME")) {
00260 includeInStats = true;
00261 ObjectCollection* times = (ObjectCollection*) inMsg->getObject();
00262 if ((times != NULL) && (times->getCount() == 2)) {
00263 t = *(JTime*) times->get(0);
00264 t2 = *(JTime*) times->get(1);
00265 samples = this->getDataSamples(from, t, t2);
00266 if (samples != NULL) {
00267 samples->noDelete();
00268 ObjectCollection* samplesCopy = (ObjectCollection*) samples->clone();
00269 delete(samples);
00270
00271
00272
00273
00274
00275
00276 outMsg->setObject(samplesCopy);
00277 }
00278 }
00279 }
00280 else if (type.equalsIgnoreCase("CREATE_CHANNEL")) {
00281 names = inMsg->content.split("::");
00282 if (createChannel(names.get(0), names.get(1)))
00283 outMsg->setType("CREATE_CHANNEL_SUCCESS");
00284 else
00285 outMsg->setType("CREATE_CHANNEL_FAILED");
00286 }
00287 else if (type.equalsIgnoreCase("DESTROY_CHANNEL")) {
00288 if (destroyChannel(inMsg->content))
00289 outMsg->setType("DESTROY_CHANNEL_SUCCESS");
00290 else
00291 outMsg->setType("DESTROY_CHANNEL_FAILED");
00292 }
00293 else if (type.equalsIgnoreCase("GET_SAMPLES_BY_CHANNEL")) {
00294 includeInStats = true;
00295 ObjectCollection* vals = (ObjectCollection*) inMsg->getObject();
00296 if ((vals != NULL) && (vals->getCount() == 3)) {
00297 JString channel = *(JString*)vals->get(0);
00298 double val1 = ((JString*)vals->get(1))->toDouble();
00299 double val2 = ((JString*)vals->get(2))->toDouble();
00300 samples = this->searchChannel(channel, val1, val2);
00301 if (samples != NULL) {
00302 samples->noDelete();
00303 ObjectCollection* samplesCopy = (ObjectCollection*) samples->clone();
00304 delete(samples);
00305
00306
00307
00308
00309
00310
00311 outMsg->setObject(samplesCopy);
00312 }
00313 }
00314 }
00315 else if (type.equalsIgnoreCase("SUBSCRIBE_CHANNEL")) {
00316 names = inMsg->content.split("::");
00317 if (subscribeChannel(from, names.get(0), names.get(1).toDouble(), names.get(2).toDouble()))
00318 outMsg->setType("SUBSCRIBE_CHANNEL_SUCCESS");
00319 else
00320 outMsg->setType("SUBSCRIBE_CHANNEL_FAILED");
00321 }
00322 else if (type.equalsIgnoreCase("UNSUBSCRIBE_CHANNEL")) {
00323 names = inMsg->content.split("::");
00324 if (unsubscribeChannel(from, names.get(0), names.get(1).toDouble(), names.get(2).toDouble()))
00325 outMsg->setType("UNSUBSCRIBE_CHANNEL_SUCCESS");
00326 else
00327 outMsg->setType("UNSUBSCRIBE_CHANNEL_FAILED");
00328 }
00329 else if (type.equalsIgnoreCase("UNSUBSCRIBE_ALL_CHANNELS")) {
00330 if (unsubscribeAllChannels(from))
00331 outMsg->setType("UNSUBSCRIBE_ALL_CHANNELS_SUCCESS");
00332 else
00333 outMsg->setType("UNSUBSCRIBE_ALL_CHANNELS_FAILED");
00334 }
00335 else if (type.equalsIgnoreCase("ADD_SAMPLE")) {
00336 includeInStats = true;
00337 outMsg->setType("DATA_NOT_ACCEPTED");
00338 sample = (DataSample*)inMsg->takeObject();
00339 if (sample != NULL) {
00340 if (this->addDataSample(from, sample))
00341 outMsg->setType("DATA_ACCEPTED");
00342
00343 fflush(stdout);
00344 }
00345 if (inMsg->noreply.length() > 0) {
00346 delete(outMsg);
00347 outMsg = NULL;
00348 }
00349 }
00350 else if (type.equalsIgnoreCase("ADD_SAMPLES")) {
00351 includeInStats = true;
00352 outMsg->setType("DATA_NOT_ACCEPTED");
00353 samples = (ObjectCollection*)inMsg->takeObject();
00354 if (samples != NULL) {
00355 if (this->addDataSamples(from, samples))
00356 outMsg->setType("DATA_ACCEPTED");
00357 delete(samples);
00358 }
00359 if (inMsg->noreply.length() > 0) {
00360 delete(outMsg);
00361 outMsg = NULL;
00362 }
00363 }
00364 else if (type.startsWithIgnoreCase("WAIT_FOR_FIRST_SAMPLE")) {
00365 includeInStats = true;
00366 int ms = type.substring(22).toInt();
00367 JTime* tme = (JTime*)inMsg->getObject();
00368 if ((tme != NULL) && (tme->isValid()) && (ms >= 0)) {
00369 sample = this->waitForFirstSampleAfter(from, *tme, ms);
00370 if (sample != NULL)
00371 outMsg->setObject(sample->clone());
00372
00373
00374
00375
00376
00377 }
00378 }
00379 else if (type.startsWithIgnoreCase("WAIT_FOR_FIRST_AFTERID")) {
00380 includeInStats = true;
00381 int ms = type.substring(23).toInt();
00382 JString* ss2 = (JString*)inMsg->getObject();
00383 if (ss2 == NULL)
00384 ss2 = &inMsg->content;
00385 if ((ss2 != NULL) && (ss2->length() > 0) && (ms >= 0)) {
00386 sample = this->waitForFirstSampleAfter(from, *ss2, ms);
00387 if (sample != NULL) {
00388
00389 outMsg->setObject(sample->clone());
00390 }
00391
00392
00393
00394
00395
00396 }
00397 }
00398 else if (type.startsWithIgnoreCase("WAIT_FOR_LAST_AFTERID")) {
00399 includeInStats = true;
00400 now = JTime();
00401 int ms = type.substring(22).toInt();
00402 JString* ss3 = (JString*)inMsg->getObject();
00403 if (ss3 == NULL)
00404 ss3 = &inMsg->content;
00405 if ((ss3 != NULL) && (ss3->length() > 0) && (ms >= 0)) {
00406 sample = this->waitForLastSampleAfter(from, *ss3, ms);
00407 if (sample != NULL) {
00408
00409 outMsg->setObject(sample->clone());
00410 }
00411 }
00412 }
00413 else if (type.startsWithIgnoreCase("WAIT_FOR_LAST_SAMPLE")) {
00414 includeInStats = true;
00415 now = JTime();
00416 int ms = type.substring(21).toInt();
00417 JTime* tme = (JTime*)inMsg->getObject();
00418 if ((tme != NULL) && (tme->isValid()) && (ms >= 0)) {
00419 sample = this->waitForLastSampleAfter(from, *tme, ms);
00420 if (sample != NULL) {
00421
00422 outMsg->setObject(sample->clone());
00423 }
00424 }
00425 }
00426 else if (type.equalsIgnoreCase("START_CONTINUOUS_RECEIVE")) {
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436 outMsg->setType("CONTINUOUS_RECEIVE_FAILED");
00437 }
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468 else if (type.equalsIgnoreCase("GET_CONNECTION_PROFILE")) {
00469 ConnectionProfile profile = getConnectionProfile(from);
00470 outMsg->setObject(profile.clone());
00471 outMsg->setType("GET_CONNECTION_PROFILE_SUCCESS");
00472 }
00473 else if (type.equalsIgnoreCase("GET_INFO")) {
00474 InfoItem* info = getInfo();
00475 outMsg->setObject(info);
00476 outMsg->setType("GET_INFO_SUCCESS");
00477 }
00478 else if (type.equalsIgnoreCase("GET_INFO_ALL_STATS")) {
00479 InfoItem* info = getInfoAllStats();
00480 outMsg->setObject(info);
00481 outMsg->setType("GET_ALL_STATS_SUCCESS");
00482 }
00483 else if (type.equalsIgnoreCase("HANDLE_MULTIPLE_WRITERS")) {
00484 if (handleMultipleWriters(true)) {
00485 outMsg->setType("HANDLE_MULTIPLE_WRITERS_SUCCESS");
00486 }
00487 else {
00488 outMsg->setType("HANDLE_MULTIPLE_WRITERS_FAILED");
00489 }
00490 }
00491 else if (type.equalsIgnoreCase("HANDLE_SINGLE_WRITER")) {
00492 if (handleMultipleWriters(false))
00493 outMsg->setType("HANDLE_SINGLE_WRITER_SUCCESS");
00494 else
00495 outMsg->setType("HANDLE_SINGLE_WRITER_FAILED");
00496 }
00497
00498 }
00499
00500
00501
00502
00503
00504 if (includeInStats)
00505 serverNetworkProfiles.put(from, con->localProfile.clone());
00506 delete(inMsg);
00507 return outMsg;
00508 }
00509
00510
00511
00512 bool MediaServer::addDataStat(const JString& name, long upbytes, long downbytes, const JTime& time) {
00513 ConnectionProfile* profile = (ConnectionProfile*) serverProfiles.get(name);
00514 if (profile == NULL) {
00515 profile = new ConnectionProfile(name);
00516 serverProfiles.put(name, profile);
00517 }
00518 return profile->addDataStat(upbytes, downbytes, time);
00519 }
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530 TCPLocation MediaServer::getLocation() {
00531 TCPLocation loc = TCPLocation(network->getLocalIPAddress(), network->getPort(), name);
00532 loc.procid = getProcID();
00533 loc.pCommunicator = (void*) this;
00534 return loc;
00535 }
00536
00537
00538 void MediaServer::run() {
00539
00540 int n;
00541 DataSample* sample = NULL;
00542 JTime lastAct;
00543 NetworkConnection* con;
00544 Message* msg;
00545 JTime now;
00546 int count;
00547 ObjectCollection* subs;
00548 MediaConnection* mcon;
00549 JString remotename;
00550
00551 while (shouldContinue) {
00552
00553 if ((network->remoteReceiverCons->getCount() > 0) || (directSubscribers.getCount() > 0)) {
00554 if (!lastContinuousSend.isValid())
00555 lastContinuousSend = JTime();
00556 sample = waitForFirstSampleAfter(lastContinuousSend, 50);
00557 if (sample != NULL) {
00558
00559 lastContinuousSend = sample->getTimestamp();
00560 count = 0;
00561 for (n=0; n<directSubscribers.getCount(); n++) {
00562 remotename = directSubscribers.getKey(n);
00563 subs = (ObjectCollection*) subscriptions.get(remotename);
00564 mcon = (MediaConnection*) directSubscribers.get(n);
00565 if ((subs != NULL) && (mcon != NULL)) {
00566 if (subscriptionMatch(subs, sample)) {
00567 mcon->addSubscriptionEvent(sample);
00568 addDataStat(remotename, 0, sample->size);
00569 count++;
00570 }
00571 }
00572 }
00573 for (n=0; n<network->remoteReceiverCons->getCount(); n++) {
00574 con = (NetworkConnection*) network->remoteReceiverCons->get(n);
00575 if (con != NULL) {
00576
00577 if (con->isConnected()) {
00578
00579 subs = (ObjectCollection*) subscriptions.get(con->remoteName);
00580 if (subs != NULL) {
00581 if (!subscriptionMatch(subs, sample)) {
00582
00583 continue;
00584 }
00585 sample->params.put("SubscriptionEventMatch", "Matched");
00586
00587 }
00588
00589 msg = new Message(this->getName(), con->remoteName, "BinaryOnly", sample->clone());
00590 msg->noreply = "Yes";
00591 addDataStat(con->remoteName, 0, sample->size);
00592 if (con->sendObjectWhenReady(msg)) {
00593 count++;
00594 }
00595 if (DEBUGLEVEL(DETAILEDSTATUS))
00596 printf("Continuous %s ---> %s\n", (char*) con->remoteName, (char*) sample->print());
00597 }
00598 else {
00599 if (DEBUGLEVEL(STATUS))
00600 printf("Continuous STOP %s ---> %s\n", (char*) con->remoteName, (char*) sample->print());
00601 con->reset();
00602 network->remoteReceiverCons->removeNoDelete(n);
00603 n--;
00604 if (network->remoteReceiverCons->getCount() == 0)
00605 lastContinuousSend.setInvalid();
00606 }
00607 }
00608 }
00609 if (DEBUGLEVEL(DEBUG)) {
00610 now = JTime();
00611 printf("[%s] MediaServer continuous sent sample to %d cons at %s...\n", (char*) now.printTimeMS(), count, (char*) lastContinuousSend.printTime());
00612 }
00613 }
00614 else {
00615
00616 }
00617 }
00618 else {
00619 lastContinuousSend.setInvalid();
00620 wait(20);
00621 }
00622 }
00623 }
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636 JTime MediaServer::getOldestTimestamp() {
00637 return mediaStream->getOldestSampleTime();
00638 }
00639
00640 JTime MediaServer::getNewestTimestamp() {
00641 return mediaStream->getNewestSampleTime();
00642 }
00643
00644 ObjectCollection* MediaServer::getDataSampleList() {
00645 return mediaStream->getDataSampleList();
00646 }
00647
00648
00649
00650
00651
00652
00653 DataSample* MediaServer::getOldestSample(const JString& clientname) {
00654 DataSample* sample = mediaStream->getOldestSample();
00655 if (sample != NULL)
00656 addDataStat(clientname, 0, sample->getDataSize());
00657 return sample;
00658 }
00659
00660 DataSample* MediaServer::getNewestSample(const JString& clientname) {
00661 DataSample* sample = mediaStream->getNewestSample();
00662 if (sample != NULL)
00663 addDataStat(clientname, 0, sample->getDataSize());
00664 return sample;
00665 }
00666
00667
00668 DataSample* MediaServer::getDataSample(const JString& clientname, const JTime& time) {
00669 DataSample* sample = getDataSample(time);
00670 if (sample != NULL)
00671 addDataStat(clientname, 0, sample->getDataSize());
00672 return sample;
00673 }
00674 DataSample* MediaServer::getDataSample(const JTime& time) {
00675 return mediaStream->getSampleAt(time);
00676 }
00677
00678
00679 ObjectCollection* MediaServer::getDataSamples(const JString& clientname, const JTime& t1, const JTime& t2) {
00680 ObjectCollection* col = getDataSamples(t1, t2);
00681 long bytes = 0;
00682 DataSample* sample;
00683 if (col != NULL) {
00684 for (int n=0; n<col->getCount(); n++)
00685 if ( (sample = (DataSample*) col->get(n)) != NULL)
00686 bytes += sample->getDataSize();
00687 }
00688 addDataStat(clientname, 0, bytes);
00689 return col;
00690 }
00691 ObjectCollection* MediaServer::getDataSamples(const JTime& t1, const JTime& t2) {
00692 return mediaStream->getSamplesBetween(t1, t2);
00693 }
00694
00695 DataSample* MediaServer::waitForSampleAt(const JString& clientname, const JTime& time, long ms) {
00696 JTime start;
00697 DataSample* sample = waitForFirstSampleAfter(time-1, ms);
00698 if ((sample == NULL) || (sample->timestamp != time)) {
00699 JTime now;
00700 sample = NULL;
00701 long dif = now - start;
00702 if (dif < ms) {
00703 wait(ms-dif);
00704 sample = getDataSample(time);
00705 }
00706 }
00707 if (sample != NULL)
00708 addDataStat(clientname, 0, sample->getDataSize());
00709 return sample;
00710 }
00711 DataSample* MediaServer::waitForSampleAt(const JTime& time, long ms) {
00712 DataSample* sample = waitForFirstSampleAfter(time-1, ms);
00713 if ((sample == NULL) || (sample->timestamp != time))
00714 return NULL;
00715 return sample;
00716 }
00717
00718 DataSample* MediaServer::getSample(const JString& clientname, const JString& id) {
00719 DataSample* sample = mediaStream->getSample(id);
00720 if (sample != NULL)
00721 addDataStat(clientname, 0, sample->getDataSize());
00722 return sample;
00723 }
00724 DataSample* MediaServer::getFirstSampleAfter(const JString& clientname, const JString& id) {
00725 DataSample* sample = mediaStream->getFirstSampleAfter(id);
00726 if (sample != NULL)
00727 addDataStat(clientname, 0, sample->getDataSize());
00728 return sample;
00729 }
00730 DataSample* MediaServer::waitForFirstSampleAfter(const JString& clientname, const JString& id, long ms) {
00731 DataSample* sample = mediaStream->waitForFirstSampleAfter(id, ms);
00732 if (sample != NULL)
00733 addDataStat(clientname, 0, sample->getDataSize());
00734 return sample;
00735 }
00736 DataSample* MediaServer::waitForLastSampleAfter(const JString& clientname, const JString& id, long ms) {
00737 DataSample* sample = mediaStream->waitForLastSampleAfter(id, ms);
00738 if (sample != NULL)
00739 addDataStat(clientname, 0, sample->getDataSize());
00740 return sample;
00741 }
00742
00743 DataSample* MediaServer::getSample(const JString& id) {
00744 return mediaStream->getSample(id);
00745 }
00746 DataSample* MediaServer::getFirstSampleAfter(const JString& id) {
00747 return mediaStream->getFirstSampleAfter(id);
00748 }
00749 DataSample* MediaServer::waitForFirstSampleAfter(const JString& id, long ms) {
00750 return mediaStream->waitForFirstSampleAfter(id, ms);
00751 }
00752 DataSample* MediaServer::waitForLastSampleAfter(const JString& id, long ms) {
00753 return mediaStream->waitForLastSampleAfter(id, ms);
00754 }
00755
00756
00757 DataSample* MediaServer::waitForFirstSampleAfter(const JString& clientname, const JTime& time, long ms) {
00758 DataSample* sample = waitForFirstSampleAfter(time, ms);
00759 if (sample != NULL)
00760 addDataStat(clientname, 0, sample->getDataSize());
00761 return sample;
00762 }
00763 DataSample* MediaServer::waitForFirstSampleAfter(const JTime& time, long ms) {
00764 return mediaStream->waitForFirstSampleAfter(time, ms);
00765 }
00766
00767 DataSample* MediaServer::waitForLastSampleAfter(const JString& clientname, const JTime& time, long ms) {
00768 DataSample* sample = waitForLastSampleAfter(time, ms);
00769 if (sample != NULL)
00770 addDataStat(clientname, 0, sample->getDataSize());
00771 return sample;
00772 }
00773 DataSample* MediaServer::waitForLastSampleAfter(const JTime& time, long ms) {
00774 return mediaStream->waitForLastSampleAfter(time, ms);
00775 }
00776
00777
00778
00779
00780
00781
00782
00783 bool MediaServer::addDataSample(const JString& clientname, DataSample* sample) {
00784 if (sample != NULL)
00785 addDataStat(clientname, sample->getDataSize(), 0);
00786 return addDataSample(sample);
00787 }
00788 bool MediaServer::addDataSample(DataSample* sample) {
00789 return mediaStream->addSample(sample);
00790 }
00791
00792
00793 bool MediaServer::addDataSamples(const JString& clientname, ObjectCollection* samples) {
00794
00795 long bytes = 0;
00796 DataSample* sample;
00797 if (samples != NULL) {
00798 for (int n=0; n<samples->getCount(); n++) {
00799 if ( (sample = (DataSample*) samples->get(n)) != NULL) {
00800 bytes += sample->getDataSize();
00801 addDataStat(clientname, sample->getDataSize(), 0);
00802 }
00803 }
00804 }
00805 return addDataSamples(samples);
00806 }
00807 bool MediaServer::addDataSamples(ObjectCollection* samples) {
00808 return mediaStream->addSamples(samples);
00809 }
00810
00811
00812
00813
00814
00815
00816
00817
00818
00819
00820
00821
00822
00823
00824
00825 HTMLPage* MediaServer::serveHTTPRequest(const JString& origin, HTTPRequest* request) {
00826
00827 HTMLPage* page = new HTMLPage();
00828
00829 page->title = "MediaServer Overview";
00830
00831 page->body = "<H1>Status</H1>\n";
00832
00833 return page;
00834 }
00835
00836 Message* MediaServer::serveTelnetInput(const JString& id, const JString& origin, const JString& text) {
00837
00838 JString out;
00839 Message* msg = new Message("MediaServer", "", "");
00840 JTime now;
00841 msg->set("NewPromptText", JString::format("MediaServer [%s] ", (char*) now.printTime()));
00842
00843 if (text.length() == 0)
00844 return msg;
00845
00846 Collection oc = text.split(" ");
00847
00848 JString cmd = oc.get(0);
00849 JString arg = oc.get(1);
00850
00851 if (cmd.equalsIgnoreCase("list")) {
00852 out = "";
00853 }
00854 else if (cmd.equalsIgnoreCase("status"))
00855 out = "";
00856 else
00857 out = JString("Unknown command: ") + cmd;
00858
00859 msg->setContent(out, "");
00860 return msg;
00861 }
00862
00863 bool MediaServer::unitTest() {
00864
00865 addUnitTestLog(JString::format("Starting MediaServer test..."));
00866 JTime now;
00867
00868 printf("\n Performance Tests:\n");
00869
00870 int count, k, c;
00871 MediaConnection* con1, *con2;
00872 JTime t;
00873 DataSample* sample, *sample2;
00874 JTime tt;
00875 int n;
00876 JTime start;
00877
00878 int p = 23545;
00879 while (!this->init(p)) {
00880 p++;
00881 }
00882
00883 TCPLocation loc = this->getLocation();
00884
00885 addUnitTestLog(JString::format("Creating connection 1..."));
00886 con1 = new MediaConnection("Test1", loc, 10*1024*1024, 12*1024*1024);
00887 if (!con1->initWithTraining())
00888 return false;
00889 addUnitTestLog(JString::format("Creating connection 2..."));
00890 con2 = new MediaConnection("Test2", loc, 10*1024*1024, 12*1024*1024);
00891 if (!con2->initWithTraining())
00892 return false;
00893 con1->start();
00894 con2->start();
00895
00896 addUnitTestLog(JString::format("Testing location..."));
00897 if (!con1->isInSameExecutable(loc))
00898 return false;
00899
00900
00901
00902 t.reset();
00903 addUnitTestLog(JString::format("Testing connection 1..."));
00904 t = con1->getOldestTimestamp();
00905 if (t.isValid()) return false;
00906 t = con1->getNewestTimestamp();
00907 if (t.isValid()) return false;
00908 t = con1->getOldestBufferTimestamp();
00909 if (t.isValid()) return false;
00910 t = con1->getNewestBufferTimestamp();
00911 if (t.isValid()) return false;
00912
00913
00914
00915 addUnitTestLog(JString::format("Testing connection 2..."));
00916 t = con2->getOldestTimestamp();
00917 if (t.isValid()) return false;
00918 t = con2->getNewestTimestamp();
00919 if (t.isValid()) return false;
00920 t = con2->getOldestBufferTimestamp();
00921 if (t.isValid()) return false;
00922 t = con2->getNewestBufferTimestamp();
00923 if (t.isValid()) return false;
00924
00925 t = JTime();
00926 sample = new DataSample();
00927 char* data = new char[20];
00928 strcpy(data, "Hello World");
00929 sample->setTimestamp(t);
00930 sample->giveData(data, 20, 25);
00931 con1->addDataSample(sample);
00932
00933
00934
00935 addUnitTestLog(JString::format("Testing connection 1 without hello world..."));
00936 sample2 = con2->getDataSample(t);
00937 if (sample != sample2)
00938 return false;
00939
00940
00941
00942 addUnitTestLog(JString::format("Testing connections..."));
00943 t = JTime();
00944 tt = t;
00945 for (n=0; n<5; n++) {
00946
00947
00948
00949 tt = tt + 1;
00950 sample2 = (DataSample*)sample->clone();
00951
00952 if (!con1->addDataSample(sample2))
00953 addUnitTestLog(JString::format("Couln't add: %s", (char*)tt.print()));
00954 else
00955 addUnitTestLog(JString::format("Added: %s", (char*)tt.print()));
00956 }
00957
00958 now = JTime();
00959
00960 t = tt - 10000000;
00961 tt = tt + 1;
00962 ObjectCollection* col = con2->getDataSamples(t, tt);
00963
00964 if (col == NULL) {
00965 addUnitTestLog(JString::format("Couldn't get samples..."));
00966 return false;
00967 }
00968
00969 if (col->getCount() < 6) {
00970 addUnitTestLog(JString::format("Samples count wrong: %d...", col->getCount()));
00971 return false;
00972 }
00973
00974
00975 delete(col);
00976
00977 col = con2->getDataSampleList();
00978 if (col == NULL) {
00979 addUnitTestLog(JString::format("Couldn't get samplelist..."));
00980 return false;
00981 }
00982 if (col->getCount() < 6) {
00983 addUnitTestLog(JString::format("Samplelist count wrong: %d...", col->getCount()));
00984 return false;
00985 }
00986 delete(col);
00987
00988
00989 sample2 = NULL;
00990 while ( (sample = con1->waitForFirstSampleAfter(t, 1000)) != NULL ) {
00991 if (sample == sample2) {
00992 addUnitTestLog(JString::format("Got same sample twice from stream..."));
00993 return false;
00994 }
00995 t = sample->timestamp;
00996 sample2 = sample;
00997 }
00998
00999
01000
01001 delete(con1);
01002 delete(con2);
01003
01004
01005
01006
01007
01008
01009
01010
01011
01012
01013
01014
01015
01016
01017
01018
01019
01020 addUnitTestLog("Starting networking test...");
01021
01022
01023
01024
01025 loc.procid = 0;
01026 con1 = new MediaConnection("Test3", loc, 10*1024*1024, 12*1024*1024);
01027 if (!con1->initWithTraining()) {
01028 addUnitTestLog("Could not init network con1");
01029 return false;
01030 }
01031 con1->start();
01032 con2 = new MediaConnection("Test4", loc, 10*1024*1024, 12*1024*1024);
01033 if (!con2->initWithTraining()) {
01034 addUnitTestLog("Could not init network con2");
01035 return false;
01036 }
01037 con2->start();
01038 if (con1->isInSameExecutable(loc)) {
01039 addUnitTestLog("Network con1 in same executable, shouldn't be...");
01040 return false;
01041 }
01042 if (con2->isInSameExecutable(loc)) {
01043 addUnitTestLog("Network con2 in same executable, shouldn't be...");
01044 return false;
01045 }
01046
01047
01048 JString str = con1->getServerName();
01049 if (str.length() == 0)
01050 return false;
01051
01052
01053 if (con1->getCount() != 6)
01054 return false;
01055
01056
01057 col = con1->getDataSampleList();
01058 if (col == NULL)
01059 return false;
01060
01061 if (col->getCount() != 6)
01062 return false;
01063
01064
01065 addUnitTestLog(JString::format("Got Sample List OK [%d]:\n%s\n", col->getCount(), (char*) col->printListLine("\n")));
01066
01067 delete(col);
01068
01069
01070
01071
01072 t = tt - 10000000;
01073 col = con1->getDataSamples(t, tt);
01074 if (col == NULL)
01075 return false;
01076
01077 if (col->getCount() < 6)
01078 return false;
01079
01080
01081 sample = (DataSample*) col->get(0);
01082 if (sample == NULL)
01083 return false;
01084
01085 if (sample->getDataLink() == NULL)
01086 return false;
01087
01088 JString s = sample->getDataLink();
01089 if (!s.equals("Hello World"))
01090 return false;
01091
01092 JTime firstTime = sample->getTimestamp();
01093 if (!firstTime.isValid())
01094 return false;
01095
01096 addUnitTestLog("Got Sample Collection OK...");
01097
01098 sample2 = con1->getDataSample(firstTime);
01099 if (sample2 == NULL) {
01100 addUnitTestLog(JString::format("Sample2 NULL [%s]", (char*) firstTime.print()));
01101 return false;
01102 }
01103
01104 if (strcmp(sample->getDataLink(), sample2->getDataLink()) != 0 ) {
01105 addUnitTestLog(JString::format("Sample1 != Sample2 '%s' != '%s' ", (char*) sample->getDataLink(), (char*)sample2->getDataLink()));
01106 return false;
01107 }
01108
01109 addUnitTestLog("Got Individual Sample OK...");
01110
01111 sample = (DataSample*) col->get(1);
01112 if (sample == NULL)
01113 return false;
01114
01115 sample2 = con1->waitForFirstSampleAfter(firstTime, 1000);
01116 if (sample2 == NULL)
01117 return false;
01118
01119 addUnitTestLog("Waited for First After Sample OK...");
01120
01121 if (strcmp(sample->getDataLink(), sample2->getDataLink()) != 0 )
01122 return false;
01123
01124 addUnitTestLog("Waited for First After Sample Tested OK...");
01125
01126 sample = (DataSample*) col->getLast();
01127 sample2 = con1->waitForLastSampleAfter(firstTime, 1000);
01128 if (sample2 == NULL)
01129 return false;
01130
01131 if (strcmp(sample->getDataLink(), sample2->getDataLink()) != 0 )
01132 return false;
01133
01134 addUnitTestLog("Waited for Last After Sample OK...");
01135
01136 sample = (DataSample*) sample->clone();
01137 delete(col);
01138
01139
01140 DataSample* sample3;
01141 sample2 = NULL;
01142 while ( (sample3 = con1->waitForFirstSampleAfter(t, 1000)) != NULL ) {
01143 if (sample3 == sample2) {
01144 addUnitTestLog(JString::format("Got same sample twice from stream..."));
01145 return false;
01146 }
01147 t = sample3->timestamp;
01148 sample2 = sample3;
01149 }
01150
01151 delete(con1);
01152 delete(con2);
01153
01154
01155
01156
01157
01158
01159
01160
01161
01162
01163 loc = this->getLocation();
01164
01165 addUnitTestLog(JString::format("Creating connection 1..."));
01166 con1 = new MediaConnection("Test1", loc, 10*1024*1024, 12*1024*1024);
01167 if (!con1->initWithTraining())
01168 return false;
01169 addUnitTestLog(JString::format("Creating connection 2..."));
01170 con2 = new MediaConnection("Test2", loc, 10*1024*1024, 12*1024*1024);
01171 if (!con2->initWithTraining())
01172 return false;
01173 con1->start();
01174 con2->start();
01175
01176 count = 100;
01177
01178 JTime firstSampleTime = tt + 1;
01179 start = JTime();
01180 for (n=0; n<count; n++) {
01181 tt = tt + 1;
01182 sample2 = (DataSample*) sample->clone();
01183 sample2->setTimestamp(tt);
01184 if (!con1->addDataSample(sample2)) {
01185 break;
01186 }
01187 }
01188 if (n < count) {
01189 addUnitTestLog(JString::format("Couldn't add 10k samples..."));
01190 return false;
01191 }
01192
01193 printf(" %s %s (%d)\n", (char*) JString("Direct add samples:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*n)), 30, 1), con1->getCount()-6);
01194
01195 sample2 = con2->getDataSample(firstSampleTime);
01196 if (sample2 == NULL) {
01197 addUnitTestLog(JString::format("No samples after 10k"));
01198 return false;
01199 }
01200
01201 n=0;
01202 start = JTime();
01203 while (sample2 != NULL) {
01204 n++;
01205 sample2 = con2->getDataSample(sample2->timestamp+1);
01206 }
01207
01208 printf(" %s %s (%d)\n", (char*) JString("Direct fetch samples:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*n)), 30, 1), n);
01209
01210 delete(con1);
01211 delete(con2);
01212
01213
01214
01215
01216
01217
01218
01219
01220
01221
01222
01223
01224
01225
01226 loc = this->getLocation();
01227 loc.procid = 0;
01228
01229 addUnitTestLog(JString::format("Creating connection 1..."));
01230 con1 = new MediaConnection("Test1", loc, 10*1024*1024, 12*1024*1024);
01231 if (!con1->initWithTraining())
01232 return false;
01233 addUnitTestLog(JString::format("Creating connection 2..."));
01234 con2 = new MediaConnection("Test2", loc, 10*1024*1024, 12*1024*1024);
01235 if (!con2->initWithTraining())
01236 return false;
01237 con1->start();
01238 con2->start();
01239
01240 now = JTime();
01241 addUnitTestLog(JString::format("Starting client continuous at %s...\n", (char*) now.printTime()));
01242
01243 if (!con2->startContinuousBackgroundReceive())
01244 return false;
01245 wait(100);
01246
01247 if (con2->getBufferCount() > 0)
01248 return false;
01249
01250 count = 100;
01251
01252 start = JTime();
01253
01254 t = JTime();
01255 tt = t;
01256 for (k=0; k<count; k++) {
01257 tt = tt + 1;
01258 sample2 = (DataSample*)sample->clone();
01259 sample2->setTimestamp(tt);
01260
01261 if (!con1->addDataSample(sample2)) {
01262 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01263 return false;
01264 }
01265 else {
01266
01267
01268
01269 }
01270
01271 }
01272 printf(" %s %s (%d)\n", (char*) JString("Net add samples:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*count)), 30, 1), count);
01273
01274
01275
01276 start = JTime();
01277 c = con2->getBufferCount();
01278 while (c < count) {
01279 now = JTime();
01280 if (now - start > count*1000) {
01281 addUnitTestLog(JString::format("Continuous got %d samples, not %d...\n", c, count));
01282 return false;
01283 }
01284 wait(5);
01285 c = con2->getBufferCount();
01286 }
01287 now = JTime();
01288
01289
01290 printf(" %s %s (%d)\n", (char*) JString("Net fetch samples:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*c)), 30, 1), c);
01291
01292
01293 if (!con2->stopContinuousBackgroundReceive()) {
01294 addUnitTestLog("Could not stop continuous streaming...");
01295 return false;
01296 }
01297
01298 wait(100);
01299
01300 for (k=0; k<5; k++) {
01301 tt = tt + 1;
01302 sample2 = (DataSample*)sample->clone();
01303 sample2->setTimestamp(tt);
01304 if (!con1->addDataSample(sample2))
01305 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01306 else
01307 addUnitTestLog(JString::format("Added 2: %s", (char*)tt.print()));
01308 }
01309
01310 wait(100);
01311
01312 if (con2->getBufferCount() != count) {
01313 addUnitTestLog(JString::format("Continuous got %d samples, not %d...\n", c, count));
01314 return false;
01315 }
01316
01317 delete(con1);
01318 delete(con2);
01319
01320
01321
01322
01323
01324
01325
01326
01327
01328
01329
01330
01331 addUnitTestLog("Starting nested sample test...");
01332
01333 loc = this->getLocation();
01334 loc.procid = 0;
01335
01336 addUnitTestLog(JString::format("Creating connection 1..."));
01337 con1 = new MediaConnection("Test1", loc, 10*1024*1024, 12*1024*1024);
01338 if (!con1->initWithTraining())
01339 return false;
01340 addUnitTestLog(JString::format("Creating connection 2..."));
01341 con2 = new MediaConnection("Test2", loc, 10*1024*1024, 12*1024*1024);
01342 if (!con2->initWithTraining())
01343 return false;
01344 con1->start();
01345 con2->start();
01346
01347 count = 100;
01348 start = JTime();
01349 tt = start;
01350 for (k=0; k<count; k++) {
01351 tt = tt + 1;
01352 sample2 = new DataSample();
01353 sample2->setTimestamp(tt);
01354 sample2->setObject(sample->clone());
01355 if (!con1->addDataSample(sample2)) {
01356 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01357 return false;
01358 }
01359 }
01360
01361 Object::wait(100);
01362 tt = start;
01363 while ( (sample2 = con2->waitForFirstSampleAfter(tt, 1000)) != NULL ) {
01364 sample3 = (DataSample*) sample2->object;
01365 if ( (sample3 == NULL) || (sample->size != sample3->size)) {
01366 addUnitTestLog(JString::format("Did not get correct nested DataSample..."));
01367 return false;
01368 }
01369 tt = sample2->timestamp;
01370 }
01371
01372 delete(con1);
01373 delete(con2);
01374
01375
01376
01377
01378
01379
01380
01381
01382
01383
01384
01385
01386 addUnitTestLog("Starting channel test...");
01387
01388 int arr[] = {
01389 3,4,8,7,1,2,3,9,8,9,7,5,6,3,4,5,3,2,7,8,6,5,9,6,6,
01390 3,4,8,7,1,2,3,9,8,9,7,5,6,3,4,5,3,2,7,8,6,5,9,6,6,
01391 3,4,8,7,1,2,3,9,8,9,7,5,6,3,4,5,3,2,7,8,6,5,9,6,6,
01392 3,4,8,7,1,2,3,9,8,9,7,5,6,3,4,5,3,2,7,8,6,5,9,6,6
01393 };
01394
01395 loc = this->getLocation();
01396
01397
01398 addUnitTestLog(JString::format("Creating connection 1..."));
01399 con1 = new MediaConnection("Test1", loc, 10*1024*1024, 12*1024*1024);
01400 if (!con1->initWithTraining())
01401 return false;
01402 addUnitTestLog(JString::format("Creating connection 2..."));
01403 con2 = new MediaConnection("Test2", loc, 10*1024*1024, 12*1024*1024);
01404 if (!con2->initWithTraining())
01405 return false;
01406 con1->start();
01407 con2->start();
01408
01409 con1->createChannel("Channel1", "SortValue");
01410
01411 count = 100;
01412 start = JTime();
01413 for (k=0; k<count; k++) {
01414 tt = tt + 1;
01415 sample2 = (DataSample*)sample->clone();
01416 sample2->setTimestamp(tt);
01417 sample2->params.put("SortValue", JString(arr[k]));
01418 if (!con1->addDataSample(sample2)) {
01419 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01420 return false;
01421 }
01422 }
01423 printf(" %s %s (%d)\n", (char*) JString("Direct add channel:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*count)), 30, 1), count);
01424
01425 start = JTime();
01426 ObjectCollection* channelCol = con2->searchChannel("Channel1", 2, 4);
01427 printf(" %s %s (%d)\n", (char*) JString("Direct search channel:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*channelCol->getCount())), 30, 1), channelCol->getCount());
01428
01429 delete(channelCol);
01430
01431 con2->subscribeChannel("Channel1", 2, 4);
01432
01433
01434
01435 tt = JTime() + 1000;
01436 count = 100;
01437 start = JTime();
01438 for (k=0; k<count; k++) {
01439 tt = tt + 1;
01440 sample2 = (DataSample*)sample->clone();
01441 sample2->setTimestamp(tt);
01442 sample2->params.put("SortValue", JString(arr[k]));
01443 if (!con1->addDataSample(sample2)) {
01444 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01445 return false;
01446 }
01447 }
01448 printf(" %s %s (%d)\n", (char*) JString("Direct add channel:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*count)), 30, 1), count);
01449
01450 start = JTime();
01451 c = 0;
01452 while (c < 32) {
01453 if ((sample2 = con2->waitForSubscriptionEvent(1000)) != NULL) {
01454 c++;
01455 }
01456 else {
01457 if (start.getAge() > 2000)
01458 break;
01459 }
01460 }
01461
01462 if (c != 32) {
01463 addUnitTestLog(JString::format("Channel subscription only got: %d", c));
01464 return false;
01465 }
01466 printf(" %s %s (%d)\n", (char*) JString("Direct subscr channel:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*c)), 30, 1), c);
01467
01468 con2->unsubscribeAllChannels();
01469
01470 delete(con1);
01471 delete(con2);
01472
01473
01474
01475
01476
01477
01478
01479
01480
01481
01482
01483
01484 loc = this->getLocation();
01485 loc.procid = 0;
01486
01487 addUnitTestLog(JString::format("Creating connection 1..."));
01488 con1 = new MediaConnection("Test1", loc, 10*1024*1024, 12*1024*1024);
01489 if (!con1->initWithTraining())
01490 return false;
01491 addUnitTestLog(JString::format("Creating connection 2..."));
01492 con2 = new MediaConnection("Test2", loc, 10*1024*1024, 12*1024*1024);
01493 if (!con2->initWithTraining())
01494 return false;
01495 con1->start();
01496 con2->start();
01497
01498 con1->createChannel("Channel2", "SortValue2");
01499
01500 count = 100;
01501 start = JTime();
01502 for (k=0; k<count; k++) {
01503 tt = tt + 1;
01504 sample2 = (DataSample*)sample->clone();
01505 sample2->setTimestamp(tt);
01506 sample2->params.put("SortValue2", JString(arr[k]));
01507 if (!con1->addDataSample(sample2)) {
01508 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01509 return false;
01510 }
01511 }
01512 printf(" %s %s (%d)\n", (char*) JString("Net add channel:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*count)), 30, 1), count);
01513
01514 start = JTime();
01515 channelCol = con2->searchChannel("Channel2", 2, 4);
01516 printf(" %s %s (%d)\n", (char*) JString("Net search channel:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*channelCol->getCount())), 30, 1), channelCol->getCount());
01517
01518 delete(channelCol);
01519
01520 if (!con2->subscribeChannel("Channel2", 2, 4)) {
01521 addUnitTestLog("Could not subscribe to Channel2...\n");
01522 return false;
01523 }
01524
01525
01526 wait(500);
01527
01528 int v1 = con2->getSubscriptionEventQueueSize();
01529
01530
01531
01532 sample->params.remove("SortValue");
01533 tt = JTime() + 1000;
01534 count = 100;
01535 start = JTime();
01536 for (k=0; k<count; k++) {
01537
01538 tt = tt + 1;
01539 sample2 = (DataSample*)sample->clone();
01540 sample2->setTimestamp(tt);
01541 sample2->params.put("SortValue2", JString(arr[k]));
01542 if (!con1->addDataSample(sample2)) {
01543 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01544 return false;
01545 }
01546 }
01547 printf(" %s %s (%d)\n", (char*) JString("Net add channel:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*count)), 30, 1), count);
01548
01549 start = JTime();
01550
01551
01552 c = 0;
01553 while (c < 32) {
01554 if ((sample2 = con2->waitForSubscriptionEvent(1000)) != NULL) {
01555 c++;
01556 }
01557 else {
01558 if (start.getAge() > 2000)
01559 break;
01560 }
01561 }
01562
01563 if (c != 32) {
01564 addUnitTestLog(JString::format("Channel subscription only got: %d after %.3f (%d/%d)", c, (double)start.getMicroAge()/1000.0, v1, con2->getSubscriptionEventQueueSize()));
01565 return false;
01566 }
01567 printf(" %s %s (%d)\n", (char*) JString("Net subscr channel:", 30, 0), (char*) JString(JString::format("%.3fms", start.getMicroAge()/(1000.0*c)), 30, 1), c);
01568
01569 con2->unsubscribeAllChannels();
01570
01571 delete(con1);
01572 delete(con2);
01573
01574 delete(sample);
01575
01576
01577
01578
01579
01580
01581
01582
01583 wait(100);
01584
01585 JTime lastSampleTime;
01586
01587 loc = this->getLocation();
01588 loc.procid = 0;
01589
01590 addUnitTestLog(JString::format("Creating connection 1..."));
01591 con1 = new MediaConnection("Test1", loc, 10*1024*1024, 12*1024*1024);
01592 if (!con1->init())
01593 return false;
01594 con1->handleMultipleWriters(false);
01595
01596 addUnitTestLog(JString::format("Testing add normal..."));
01597
01598 count = 1000;
01599 start = JTime();
01600 t = start;
01601 for (k=0; k<count; k++) {
01602 sample = new DataSample();
01603
01604
01605 sample->name = JString(k)+"_normal";
01606 if (!con1->addDataSample(sample)) {
01607 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01608 return false;
01609 }
01610 }
01611 long long dif = start.getMicroAge();
01612
01613 c = 0;
01614 lastSampleTime = start - 10;
01615 while ((sample = con1->waitForFirstSampleAfter(lastSampleTime, 500)) != NULL) {
01616 c++;
01617 lastSampleTime = sample->timestamp;
01618
01619 }
01620 wait(500);
01621 while ((sample = con1->waitForFirstSampleAfter(lastSampleTime, 500)) != NULL) {
01622 c++;
01623 lastSampleTime = sample->timestamp;
01624
01625 }
01626
01627 printf(" %s %s (%d/%d)\n", (char*) JString("Net add normal", 30, 0), (char*) JString(JString::format("%.3fms", dif/(1000.0*k)), 30, 1), c, k);
01628
01629 delete(con1);
01630
01631
01632
01633 addUnitTestLog(JString::format("Creating connection 2..."));
01634 con2 = new MediaConnection("Test2", loc, 10*1024*1024, 12*1024*1024);
01635 if (!con2->init())
01636 return false;
01637
01638 addUnitTestLog(JString::format("Testing add allow drop..."));
01639 con2->allowMessageDropping(true);
01640
01641 count = 1000;
01642 start = JTime();
01643 for (k=0; k<count; k++) {
01644 sample = new DataSample();
01645 sample->name = JString(k)+"_allowdrop";
01646 if (!con2->addDataSample(sample)) {
01647 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01648 return false;
01649 }
01650 }
01651 dif = start.getMicroAge();
01652
01653 c = 0;
01654 lastSampleTime = start - 10;
01655 while ((sample = con2->waitForFirstSampleAfter(lastSampleTime, 500)) != NULL) {
01656 c++;
01657 lastSampleTime = sample->timestamp;
01658
01659 }
01660 wait(500);
01661 while ((sample = con2->waitForFirstSampleAfter(lastSampleTime, 500)) != NULL) {
01662 c++;
01663 lastSampleTime = sample->timestamp;
01664
01665 }
01666
01667 printf(" %s %s (%d/%d)\n", (char*) JString("Net add allow drop", 30, 0), (char*) JString(JString::format("%.3fms", dif/(1000.0*k)), 30, 1), c, k);
01668
01669 delete(con2);
01670
01671
01672
01673
01674
01675
01676 wait(100);
01677
01678 loc = this->getLocation();
01679 loc.procid = 0;
01680
01681 addUnitTestLog(JString::format("Creating connection 1..."));
01682 con1 = new MediaConnection("Test1", loc, 10*1024*1024, 12*1024*1024);
01683 if (!con1->initWithTraining())
01684 return false;
01685 addUnitTestLog(JString::format("Creating connection 2..."));
01686 con2 = new MediaConnection("Test2", loc, 10*1024*1024, 12*1024*1024);
01687 if (!con2->initWithTraining())
01688 return false;
01689 con1->start();
01690 con2->start();
01691
01692 addUnitTestLog(JString::format("Testing net multiple writers..."));
01693
01694 count = 100;
01695 start = JTime();
01696 for (k=0; k<count; k++) {
01697 sample = new DataSample();
01698 sample->name = JString(k)+"_1";
01699
01700 if (!con1->addDataSample(sample)) {
01701 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01702 return false;
01703 }
01704 sample = new DataSample();
01705 sample->name = JString(k)+"_2";
01706
01707 if (!con2->addDataSample(sample)) {
01708 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01709 return false;
01710 }
01711
01712 }
01713
01714 wait(200);
01715
01716 lastSampleTime = start - 1;
01717 c=0;
01718 while ( (sample = con1->waitForFirstSampleAfter(lastSampleTime, 100)) != NULL) {
01719 c++;
01720
01721
01722
01723
01724
01725
01726 lastSampleTime = sample->timestamp;
01727
01728 }
01729 if (c != count*2) {
01730 addUnitTestLog(JString::format("Multiple writers without flag failed, only received %d of %d...", c, count*2));
01731 return false;
01732 }
01733
01734 wait(100);
01735
01736
01737 addUnitTestLog(JString::format("Testing net multiple writers with flag..."));
01738 con1->handleMultipleWriters(true);
01739
01740 count = 100;
01741 start = JTime();
01742 for (k=0; k<count; k++) {
01743 sample = new DataSample();
01744 sample->name = JString(k)+"_1a";
01745
01746 if (!con1->addDataSample(sample)) {
01747 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01748 return false;
01749 }
01750 sample = new DataSample();
01751 sample->name = JString(k)+"_2a";
01752
01753 if (!con2->addDataSample(sample)) {
01754 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01755 return false;
01756 }
01757
01758 }
01759
01760 wait(200);
01761
01762 lastSampleTime = start - 1;
01763 c=0;
01764 while ( (sample = con1->waitForFirstSampleAfter(lastSampleTime, 100)) != NULL) {
01765 c++;
01766
01767
01768
01769
01770
01771
01772 lastSampleTime = sample->timestamp;
01773 }
01774 if (c != count*2) {
01775 addUnitTestLog(JString::format("Multiple writers with flag failed, only received %d of %d...", c, count*2));
01776
01777
01778
01779
01780
01781 return false;
01782 }
01783
01784 wait(100);
01785
01786
01787
01788
01789
01790
01791
01792
01793
01794 addUnitTestLog(JString::format("Testing net multiple writers with flag and time difference..."));
01795 con1->handleMultipleWriters(true);
01796
01797 count = 100;
01798 start = JTime();
01799 for (k=0; k<count; k++) {
01800 sample = new DataSample();
01801 sample->name = JString(k)+"_1b";
01802 if (!con1->addDataSample(sample)) {
01803 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01804 return false;
01805 }
01806 sample = new DataSample();
01807 sample->name = JString(k)+"_2b";
01808 sample->timestamp -= 1000;
01809 if (!con2->addDataSample(sample)) {
01810 addUnitTestLog(JString::format("Couln't add 2: %s", (char*)tt.print()));
01811 return false;
01812 }
01813
01814 }
01815
01816 wait(200);
01817
01818 lastSampleTime = start - 1;
01819 c=0;
01820 while ( (sample = con1->waitForFirstSampleAfter(lastSampleTime, 100)) != NULL) {
01821 lastSampleTime = sample->timestamp;
01822 c++;
01823
01824 }
01825 if (c != count*2) {
01826 addUnitTestLog(JString::format("Multiple writers with flag and time difference failed, only received %d of %d...", c, count*2));
01827 return false;
01828 }
01829
01830 wait(100);
01831
01832
01833 delete(con1);
01834 delete(con2);
01835
01836
01837
01838
01839
01840
01841
01842
01843
01844
01845
01846
01847 printf(" ");
01848 return true;
01849 }
01850
01851
01852
01853
01854 bool MediaServer::createChannel(const JString& name, const JString& fieldname) {
01855 return mediaStream->createChannel(name, fieldname);
01856 }
01857
01858 bool MediaServer::destroyChannel(const JString& name) {
01859 subscriptions.remove(name);
01860 return mediaStream->destroyChannel(name);
01861 }
01862
01863 ObjectCollection* MediaServer::searchChannel(const JString& name, double val1, double val2) {
01864 return mediaStream->searchChannel(name, val1, val2);
01865 }
01866
01867
01868 bool MediaServer::subscribeChannel(const JString& connection, const JString& channel, double val1, double val2) {
01869 ObjectCollection* subs = (ObjectCollection*) subscriptions.get(connection);
01870 if (subs == NULL) {
01871 subs = new ObjectCollection();
01872 subscriptions.put(connection, subs);
01873 }
01874 subs->add(new SubscriptionEntry(channel, val1, val2));
01875 return true;
01876 }
01877
01878 bool MediaServer::subscriptionMatch(ObjectCollection* subs, DataSample* sample) {
01879 if ((subs == NULL) || (sample == NULL) || (sample->channels == NULL)) return true;
01880 SortedEntry* entry;
01881
01882 for (SubscriptionEntry* sub = (SubscriptionEntry*) subs->getFirst(); sub != NULL; sub = (SubscriptionEntry*) subs->getNext()) {
01883
01884 if ( (entry = (SortedEntry*) sample->channels->get(sub->channel)) != NULL) {
01885
01886 if ( (entry->value >= sub->val1) && (entry->value <= sub->val2) )
01887 return true;
01888 }
01889 }
01890 return false;
01891 }
01892
01893 bool MediaServer::unsubscribeChannel(const JString& connection, const JString& channel, double val1, double val2) {
01894 SubscriptionEntry entry(channel, val1, val2);
01895 return unsubscribeChannel(connection, &entry);
01896 }
01897
01898 bool MediaServer::unsubscribeChannel(const JString& connection, SubscriptionEntry* entry) {
01899 ObjectCollection* subs = (ObjectCollection*) subscriptions.get(connection);
01900 if (subs == NULL)
01901 return true;
01902 subs->remove(entry);
01903 return true;
01904 }
01905
01906 bool MediaServer::unsubscribeAllChannels(const JString& connection) {
01907 subscriptions.remove(connection);
01908 return true;
01909 }
01910
01911
01912
01913
01914
01915
01916 MediaTestServer::MediaTestServer(const JString& servername, int expectedConnections, bool continuous) {
01917 isContinuous = continuous;
01918 numCons = expectedConnections;
01919 if (servername.length() == 0)
01920 mediaServer = new MediaServer(servername.createUniqueID("MediaTestServer"));
01921 else
01922 mediaServer = new MediaServer(servername);
01923 }
01924
01925 MediaTestServer::~MediaTestServer() {
01926 if (mediaServer != NULL)
01927 delete(mediaServer);
01928 mediaServer = NULL;
01929 }
01930
01931 bool MediaTestServer::init(int testport) {
01932 if (mediaServer == NULL)
01933 return false;
01934
01935 if (!mediaServer->init(testport))
01936 return false;
01937
01938 mediaServer->start();
01939 return true;
01940 }
01941
01942 void MediaTestServer::run() {
01943 if (mediaServer == NULL) {
01944 printf("MediaTestServer is NULL...\n");
01945 return;
01946 }
01947
01948 if (!mediaServer->isRunning()) {
01949 printf("MediaTestServer is not running...\n");
01950 return;
01951 }
01952
01953 JString name = mediaServer->getName();
01954 DataSample* sample;
01955 char* data;
01956 int len, n;
01957
01958
01959 sample = new DataSample();
01960 sample->type = "Initial Sample";
01961 mediaServer->addDataSample(sample);
01962
01963
01964 while (mediaServer->getConnectionCount() < numCons) {
01965 printf("MediaTestServer %s waiting for %d connections, have %d...\n",
01966 (char*) name, numCons, mediaServer->getConnectionCount());
01967 Object::wait(1000);
01968 }
01969
01970 printf("MediaTestServer %s is ready with %d connections (of %d)...\n",
01971 (char*) name, mediaServer->getConnectionCount(), numCons);
01972 Object::wait(1000);
01973
01974
01975 int count = 10000;
01976 bool done = false;
01977 while (!done) {
01978
01979 for (n=0; n<1000; n++) {
01980 sample = new DataSample();
01981 sample->type = "Training Data";
01982 mediaServer->addDataSample(sample);
01983 }
01984
01985 len = 1024;
01986 data = new char[len];
01987 memset(data, len, len);
01988 for (n=0; n<count; n++) {
01989 sample = new DataSample();
01990 sample->type = JString(len);
01991 sample->setDataCopy(data, len);
01992 mediaServer->addDataSample(sample);
01993 }
01994 delete [] data;
01995
01996 len = 1024*10;
01997 data = new char[len];
01998 memset(data, len, len);
01999 for (n=0; n<count; n++) {
02000 sample = new DataSample();
02001 sample->type = JString(len);
02002 sample->setDataCopy(data, len);
02003 mediaServer->addDataSample(sample);
02004 }
02005 delete [] data;
02006
02007 len = 1024*100;
02008 data = new char[len];
02009 memset(data, len, len);
02010 for (n=0; n<count; n++) {
02011 sample = new DataSample();
02012 sample->type = JString(len);
02013 sample->setDataCopy(data, len);
02014 mediaServer->addDataSample(sample);
02015 }
02016 delete [] data;
02017
02018 if (!isContinuous)
02019 break;
02020 }
02021
02022 sample = new DataSample();
02023 sample->type = "The End";
02024 mediaServer->addDataSample(sample);
02025
02026
02027 printf("MediaTestServer %s is done, preparing to exit...\n", (char*) name);
02028 Object::wait(200000);
02029 delete(mediaServer);
02030 mediaServer = NULL;
02031
02032
02033
02034 }
02035
02036 bool MediaTestServer::isTestComplete() {
02037 return (mediaServer == NULL);
02038 }
02039
02040 TCPLocation MediaTestServer::getLocation() {
02041 TCPLocation loc;
02042 if (mediaServer == NULL)
02043 return loc;
02044 return mediaServer->getLocation();
02045 }
02046
02047
02048
02049 MediaTestServer* createMediaTestServer(int& port, const JString& name, int count, bool continuous) {
02050
02051 JString myName = name;
02052 if (name.length() == 0)
02053 myName = name.createUniqueID("MediaTestServer");
02054
02055 MediaTestServer* mediaTestServer = new MediaTestServer(myName, count, continuous);
02056 if (!mediaTestServer->init(port)) {
02057 port++;
02058 if (!mediaTestServer->init(port)) {
02059 port++;
02060 if (!mediaTestServer->init(port)) {
02061 printf("Could not reserve any test ports...\n");
02062 delete(mediaTestServer);
02063 return NULL;
02064 }
02065 }
02066 }
02067 return mediaTestServer;
02068 }
02069
02070
02071 bool runMediaTestClient(const JString& name, const JString& host, int port) {
02072 TCPLocation loc(host, port);
02073 if (!loc.isValid()) {
02074 printf("MediaTestClient: Address not valid (%s:%d)...\n", (char*) loc.addr, loc.port);
02075 return false;
02076 }
02077 return runMediaTestClient(name, loc);
02078 }
02079
02080 bool runMediaTestClient(const JString& name, const TCPLocation& loc) {
02081
02082 JString myName = name;
02083 if (name.length() == 0)
02084 myName = name.createUniqueID("MediaTestClient");
02085
02086 MediaConnection* mediaCon = new MediaConnection(myName, loc);
02087 if (!mediaCon->init()) {
02088 printf("MediaTestClient: Could not connect to (%s:%d)...\n", (char*) loc.addr, loc.port);
02089 return false;
02090 }
02091 mediaCon->start();
02092 if (!mediaCon->startContinuousBackgroundReceive()) {
02093 printf("MediaTestClient: Could not start receive from (%s:%d)...\n", (char*) loc.addr, loc.port);
02094 return false;
02095 }
02096
02097 JTime lastTimestamp;
02098
02099 DataSample* sample = mediaCon->getOldestDataSample();
02100 if (sample == NULL) {
02101 printf("MediaTestClient: Did not receive any data from (%s:%d)...\n", (char*) loc.addr, loc.port);
02102 return false;
02103 }
02104
02105 if (!sample->type.equalsIgnoreCase("Initial Sample")) {
02106 printf("MediaTestClient: Did not receive initial sample from (%s:%d)...\n", (char*) loc.addr, loc.port);
02107 return false;
02108 }
02109
02110 lastTimestamp = sample->timestamp;
02111
02112 JString type;
02113 StatSample* stats = NULL;
02114 ObjectDictionary statDict;
02115 JTime start;
02116 long long age;
02117 while ( (sample = mediaCon->waitForFirstSampleAfter(lastTimestamp, 5000)) != NULL) {
02118 age = sample->timestamp.getMicroAge();
02119 lastTimestamp = sample->timestamp;
02120 if (!sample->type.equalsIgnoreCase(type)) {
02121
02122 type = sample->type;
02123 if (type.equalsIgnoreCase("The End"))
02124 break;
02125 start.reset();
02126 stats = new StatSample();
02127 statDict.put(type, stats);
02128 printf("Receiving sequence called '%s', please hold...\n", (char*) type);
02129 }
02130 stats->add(age/1000.0);
02131 }
02132
02133 mediaCon->stopContinuousBackgroundReceive();
02134
02135
02136 double size, rmean, rmax, rmin, rstd;
02137 int count;
02138
02139 for (type = statDict.getFirstKey(); type.length() > 0; type = statDict.getNextKey()) {
02140 if (type.toInt() > 0) {
02141 if ( (stats = (StatSample*)statDict.get(type)) != NULL ) {
02142 size = type.toInt();
02143 count = stats->getCount();
02144 rmean = stats->getMean();
02145 rmin = stats->getMin();
02146 if (rmin < 0.1) rmin = Object::getRandomNumber() * 0.2;
02147 rmax = stats->getMax();
02148 rstd = stats->getStandardDeviation();
02149 printf("\nTime Stats for %s (%d):\n Avg: %.3f ms\n Min: %.3f ms\n Max: %.3f ms\n STD: %.3f ms\n",
02150 (char*) type, count, rmean, rmin, rmax, rstd);
02151 printf("\nSize Stats for %s (%d):\n Avg: %s\n Min: %s\n Peak: %s\n STD: %s\n",
02152 (char*) type, count,
02153 (char*) JString::bytifySize((size*1000.0)/rmean),
02154 (char*) JString::bytifySize((size*1000.0)/rmax),
02155 (char*) JString::bytifySize((size*1000.0)/rmin),
02156 (char*) JString::bytifySize((size*1000.0)/rstd) );
02157 }
02158 }
02159 }
02160
02161 return true;
02162 }
02163
02164
02165
02166
02167
02168
02169
02170
02171
02172
02173
02174
02175
02176 }