00001 /***************************** License ********************************** 00002 00003 Copyright (C) 2008 by Communicative Machines 00004 http://www.cmlabs.com All rights reserved. 00005 00006 This library is free software; you can redistribute it and/or 00007 modify it under the terms of the GNU Lesser General Public 00008 License as published by the Free Software Foundation; either 00009 version 2.1 of the License, or (at your option) any later version. 00010 00011 This library is distributed in the hope that it will be useful, 00012 but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00014 Lesser General Public License for more details. 00015 00016 You should have received a copy of the GNU Lesser General Public 00017 License along with this library; if not, write to the Free Software 00018 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00019 00020 ************************************************************************/ 00021 // Queue.cpp: implementation of the Queue class. 00022 // 00023 ////////////////////////////////////////////////////////////////////// 00024 00025 #include "Queue.h" 00026 00027 namespace cmlabs { 00028 00029 ////////////////////////////////////////////////////////////////////// 00030 // Construction/Destruction 00031 ////////////////////////////////////////////////////////////////////// 00032 00033 Queue::Queue() 00034 { 00035 dict = new Dictionary(); 00036 } 00037 00038 Queue::~Queue() 00039 { 00040 delete(dict); 00041 } 00042 00043 //! Get total size of payload 00044 unsigned long Queue::getPayloadSize() const { 00045 return dict->getPayloadSize(); 00046 } 00047 00048 bool Queue::isEmpty() { 00049 bool result; 00050 if (!mutex.EnterMutex(1000)) 00051 return true; 00052 result = (dict->getCount() == 0); 00053 mutex.LeaveMutex(); 00054 return result; 00055 } 00056 00057 int Queue::getCount() { 00058 int result; 00059 if (!mutex.EnterMutex(1000)) 00060 return 0; 00061 result = dict->getCount(); 00062 mutex.LeaveMutex(); 00063 return result; 00064 } 00065 00066 bool Queue::removeAll() { 00067 return dict->removeAll(); 00068 } 00069 00070 00071 JString Queue::add(JString str) { 00072 JString id = createUniqueID("queue"); 00073 if (!mutex.EnterMutex(1000)) 00074 return ""; 00075 dict->put(id, str); 00076 mutex.LeaveMutex(); 00077 sem.post(); 00078 return id; 00079 } 00080 00081 JString Queue::add(JString id, JString str) { 00082 if (!mutex.EnterMutex(1000)) 00083 return ""; 00084 dict->put(id, str); 00085 mutex.LeaveMutex(); 00086 sem.post(); 00087 return id; 00088 } 00089 00090 00091 JString Queue::waitForNewEntry(long ms) { 00092 00093 if (!mutex.EnterMutex(1000)) 00094 return ""; 00095 00096 JString entry; 00097 if (dict->getCount() > 0) { 00098 sem.tryWait(); 00099 entry = dict->get(0); 00100 dict->remove(0); 00101 mutex.LeaveMutex(); 00102 } 00103 else if (ms > 0) { 00104 mutex.LeaveMutex(); 00105 if (sem.wait(ms)) { 00106 if (!mutex.EnterMutex(1000)) { 00107 printf(" NO GOT MUTEX!\n"); 00108 return ""; 00109 } 00110 entry = dict->get(0); 00111 dict->remove(0); 00112 mutex.LeaveMutex(); 00113 } 00114 } 00115 else { 00116 mutex.LeaveMutex(); 00117 } 00118 00119 return entry; 00120 } 00121 00122 00123 JString Queue::waitForNewEntryID(long ms) { 00124 JString id; 00125 bool res; 00126 00127 if (!mutex.EnterMutex(1000)) 00128 return ""; 00129 00130 if (dict->getCount() > 0) { 00131 sem.tryWait(); 00132 id = dict->getFirstKey(); 00133 mutex.LeaveMutex(); 00134 } 00135 else if (ms > 0) { 00136 mutex.LeaveMutex(); 00137 res = sem.wait(ms); 00138 if (res) { 00139 if (!mutex.EnterMutex(1000)) 00140 return ""; 00141 id = dict->getFirstKey(); 00142 mutex.LeaveMutex(); 00143 } 00144 } 00145 else { 00146 mutex.LeaveMutex(); 00147 } 00148 00149 return id; 00150 } 00151 00152 00153 JString Queue::waitForNewEntry(JString id, long ms) { 00154 00155 JString entry; 00156 00157 long waitedSoFar = 0; 00158 00159 bool done = false; 00160 00161 // Check all already in there, only wait if not 00162 while (! done) { 00163 if (!mutex.EnterMutex(1000)) { 00164 printf(" NO GOT MUTEX!\n"); 00165 return ""; 00166 } 00167 done = ! ((entry = dict->get(id)).length() == 0); 00168 mutex.LeaveMutex(); 00169 00170 if (! done) { 00171 if (!sem.wait(ms - waitedSoFar)) 00172 waitedSoFar += (ms - waitedSoFar); 00173 00174 if (waitedSoFar >= ms) { 00175 break; 00176 } 00177 } 00178 } 00179 00180 /* mutex.EnterMutex(); 00181 00182 // Check all already in there, only wait if not 00183 while ((entry = dict->get(id)).length() == 0) { 00184 if (!sem.wait(ms - waitedSoFar)) 00185 waitedSoFar += (ms - waitedSoFar); 00186 00187 if (waitedSoFar >= ms) { 00188 break; 00189 } 00190 } 00191 00192 mutex.LeaveMutex();*/ 00193 00194 // printf("waitForNewEntry took %d ms\n", JTime() - t); 00195 00196 if (done) { 00197 if (!mutex.EnterMutex(1000)) { 00198 printf(" NO GOT MUTEX!\n"); 00199 return ""; 00200 } 00201 // printf("--- Left in API: %d\n", dict->getCount()); 00202 dict->remove(id); 00203 // printf("*** Left in API: %d\n", dict->getCount()); 00204 mutex.LeaveMutex(); 00205 } 00206 00207 return entry; 00208 } 00209 /* 00210 JString Queue::retrieveEntry(JString id) { 00211 00212 if (id.length() == 0) return ""; 00213 00214 JString value; 00215 00216 if (!mutex.EnterMutex(1000)) 00217 return ""; 00218 00219 value = dict->get(id); 00220 dict->remove(id); 00221 00222 mutex.LeaveMutex(); 00223 00224 return value; 00225 } 00226 */ 00227 JString Queue::retrieveEntry(JString id) { 00228 if (!mutex.EnterMutex(1000)) 00229 return ""; 00230 JString str = dict->get(id); 00231 dict->remove(id); 00232 mutex.LeaveMutex(); 00233 return str; 00234 } 00235 00236 JString Queue::retrieveEntry(int pos) { 00237 if (!mutex.EnterMutex(1000)) 00238 return ""; 00239 JString str = dict->get(pos); 00240 dict->remove(pos); 00241 mutex.LeaveMutex(); 00242 return str; 00243 } 00244 00245 JString Queue::viewEntry(JString id) { 00246 return dict->get(id); 00247 } 00248 00249 JString Queue::viewEntry(int pos) { 00250 return dict->get(pos); 00251 } 00252 00253 bool Queue::unitTest() { 00254 00255 Queue* q = new Queue(); 00256 00257 // We have added none, this should return "" 00258 if (q->waitForNewEntryID(500).length() > 0) 00259 return false; 00260 00261 JString id = q->add("Test"); 00262 00263 // Now id should be in there... 00264 if (!id.equals(q->waitForNewEntryID(500))) 00265 return false; 00266 00267 if (!q->retrieveEntry(id).equals("Test")) 00268 return false; 00269 00270 delete(q); 00271 00272 return true; 00273 } 00274 00275 00276 00277 00278 00279 } // namespace cmlabs