00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "ObjectQueue.h"
00026
00027 namespace cmlabs {
00028
00029
00030
00031
00032
00033 ObjectQueue::ObjectQueue()
00034 {
00035 dict = new ObjectDictionary();
00036
00037 }
00038
00039 ObjectQueue::~ObjectQueue()
00040 {
00041 delete(dict);
00042 }
00043
00044
00045 unsigned long ObjectQueue::getPayloadSize() const {
00046 return dict->getPayloadSize();
00047 }
00048
00049 bool ObjectQueue::isEmpty() {
00050 bool result;
00051 if (mutex.EnterMutex(1000)) {
00052 result = (dict->getCount() == 0);
00053 mutex.LeaveMutex();
00054 return result;
00055 }
00056 else
00057 return true;
00058 }
00059
00060 int ObjectQueue::getCount() {
00061 int result;
00062 if (!mutex.EnterMutex(1000))
00063 return 0;
00064 result = dict->getCount();
00065 mutex.LeaveMutex();
00066 return result;
00067 }
00068
00069 bool ObjectQueue::removeAll() {
00070 Object* obj;
00071 while ( (obj = this->retrieveEntry(0)) != NULL)
00072 delete(obj);
00073 return true;
00074 }
00075
00076 ObjectCollection* ObjectQueue::retrieveAll() {
00077 if (!mutex.EnterMutex(1000))
00078 return NULL;
00079
00080 if (dict->getCount() == 0) {
00081 mutex.LeaveMutex();
00082 return NULL;
00083 }
00084
00085 ObjectCollection* col = new ObjectCollection();
00086 col->addAll(dict);
00087 dict->removeAllNoDelete();
00088 mutex.LeaveMutex();
00089 return col;
00090 }
00091
00092
00093
00094 JString ObjectQueue::add(Object* obj) {
00095 JString id = createUniqueID("queue");
00096 if (!mutex.EnterMutex(1000))
00097 return "";
00098 dict->put(id, obj);
00099 mutex.LeaveMutex();
00100 sem.post();
00101 return id;
00102 }
00103
00104 JString ObjectQueue::add(JString id, Object* obj) {
00105 if (!mutex.EnterMutex(1000))
00106 return "";
00107 dict->put(id, obj);
00108 mutex.LeaveMutex();
00109 sem.post();
00110 return id;
00111 }
00112
00113
00114
00115 JString ObjectQueue::waitForNewEntryID(long ms) {
00116 JString id;
00117
00118 if (!mutex.EnterMutex(ms))
00119 return "";
00120
00121 if (dict->getCount() > 0) {
00122 sem.tryWait();
00123 id = dict->getFirstKey();
00124 mutex.LeaveMutex();
00125 }
00126 else if (ms > 0) {
00127 mutex.LeaveMutex();
00128 if (sem.wait(ms)) {
00129 if (!mutex.EnterMutex(1000))
00130 return "";
00131 id = dict->getFirstKey();
00132 mutex.LeaveMutex();
00133 }
00134 }
00135 else {
00136 mutex.LeaveMutex();
00137 }
00138
00139 return id;
00140 }
00141
00142 Object* ObjectQueue::waitForNewEntry(long ms) {
00143
00144 if (!mutex.EnterMutex(ms))
00145 return NULL;
00146
00147 Object* obj = NULL;
00148 if (dict->getCount() > 0) {
00149 sem.tryWait();
00150 obj = dict->get(0);
00151 dict->removeNoDelete(0);
00152 mutex.LeaveMutex();
00153 }
00154 else if (ms > 0) {
00155 mutex.LeaveMutex();
00156 if (sem.wait(ms)) {
00157 if (!mutex.EnterMutex(1000))
00158 return NULL;
00159 obj = dict->get(0);
00160 dict->removeNoDelete(0);
00161 mutex.LeaveMutex();
00162 }
00163 }
00164 else {
00165 mutex.LeaveMutex();
00166 }
00167
00168 return obj;
00169 }
00170
00171 bool ObjectQueue::waitForNewEntryToAppear(long ms) {
00172
00173 if (!mutex.EnterMutex(ms))
00174 return false;
00175
00176 Object* obj = NULL;
00177 if (dict->getCount() > 0) {
00178 sem.tryWait();
00179 obj = dict->get(0);
00180
00181 mutex.LeaveMutex();
00182 }
00183 else if (ms > 0) {
00184 mutex.LeaveMutex();
00185 if (sem.wait(ms)) {
00186 if (!mutex.EnterMutex(ms))
00187 return false;
00188 obj = dict->get(0);
00189
00190 mutex.LeaveMutex();
00191 }
00192 }
00193 else {
00194 mutex.LeaveMutex();
00195 }
00196
00197 return (obj != NULL);
00198 }
00199
00200 Object* ObjectQueue::waitForNewEntry(JString id, long ms) {
00201
00202 Object* entry;
00203 long waitedSoFar = 0;
00204 bool done = false;
00205
00206
00207 while (! done) {
00208 if (!mutex.EnterMutex(ms))
00209 return NULL;
00210 done = ! ((entry = dict->get(id)) == NULL);
00211 mutex.LeaveMutex();
00212
00213 if (! done) {
00214 if (!sem.wait(ms - waitedSoFar))
00215 waitedSoFar += (ms - waitedSoFar);
00216
00217 if (waitedSoFar >= ms) {
00218 break;
00219 }
00220 }
00221 }
00222
00223 if (done) {
00224 if (!mutex.EnterMutex(ms))
00225 return NULL;
00226 dict->removeNoDelete(id);
00227 mutex.LeaveMutex();
00228 }
00229
00230 return entry;
00231 }
00232
00233
00234 Object* ObjectQueue::retrieveEntry(JString id) {
00235 if (!mutex.EnterMutex(1000))
00236 return NULL;
00237 Object* obj = dict->get(id);
00238 dict->removeNoDelete(id);
00239 mutex.LeaveMutex();
00240 return obj;
00241 }
00242
00243 Object* ObjectQueue::retrieveEntry(int pos) {
00244 if (!mutex.EnterMutex(1000))
00245 return NULL;
00246 Object* obj = dict->get(pos);
00247 dict->removeNoDelete(pos);
00248 mutex.LeaveMutex();
00249 return obj;
00250 }
00251
00252 Object* ObjectQueue::viewEntry(JString id) {
00253 return dict->get(id);
00254 }
00255
00256 Object* ObjectQueue::viewEntry(int pos) {
00257 return dict->get(pos);
00258 }
00259
00260
00261
00262
00263
00264
00265
00266 long ObjectQueue::getBinarySize(int chunk) {
00267 if (dict == NULL)
00268 return 0;
00269 return dict->getBinarySize(chunk);
00270 }
00271
00272
00273 int ObjectQueue::getBinaryChunkCount() {
00274 if (dict == NULL)
00275 return 0;
00276 return dict->getBinaryChunkCount();
00277 }
00278
00279
00280 long ObjectQueue::toBinaryBuffer(int chunk, char* buffer, int maxlen) {
00281 if (dict == NULL)
00282 return false;
00283 return dict->toBinaryBuffer(chunk, buffer, maxlen);
00284 }
00285
00286
00287 bool ObjectQueue::fromBinaryBuffer(int chunk, char* buffer, long len) {
00288 if (dict == NULL)
00289 return false;
00290 return dict->fromBinaryBuffer(chunk, buffer, len);
00291 }
00292
00293
00294 bool ObjectQueue::unitTest() {
00295 Queue testQ;
00296 return (testQ.unitTest());
00297 }
00298
00299
00300
00301
00302
00303 }