00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "MediaStream.h"
00022
00023 namespace cmlabs {
00024
00025 #define STREAMMAXCOUNTDEFAULT 1000
00026 #define BACKUPMINCOUNTDEFAULT 20
00027 #define BACKUPMAXCOUNTDEFAULT 100
00028
00029 MediaStream::MediaStream(const JString& streamname, long hardMaxSize, long softMaxSize, long maxCount, long maxBackupCount) {
00030 outputData.sorting = SORTBYKEY;
00031 dataSamples.sorting = SORTBYKEY;
00032 allowMultipleWriters = false;
00033
00034 name = streamname;
00035 if (name.length() == 0)
00036 name = createUniqueID("stream");
00037
00038 if ( hardMaxSize < 0 )
00039 hardMax = 30*1024*1024;
00040 else
00041 hardMax = hardMaxSize;
00042
00043 if ( softMaxSize < 0 )
00044 softMax = (long)(hardMaxSize*0.9);
00045 else
00046 softMax = softMaxSize;
00047
00048 this->maxCount = maxCount;
00049 if (maxBackupCount <= 0)
00050 this->maxBackupCount = BACKUPMAXCOUNTDEFAULT;
00051 else
00052 this->maxBackupCount = maxBackupCount;
00053 bytesEverHandled = 0;
00054 countEverHandled = 0;
00055 currentSize = 0;
00056 }
00057
00058 MediaStream::~MediaStream() {
00059 dataSamples.removeAll();
00060 currentSize = 0;
00061 }
00062
00063 Object* MediaStream::clone() const {
00064 return NULL;
00065 }
00066
00067 JString MediaStream::print() {
00068
00069 JString str;
00070 DataSample* sample;
00071
00072 for (int n=0; n < dataSamples.getCount(); n++) {
00073 sample = (DataSample*) dataSamples.get(n);
00074 if (sample != NULL)
00075 str += JString::format("%3d %s\n", n, (char*) sample->print());
00076 }
00077 return str;
00078 }
00079
00080 bool MediaStream::handleMultipleWriters(bool allow) {
00081 allowMultipleWriters = allow;
00082 return true;
00083 }
00084
00085 InfoItem* MediaStream::getInfo() {
00086 InfoItem* info = new InfoItem();
00087
00088 long val;
00089 info->setEntry("CurrentCount", JString(getCount()));
00090 info->setEntry("TotalCount", JString(this->getTotalCountEverHandled()));
00091
00092 val = currentSize;
00093 info->setEntry("CurrentSize", JString(val));
00094 info->setEntry("CurrentSizeString", JString::bytifySize(val));
00095 double dval = this->getTotalDataEverHandled();
00096 info->setEntry("TotalSize", JString(dval));
00097 info->setEntry("TotalSizeString", JString::bytifySize(dval));
00098 double valin = getInputDataRate();
00099 double valout = getOutputDataRate();
00100 info->setEntry("CurrentRateIn", JString(valin));
00101 info->setEntry("CurrentRateOut", JString(valout));
00102 info->setEntry("CurrentRateInString", JString::bytifyRate(valin));
00103 info->setEntry("CurrentRateOutString", JString::bytifyRate(valout));
00104 info->setEntry("CurrentRatesString", JString::bytifyRates(valin, valout));
00105
00106 DataSample* sample = getNewestSample();
00107 if (sample == NULL) {
00108 info->setEntry("PackageSize", JString("0"));
00109 info->setEntry("PackageSizeString", JString::bytifySize((long)0));
00110 }
00111 else {
00112 info->setEntry("PackageSize", JString(sample->size));
00113 info->setEntry("PackageSizeString", JString::bytifySize(sample->size));
00114 }
00115
00116 info->setTime("OldestTime", getOldestSampleTime());
00117 info->setTime("NewestTime", getNewestSampleTime());
00118
00119 return info;
00120 }
00121
00122
00123 bool MediaStream::addSample(DataSample* sample) {
00124
00125 if (sample == NULL)
00126 return false;
00127
00128 bool res = false;
00129
00130
00131 if (allowMultipleWriters)
00132 sample->timestamp.reset();
00133
00134
00135
00136
00137 JTime* t = (JTime*) sample->timestamp.clone();
00138 t->perfCountReset();
00139
00140
00141 if (accessMutex.EnterMutex(1000)) {
00142 while (dataSamples.containsKey(t))
00143 t->atomicOffset++;
00144 sample->timestamp.atomicOffset = t->atomicOffset;
00145
00146 if (!(res = dataSamples.add(t, sample))) {
00147 delete(sample);
00148 delete(t);
00149 }
00150 else {
00151 if (channels.getCount() > 0) {
00152 addChannelSample(sample);
00153 }
00154 countEverHandled++;
00155 bytesEverHandled += sample->getDataSize();
00156 currentSize += sample->getDataSize();
00157 }
00158 }
00159 accessMutex.LeaveMutex();
00160
00161 doMaintenance();
00162 newDataSem.post();
00163
00164 return res;
00165 }
00166
00167 bool MediaStream::addSamples(ObjectCollection* samples, bool deleteCol) {
00168
00169 if ((samples == NULL) || (samples->getCount() == 0)) {
00170 if (deleteCol)
00171 delete(samples);
00172 return false;
00173 }
00174
00175 JTime* t;
00176 DataSample* sample;
00177
00178 for (int n=0; n<samples->getCount(); n++) {
00179 if ( (sample = (DataSample*) samples->get(n)) != NULL ) {
00180 if (allowMultipleWriters)
00181 sample->timestamp.reset();
00182
00183
00184
00185
00186 t = (JTime*) sample->getTimestamp().clone();
00187 t->perfCountReset();
00188 while (dataSamples.containsKey(t))
00189 t->atomicOffset++;
00190 sample->timestamp.atomicOffset = t->atomicOffset;
00191 if (accessMutex.EnterMutex(1000)) {
00192 if (!dataSamples.add(t, sample)) {
00193 samples->remove(n);
00194 delete(t);
00195 n--;
00196 }
00197 else {
00198 if (channels.getCount() > 0)
00199 addChannelSample(sample);
00200 countEverHandled++;
00201 bytesEverHandled += sample->getDataSize();
00202 currentSize += sample->getDataSize();
00203 }
00204 }
00205 accessMutex.LeaveMutex();
00206 }
00207 }
00208
00209 doMaintenance();
00210 newDataSem.post();
00211
00212 if (deleteCol) {
00213 samples->removeAllNoDelete();
00214 delete(samples);
00215 }
00216
00217 return true;
00218 }
00219
00220 bool MediaStream::deleteSample(DataSample* sample) {
00221 if (sample == NULL)
00222 return false;
00223 bool ret = false;
00224 if (accessMutex.EnterMutex(1000)) {
00225 currentSize -= sample->getDataSize();
00226
00227
00228 dataSamples.removeNoDelete(&sample->timestamp);
00229 sample->markForDeletion();
00230
00231
00232
00233
00234
00235 deletedSamples.add(sample);
00236 accessMutex.LeaveMutex();
00237 }
00238 return true;
00239 }
00240
00241 bool MediaStream::deleteSample(int pos) {
00242 bool ret = false;
00243 if (accessMutex.EnterMutex(1000)) {
00244 ret = deleteSample((DataSample*)dataSamples.get(pos));
00245 accessMutex.LeaveMutex();
00246 }
00247 return ret;
00248 }
00249
00250
00251 double MediaStream::getTotalDataEverHandled() {
00252 return bytesEverHandled;
00253 }
00254
00255 long MediaStream::getTotalCountEverHandled() {
00256 return countEverHandled;
00257 }
00258
00259 double MediaStream::getInputDataRate() {
00260
00261 if (dataSamples.getCount() == 0)
00262 return 0;
00263
00264 int max = 10, c;
00265
00266 DataSample* sample;
00267 JTime now, *oldest;
00268 long ms = 1;
00269 double size = 0;
00270
00271 if (accessMutex.EnterMutex(1000)) {
00272 c = dataSamples.getCount();
00273 if (c < max)
00274 max = c;
00275 oldest = (JTime*) dataSamples.getKey(c - max);
00276
00277
00278
00279
00280
00281
00282 ms = now - *oldest;
00283 size = 0;
00284 for (int n=0; n<max; n++) {
00285 if ( (sample = (DataSample*) dataSamples.get(c-1-n)) != NULL) {
00286 size += sample->getDataSize();
00287 }
00288 else
00289 ms = ms;
00290 }
00291 }
00292 accessMutex.LeaveMutex();
00293
00294
00295 double res = (size/((double)ms/1000));
00296 if (res < 0)
00297 res = 0;
00298 return res;
00299 }
00300
00301 double MediaStream::getOutputDataRate() {
00302 if (outputData.getCount() == 0)
00303 return 0;
00304 JTime now, *oldest;
00305 long ms = 1;
00306 double size = 0;
00307
00308 if (accessMutex.EnterMutex(1000)) {
00309 oldest = (JTime*) outputData.getFirstKey();
00310 if (oldest == NULL) {
00311 accessMutex.LeaveMutex();
00312 return 0;
00313 }
00314 ms = now - *oldest;
00315 if (ms > 100000)
00316 ms = ms;
00317
00318 for (int n=0; n<outputData.getCount(); n++)
00319 size += ((JString*)outputData.get(n))->toDouble();
00320 }
00321 accessMutex.LeaveMutex();
00322
00323 double res = (size/((double)ms/1000));
00324 if (res < 0)
00325 res = 0;
00326 return res;
00327 }
00328
00329
00330 bool MediaStream::deleteSampleAt(const JTime& time) {
00331 bool ret = false;
00332 if (accessMutex.EnterMutex(1000)) {
00333
00334
00335
00336 DataSample* sample = (DataSample*) dataSamples.get(&time);
00337 if (sample != NULL) {
00338 currentSize -= sample->getDataSize();
00339 dataSamples.removeNoDelete(&time);
00340 sample->markForDeletion();
00341
00342
00343
00344
00345
00346 deletedSamples.add(sample);
00347 }
00348 accessMutex.LeaveMutex();
00349 }
00350 return ret;
00351 }
00352
00353 bool MediaStream::deleteSamplesOlderThan(const JTime& time) {
00354 int ret = 0;
00355 DataSample* sample;
00356 if (accessMutex.EnterMutex(1000)) {
00357
00358 while ( ( (sample = (DataSample*) dataSamples.getFirst()) != NULL ) &&
00359 (sample->timestamp - time < 0) ) {
00360 if (!deleteSample(sample))
00361 break;
00362 ret++;
00363 }
00364 }
00365 accessMutex.LeaveMutex();
00366 return (ret > 0);
00367 }
00368
00369 bool MediaStream::deleteSamplesNewerThan(const JTime& time) {
00370 int ret = 0;
00371 DataSample* sample;
00372 if (accessMutex.EnterMutex(1000)) {
00373
00374 while ( ( (sample = (DataSample*) dataSamples.getLast()) != NULL ) &&
00375 (sample->timestamp - time > 0) ) {
00376 if (!deleteSample(sample))
00377 break;
00378 ret++;
00379 }
00380 }
00381 accessMutex.LeaveMutex();
00382 return (ret > 0);
00383 }
00384
00385 bool MediaStream::deleteSamplesBetween(const JTime& fromTime, const JTime& toTime) {
00386 int ret = 0;
00387 DataSample* sample;
00388 if (accessMutex.EnterMutex(1000)) {
00389
00390 ObjectCollection* coll = dataSamples.getAllBetweenKeys(&fromTime, &toTime);
00391 if (coll != NULL) {
00392 while ( ( sample = (DataSample*) coll->getFirst()) != NULL ) {
00393 coll->removeNoDelete(0);
00394 if (!deleteSample(sample))
00395 break;
00396 ret++;
00397 }
00398 coll->removeAllNoDelete();
00399 delete(coll);
00400 }
00401 }
00402 accessMutex.LeaveMutex();
00403 return (ret > 0);
00404 }
00405
00406 long MediaStream::getTotalDataSize() {
00407
00408 return currentSize;
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423 }
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440 long MediaStream::getTotalDataMemUse() {
00441
00442 long size = 0;
00443 DataSample* sample;
00444
00445 if (accessMutex.EnterMutex(1000)) {
00446 for (int n=0; n<dataSamples.getCount(); n++) {
00447 sample = (DataSample*) dataSamples.get(n);
00448 if (sample != NULL) {
00449 size += sample->getDataMemUse();
00450 }
00451 }
00452 }
00453 accessMutex.LeaveMutex();
00454 return size;
00455 }
00456
00457 int MediaStream::getCount() {
00458 return dataSamples.getCount();
00459 }
00460
00461 ObjectCollection* MediaStream::getDataSampleList() {
00462 ObjectCollection* col = new ObjectCollection();
00463 DataSample* sample;
00464
00465 if (accessMutex.EnterMutex(1000)) {
00466 for (int n=0; n<dataSamples.getCount(); n++) {
00467 sample = (DataSample*) dataSamples.get(n);
00468 if (sample != NULL) {
00469 col->add(sample->cloneWithoutData());
00470 }
00471 }
00472 }
00473 accessMutex.LeaveMutex();
00474 return col;
00475 }
00476
00477 ObjectCollection* MediaStream::getTimestampList() {
00478 ObjectCollection* col = new ObjectCollection();
00479 JTime* t;
00480
00481 if (accessMutex.EnterMutex(1000)) {
00482 for (int n=0; n<dataSamples.getCount(); n++) {
00483 t = (JTime*) dataSamples.getKey(n);
00484 if (t != NULL) {
00485 col->add(t->clone());
00486 }
00487 }
00488 }
00489 accessMutex.LeaveMutex();
00490 return col;
00491 }
00492
00493 JTime MediaStream::getOldestSampleTime() {
00494 JTime *t = NULL;
00495
00496 if (accessMutex.EnterMutex(1000)) {
00497 t = (JTime*) dataSamples.getFirstKey();
00498 }
00499 accessMutex.LeaveMutex();
00500 if (t != NULL)
00501 return *t;
00502 else {
00503 JTime tt;
00504 tt.setInvalid();
00505 return tt;
00506 }
00507 }
00508
00509 JTime MediaStream::getNewestSampleTime() {
00510 JTime* t = NULL;
00511
00512 if (accessMutex.EnterMutex(1000)) {
00513 t = (JTime*) dataSamples.getLastKey();
00514 }
00515 accessMutex.LeaveMutex();
00516 if (t != NULL)
00517 return *t;
00518 else {
00519 JTime tt;
00520 tt.setInvalid();
00521 return tt;
00522 }
00523 }
00524
00525 DataSample* MediaStream::getOldestSample() {
00526 DataSample* sample = NULL;
00527 if (accessMutex.EnterMutex(1000)) {
00528 sample = (DataSample*) dataSamples.getFirst();
00529 if (sample != NULL) {
00530
00531 outputData.add(new JTime(), new JString(sample->getDataSize()));
00532
00533 while (outputData.getCount() > 10)
00534 outputData.removeFirst();
00535 }
00536 }
00537 accessMutex.LeaveMutex();
00538 return sample;
00539 }
00540
00541 DataSample* MediaStream::getNewestSample() {
00542 DataSample* sample = NULL;
00543
00544 if (accessMutex.EnterMutex(1000)) {
00545 sample = (DataSample*) dataSamples.getLast();
00546 if (sample != NULL) {
00547
00548 outputData.add(new JTime(), new JString(sample->getDataSize()));
00549
00550 while (outputData.getCount() > 10)
00551 outputData.removeFirst();
00552 }
00553 }
00554 accessMutex.LeaveMutex();
00555 return sample;
00556 }
00557
00558 DataSample* MediaStream::getSample(const JString& id) {
00559 DataSample* sample = NULL;
00560
00561 if (accessMutex.EnterMutex(1000)) {
00562 for (int n=getCount()-1; n>=0; n--) {
00563 sample = (DataSample*) dataSamples.get(n);
00564 if (sample != NULL) {
00565 if (sample->id.equals(id)) {
00566 break;
00567 }
00568 }
00569 }
00570 if (sample != NULL) {
00571 outputData.add(new JTime(), new JString(sample->getDataSize()));
00572
00573 while (outputData.getCount() > 10)
00574 outputData.removeFirst();
00575 }
00576 }
00577 accessMutex.LeaveMutex();
00578 return sample;
00579 }
00580
00581 DataSample* MediaStream::getSampleAt(const JTime& time) {
00582 DataSample* sample = NULL;
00583
00584 if (accessMutex.EnterMutex(1000)) {
00585
00586
00587 sample = (DataSample*) dataSamples.get(&time);
00588 if (sample != NULL) {
00589
00590 outputData.add(new JTime(), new JString(sample->getDataSize()));
00591
00592 while (outputData.getCount() > 10)
00593 outputData.removeFirst();
00594 }
00595 }
00596 accessMutex.LeaveMutex();
00597 return sample;
00598 }
00599
00600 DataSample* MediaStream::getFirstSampleAfter(const JString& id) {
00601 DataSample* sample = NULL;
00602
00603 if (accessMutex.EnterMutex(1000)) {
00604 for (int n=getCount()-1; n>=0; n--) {
00605 sample = (DataSample*) dataSamples.get(n);
00606 if (sample != NULL) {
00607 if (sample->id.equals(id)) {
00608 sample = (DataSample*) dataSamples.getNext();
00609 break;
00610 }
00611 }
00612 }
00613 if (sample != NULL) {
00614 outputData.add(new JTime(), new JString(sample->getDataSize()));
00615
00616 while (outputData.getCount() > 10)
00617 outputData.removeFirst();
00618 }
00619 }
00620 accessMutex.LeaveMutex();
00621 return sample;
00622 }
00623
00624 DataSample* MediaStream::getFirstSampleAfter(const JTime& time) {
00625 DataSample* sample = NULL;
00626
00627 if (accessMutex.EnterMutex(1000)) {
00628 sample = (DataSample*) dataSamples.getFirstAfterKey(&time);
00629 if (sample != NULL) {
00630
00631 outputData.add(new JTime(), new JString(sample->getDataSize()));
00632
00633 while (outputData.getCount() > 10)
00634 outputData.removeFirst();
00635 }
00636 }
00637 accessMutex.LeaveMutex();
00638 return sample;
00639 }
00640
00641 DataSample* MediaStream::getLastSampleBefore(const JTime& time) {
00642 DataSample* sample = NULL;
00643
00644 if (accessMutex.EnterMutex(1000)) {
00645 sample = (DataSample*) dataSamples.getLastBeforeKey(&time);
00646 if (sample != NULL) {
00647
00648 outputData.add(new JTime(), new JString(sample->getDataSize()));
00649
00650 while (outputData.getCount() > 10)
00651 outputData.removeFirst();
00652 }
00653 }
00654 accessMutex.LeaveMutex();
00655 return sample;
00656 }
00657
00658 ObjectCollection* MediaStream::getSamplesBetween(const JTime& fromTime, const JTime& toTime) {
00659 ObjectCollection* samples = NULL;
00660
00661 if (accessMutex.EnterMutex(1000)) {
00662 samples = dataSamples.getAllBetweenKeys(&fromTime, &toTime);
00663 if (samples != NULL) {
00664 samples->noDelete();
00665 long size = 0;
00666 for (DataSample* sample = (DataSample*) samples->getFirst(); sample != NULL; sample = (DataSample*) samples->getNext()) {
00667 size += sample->getDataSize();
00668 }
00669 outputData.add(new JTime(), new JString(size));
00670 while (outputData.getCount() > 10)
00671 outputData.removeFirst();
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682 }
00683 }
00684 accessMutex.LeaveMutex();
00685 return samples;
00686 }
00687
00688
00689
00690
00691 DataSample* MediaStream::waitForFirstSampleAfter(const JTime& time, long ms) {
00692
00693 DataSample* sample = getFirstSampleAfter(time);
00694 if (sample != NULL)
00695 return sample;
00696
00697 JTime start;
00698
00699 long dif = start.getAge();
00700
00701 while ( dif < ms ) {
00702 newDataSem.wait(ms - dif);
00703
00704 if ( (sample = getFirstSampleAfter(time)) != NULL)
00705 return sample;
00706
00707 dif = start.getAge();
00708 }
00709 return NULL;
00710 }
00711
00712 DataSample* MediaStream::waitForLastSampleAfter(const JTime& time, long ms) {
00713 JTime newest = getNewestSampleTime();
00714
00715 if ( (newest.isValid()) && (newest.compare(&time) > 0) )
00716 return this->getSampleAt(newest);
00717
00718
00719
00720
00721
00722
00723
00724 JTime start;
00725
00726 long dif = start.getAge();
00727
00728
00729 while ( dif < ms ) {
00730
00731 newDataSem.wait(ms - dif);
00732
00733
00734
00735 newest = getNewestSampleTime();
00736
00737 if ( (newest.isValid()) && (newest.compare(&time) > 0) ) {
00738 return this->getSampleAt(newest);
00739 }
00740
00741 dif = start.getAge();
00742 }
00743 return NULL;
00744 }
00745
00746 DataSample* MediaStream::waitForFirstSampleAfter(const JString& id, long ms) {
00747 DataSample* sample = getFirstSampleAfter(id);
00748 if (sample != NULL)
00749 return sample;
00750
00751 JTime start;
00752
00753 long dif = start.getAge();
00754
00755 while ( dif < ms ) {
00756 newDataSem.wait(ms - dif);
00757 if ( (sample = getFirstSampleAfter(id)) != NULL)
00758 return sample;
00759
00760 dif = start.getAge();
00761 }
00762 return NULL;
00763 }
00764
00765 DataSample* MediaStream::waitForLastSampleAfter(const JString& id, long ms) {
00766 DataSample* sample = getNewestSample();
00767 if ((sample != NULL) && (!sample->id.equals(id)))
00768 return sample;
00769
00770 JTime start;
00771
00772 long dif = start.getAge();
00773
00774 while ( dif < ms ) {
00775 newDataSem.wait(ms - dif);
00776 sample = getNewestSample();
00777 if ((sample != NULL) && (!sample->id.equals(id)))
00778 return sample;
00779
00780 dif = start.getAge();
00781 }
00782 return NULL;
00783 }
00784
00785
00786
00787 bool MediaStream::doMaintenance() {
00788
00789 int maximumCount = maxCount;
00790 if (maxCount <= 0)
00791 maximumCount = STREAMMAXCOUNTDEFAULT;
00792
00793 DataSample* sample;
00794 if (this->getCount() > maximumCount) {
00795 if (accessMutex.EnterMutex(1000)) {
00796 while (this->getCount() > maximumCount) {
00797 if (!deleteSample(0)) {
00798 accessMutex.LeaveMutex();
00799 return true;
00800 }
00801
00802
00803
00804
00805
00806
00807
00808
00809 }
00810 accessMutex.LeaveMutex();
00811 }
00812 }
00813
00814 while (deletedSamples.getCount() > maxBackupCount)
00815 deletedSamples.removeFirst();
00816
00817 if (currentSize < softMax)
00818 return true;
00819
00820 if (accessMutex.EnterMutex(1000)) {
00821
00822 while ((hardMax > 0) && (currentSize > hardMax)) {
00823 if (!deleteSample(0)) {
00824 accessMutex.LeaveMutex();
00825 return true;
00826 }
00827
00828
00829
00830
00831
00832 }
00833
00834 while ((softMax > 0) && (currentSize > softMax)) {
00835 if ( (sample = (DataSample*) dataSamples.getFirst()) != NULL) {
00836 if (sample->isInUse()) {
00837 sample->lockBy(name);
00838 break;
00839 }
00840 else {
00841 if (!deleteSample(0)) {
00842 accessMutex.LeaveMutex();
00843 return true;
00844 }
00845
00846
00847
00848
00849 }
00850 }
00851 }
00852 }
00853 accessMutex.LeaveMutex();
00854
00855 return true;
00856 }
00857
00858
00859
00860 bool MediaStream::createChannel(const JString& name, const JString& fieldname) {
00861 if (channels.contains(name))
00862 return false;
00863 channelNames.put(name, fieldname);
00864 SortedObjectCollection* channel = new SortedObjectCollection();
00865 channels.put(name, channel);
00866 return true;
00867 }
00868
00869 bool MediaStream::destroyChannel(const JString& name) {
00870 channelNames.remove(name);
00871 SortedObjectCollection* channel = (SortedObjectCollection*) channels.get(name);
00872 channels.removeNoDelete(name);
00873 delete(channel);
00874 return true;
00875 }
00876
00877 ObjectCollection* MediaStream::searchChannel(const JString& name, double val1, double val2) {
00878 SortedObjectCollection* channel = (SortedObjectCollection*) channels.get(name);
00879 if (channel == NULL)
00880 return NULL;
00881 SortedEntry e1 = SortedEntry(val1, NULL, NULL);
00882 SortedEntry e2 = SortedEntry(val2, NULL, NULL);
00883 ObjectCollection* samples = channel->objectTable.getAllBetween(&e1, &e2);
00884 if (samples != NULL) {
00885 ObjectCollection* coll = new ObjectCollection();
00886 coll->noDelete();
00887 long size = 0;
00888 for (SortedEntry* entry = (SortedEntry*) samples->getFirst(); entry != NULL; entry = (SortedEntry*) samples->getNext()) {
00889 coll->addLast(entry->object);
00890 size += ((DataSample*)entry->object)->getDataSize();
00891 }
00892 outputData.add(new JTime(), new JString(size));
00893 while (outputData.getCount() > 10)
00894 outputData.removeFirst();
00895 delete(samples);
00896 return coll;
00897 }
00898 return NULL;
00899 }
00900
00901 bool MediaStream::addChannelSample(DataSample* sample) {
00902 if (sample == NULL)
00903 return false;
00904 JString fieldname;
00905 SortedObjectCollection* channel;
00906 SortedEntry* entry;
00907 for (int n=0; n<channelNames.getCount(); n++) {
00908 name = channelNames.getKey(n);
00909 if ( (channel = (SortedObjectCollection*) channels.get(name)) != NULL) {
00910 fieldname = channelNames.get(n);
00911 if (sample->params.contains(fieldname)) {
00912 entry = new SortedEntry(sample->getParamFloat(fieldname), sample, &channel->objectTable);
00913 channel->add(entry);
00914 if (sample->channels == NULL)
00915 sample->channels = new ObjectDictionary;
00916 sample->channels->put(name, entry);
00917
00918 }
00919 }
00920 }
00921 return true;
00922 }
00923
00924 bool MediaStream::removeChannelSample(DataSample* sample) {
00925 if (sample == NULL)
00926 return true;;
00927 SortedEntry* entry;
00928 while ( (entry = (SortedEntry*) sample->channels->getFirst()) != NULL) {
00929
00930 if (entry->table != NULL)
00931 entry->table->remove(entry);
00932 sample->channels->removeFirst();
00933 }
00934 return true;
00935 }
00936
00937
00938
00939
00940
00941
00942
00943
00944
00945 }