Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 #include "vm/assert.h" | 5 #include "vm/assert.h" |
| 6 #include "vm/message_queue.h" | 6 #include "vm/message_queue.h" |
| 7 #include "vm/unit_test.h" | 7 #include "vm/unit_test.h" |
| 8 | 8 |
| 9 namespace dart { | 9 namespace dart { |
| 10 | 10 |
| 11 | 11 |
| 12 // Provide access to private members of MessageQueue for testing. | 12 // Provide access to private members of MessageQueue for testing. |
| 13 class MessageQueueTestPeer { | 13 class MessageQueueTestPeer { |
| 14 public: | 14 public: |
| 15 explicit MessageQueueTestPeer(MessageQueue* queue) : queue_(queue) {} | 15 explicit MessageQueueTestPeer(MessageQueue* queue) : queue_(queue) {} |
| 16 | 16 |
| 17 bool HasMessage() const { return queue_->head_ != NULL; } | 17 bool HasMessage() const { |
|
Anton Muhin
2012/01/12 12:58:16
that's a pure paranoia, but technically you're not
turnidge
2012/01/12 19:01:55
Added lock.
| |
| 18 return (queue_->head_[Message::kNormalPriority] != NULL || | |
| 19 queue_->head_[Message::kOOBPriority] != NULL); } | |
| 18 | 20 |
| 19 private: | 21 private: |
| 20 MessageQueue* queue_; | 22 MessageQueue* queue_; |
| 21 }; | 23 }; |
| 22 | 24 |
| 23 | 25 |
| 24 static Dart_Message AllocMsg(const char* str) { | 26 static uint8_t* AllocMsg(const char* str) { |
| 25 return reinterpret_cast<Dart_Message>(strdup(str)); | 27 return reinterpret_cast<uint8_t*>(strdup(str)); |
| 26 } | 28 } |
| 27 | 29 |
| 28 | 30 |
| 29 TEST_CASE(MessageQueue_BasicOperations) { | 31 TEST_CASE(MessageQueue_BasicOperations) { |
| 30 MessageQueue queue; | 32 MessageQueue queue; |
| 31 MessageQueueTestPeer queue_peer(&queue); | 33 MessageQueueTestPeer queue_peer(&queue); |
| 32 EXPECT(!queue_peer.HasMessage()); | 34 EXPECT(!queue_peer.HasMessage()); |
| 33 | 35 |
| 34 Dart_Port port = 1; | 36 Dart_Port port = 1; |
| 35 | 37 |
| 36 // Add two messages. | 38 // Add two messages. |
| 37 PortMessage* msg1 = new PortMessage(port, 0, AllocMsg("msg1")); | 39 Message* msg1 = |
| 40 new Message(port, 0, AllocMsg("msg1"), Message::kNormalPriority); | |
| 38 queue.Enqueue(msg1); | 41 queue.Enqueue(msg1); |
| 39 EXPECT(queue_peer.HasMessage()); | 42 EXPECT(queue_peer.HasMessage()); |
| 40 | 43 |
| 41 PortMessage* msg2 = new PortMessage(port, 0, AllocMsg("msg2")); | 44 Message* msg2 = |
| 45 new Message(port, 0, AllocMsg("msg2"), Message::kNormalPriority); | |
| 46 | |
| 42 queue.Enqueue(msg2); | 47 queue.Enqueue(msg2); |
| 43 EXPECT(queue_peer.HasMessage()); | 48 EXPECT(queue_peer.HasMessage()); |
| 44 | 49 |
| 45 // Remove two messages. | 50 // Remove two messages. |
| 46 PortMessage* msg = queue.Dequeue(0); | 51 Message* msg = queue.Dequeue(0); |
| 47 EXPECT(msg != NULL); | 52 EXPECT(msg != NULL); |
| 48 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data())); | 53 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data())); |
| 49 EXPECT(queue_peer.HasMessage()); | 54 EXPECT(queue_peer.HasMessage()); |
| 50 | 55 |
| 51 msg = queue.Dequeue(0); | 56 msg = queue.Dequeue(0); |
| 52 EXPECT(msg != NULL); | 57 EXPECT(msg != NULL); |
| 53 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); | 58 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); |
| 54 EXPECT(!queue_peer.HasMessage()); | 59 EXPECT(!queue_peer.HasMessage()); |
| 55 | 60 |
| 56 delete msg1; | 61 delete msg1; |
| 57 delete msg2; | 62 delete msg2; |
| 58 } | 63 } |
| 59 | 64 |
| 60 | 65 |
| 66 TEST_CASE(MessageQueue_Priorities) { | |
| 67 MessageQueue queue; | |
| 68 MessageQueueTestPeer queue_peer(&queue); | |
| 69 EXPECT(!queue_peer.HasMessage()); | |
| 70 | |
| 71 Dart_Port port = 1; | |
| 72 | |
| 73 // Add two messages. | |
| 74 Message* msg1 = | |
| 75 new Message(port, 0, AllocMsg("msg1"), Message::kNormalPriority); | |
| 76 queue.Enqueue(msg1); | |
| 77 EXPECT(queue_peer.HasMessage()); | |
| 78 | |
| 79 Message* msg2 = | |
| 80 new Message(port, 0, AllocMsg("msg2"), Message::kOOBPriority); | |
| 81 | |
| 82 queue.Enqueue(msg2); | |
| 83 EXPECT(queue_peer.HasMessage()); | |
| 84 | |
| 85 // The higher priority message is delivered first. | |
| 86 Message* msg = queue.Dequeue(0); | |
| 87 EXPECT(msg != NULL); | |
| 88 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); | |
| 89 EXPECT(queue_peer.HasMessage()); | |
| 90 | |
| 91 msg = queue.Dequeue(0); | |
| 92 EXPECT(msg != NULL); | |
| 93 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data())); | |
| 94 EXPECT(!queue_peer.HasMessage()); | |
| 95 | |
| 96 delete msg1; | |
| 97 delete msg2; | |
| 98 } | |
| 99 | |
| 100 | |
| 61 // A thread which receives an expected sequence of messages. | 101 // A thread which receives an expected sequence of messages. |
| 62 static Monitor* sync = NULL; | 102 static Monitor* sync = NULL; |
| 63 static MessageQueue* shared_queue = NULL; | 103 static MessageQueue* shared_queue = NULL; |
| 64 void MessageReceiver_start(uword unused) { | 104 void MessageReceiver_start(uword unused) { |
| 65 // We only need an isolate here because the MonitorLocker in the | 105 // We only need an isolate here because the MonitorLocker in the |
| 66 // MessageQueue expects it, we don't need to initialize the isolate | 106 // MessageQueue expects it, we don't need to initialize the isolate |
| 67 // as it does not run any dart code. | 107 // as it does not run any dart code. |
| 68 Dart::CreateIsolate(); | 108 Dart::CreateIsolate(); |
| 69 | 109 |
| 70 // Create a message queue and share it. | 110 // Create a message queue and share it. |
| 71 MessageQueue* queue = new MessageQueue(); | 111 MessageQueue* queue = new MessageQueue(); |
| 72 MessageQueueTestPeer peer(queue); | 112 MessageQueueTestPeer peer(queue); |
| 73 shared_queue = queue; | 113 shared_queue = queue; |
| 74 | 114 |
| 75 // Tell the other thread that the shared queue is ready. | 115 // Tell the other thread that the shared queue is ready. |
| 76 { | 116 { |
| 77 MonitorLocker ml(sync); | 117 MonitorLocker ml(sync); |
| 78 ml.Notify(); | 118 ml.Notify(); |
| 79 } | 119 } |
| 80 | 120 |
| 81 // Wait for the other thread to fill the queue a bit. | 121 // Wait for the other thread to fill the queue a bit. |
| 82 while (!peer.HasMessage()) { | 122 while (!peer.HasMessage()) { |
| 83 MonitorLocker ml(sync); | 123 MonitorLocker ml(sync); |
| 84 ml.Wait(5); | 124 ml.Wait(5); |
| 85 } | 125 } |
| 86 | 126 |
| 87 for (int i = 0; i < 3; i++) { | 127 for (int i = 0; i < 3; i++) { |
| 88 PortMessage* msg = queue->Dequeue(0); | 128 Message* msg = queue->Dequeue(0); |
| 89 EXPECT(msg != NULL); | 129 EXPECT(msg != NULL); |
| 90 EXPECT_EQ(i+10, msg->dest_port()); | 130 EXPECT_EQ(i+10, msg->dest_port()); |
| 91 EXPECT_EQ(i+100, msg->reply_port()); | 131 EXPECT_EQ(i+100, msg->reply_port()); |
| 92 EXPECT_EQ(i+1000, *(reinterpret_cast<int*>(msg->data()))); | 132 EXPECT_EQ(i+1000, *(reinterpret_cast<int*>(msg->data()))); |
| 93 delete msg; | 133 delete msg; |
| 94 } | 134 } |
| 95 for (int i = 0; i < 3; i++) { | 135 for (int i = 0; i < 3; i++) { |
| 96 PortMessage* msg = queue->Dequeue(0); | 136 Message* msg = queue->Dequeue(0); |
| 97 EXPECT(msg != NULL); | 137 EXPECT(msg != NULL); |
| 98 EXPECT_EQ(i+20, msg->dest_port()); | 138 EXPECT_EQ(i+20, msg->dest_port()); |
| 99 EXPECT_EQ(i+200, msg->reply_port()); | 139 EXPECT_EQ(i+200, msg->reply_port()); |
| 100 EXPECT_EQ(i+2000, *(reinterpret_cast<int*>(msg->data()))); | 140 EXPECT_EQ(i+2000, *(reinterpret_cast<int*>(msg->data()))); |
| 101 delete msg; | 141 delete msg; |
| 102 } | 142 } |
| 103 shared_queue = NULL; | 143 shared_queue = NULL; |
| 104 delete queue; | 144 delete queue; |
| 105 Dart::ShutdownIsolate(); | 145 Dart::ShutdownIsolate(); |
| 106 } | 146 } |
| 107 | 147 |
| 108 | 148 |
| 109 TEST_CASE(MessageQueue_WaitNotify) { | 149 TEST_CASE(MessageQueue_WaitNotify) { |
| 110 sync = new Monitor(); | 150 sync = new Monitor(); |
| 111 | 151 |
| 112 Thread* thread = new Thread(MessageReceiver_start, 0); | 152 Thread* thread = new Thread(MessageReceiver_start, 0); |
| 113 EXPECT(thread != NULL); | 153 EXPECT(thread != NULL); |
| 114 | 154 |
| 115 // Wait for the shared queue to be created. | 155 // Wait for the shared queue to be created. |
| 116 while (shared_queue == NULL) { | 156 while (shared_queue == NULL) { |
| 117 MonitorLocker ml(sync); | 157 MonitorLocker ml(sync); |
| 118 ml.Wait(5); | 158 ml.Wait(5); |
| 119 } | 159 } |
| 120 ASSERT(shared_queue != NULL); | 160 ASSERT(shared_queue != NULL); |
| 121 | 161 |
| 122 // Pile up three messages before the other thread runs. | 162 // Pile up three messages before the other thread runs. |
| 123 for (int i = 0; i < 3; i++) { | 163 for (int i = 0; i < 3; i++) { |
| 124 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); | 164 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); |
| 125 *data = i+1000; | 165 *data = i+1000; |
| 126 PortMessage* msg = | 166 Message* msg = new Message(i+10, i+100, reinterpret_cast<uint8_t*>(data), |
|
Anton Muhin
2012/01/12 12:58:16
nit: spaces around +'s
turnidge
2012/01/12 19:01:55
Done.
| |
| 127 new PortMessage(i+10, i+100, reinterpret_cast<Dart_Message>(data)); | 167 Message::kNormalPriority); |
| 128 shared_queue->Enqueue(msg); | 168 shared_queue->Enqueue(msg); |
| 129 } | 169 } |
| 130 | 170 |
| 131 // Wake the other thread and have it start consuming messages. | 171 // Wake the other thread and have it start consuming messages. |
| 132 { | 172 { |
| 133 MonitorLocker ml(sync); | 173 MonitorLocker ml(sync); |
| 134 ml.Notify(); | 174 ml.Notify(); |
| 135 } | 175 } |
| 136 | 176 |
| 137 // Add a few more messages after sleeping to allow the other thread | 177 // Add a few more messages after sleeping to allow the other thread |
| 138 // to potentially exercise the blocking code path in Dequeue. | 178 // to potentially exercise the blocking code path in Dequeue. |
| 139 OS::Sleep(5); | 179 OS::Sleep(5); |
| 140 for (int i = 0; i < 3; i++) { | 180 for (int i = 0; i < 3; i++) { |
| 141 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); | 181 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); |
| 142 *data = i+2000; | 182 *data = i+2000; |
| 143 PortMessage* msg = | 183 Message* msg = new Message(i+20, i+200, reinterpret_cast<uint8_t*>(data), |
| 144 new PortMessage(i+20, i+200, reinterpret_cast<Dart_Message>(data)); | 184 Message::kNormalPriority); |
| 145 shared_queue->Enqueue(msg); | 185 shared_queue->Enqueue(msg); |
| 146 } | 186 } |
| 147 | 187 |
| 148 sync = NULL; | 188 sync = NULL; |
| 149 delete sync; | 189 delete sync; |
| 150 | 190 |
| 151 // Give the spawned thread enough time to properly exit. | 191 // Give the spawned thread enough time to properly exit. |
| 152 OS::Sleep(20); | 192 OS::Sleep(20); |
| 153 } | 193 } |
| 154 | 194 |
| 155 | 195 |
| 156 TEST_CASE(MessageQueue_FlushAll) { | 196 TEST_CASE(MessageQueue_FlushAll) { |
| 157 MessageQueue queue; | 197 MessageQueue queue; |
| 158 MessageQueueTestPeer queue_peer(&queue); | 198 MessageQueueTestPeer queue_peer(&queue); |
| 159 Dart_Port port1 = 1; | 199 Dart_Port port1 = 1; |
| 160 Dart_Port port2 = 2; | 200 Dart_Port port2 = 2; |
| 161 | 201 |
| 162 // Add two messages. | 202 // Add two messages. |
| 163 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); | 203 Message* msg1 = |
| 204 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority); | |
| 164 queue.Enqueue(msg1); | 205 queue.Enqueue(msg1); |
| 165 PortMessage* msg2 = new PortMessage(port2, 0, AllocMsg("msg2")); | 206 Message* msg2 = |
| 207 new Message(port2, 0, AllocMsg("msg2"), Message::kNormalPriority); | |
| 166 queue.Enqueue(msg2); | 208 queue.Enqueue(msg2); |
| 167 | 209 |
| 168 EXPECT(queue_peer.HasMessage()); | 210 EXPECT(queue_peer.HasMessage()); |
| 169 queue.FlushAll(); | 211 queue.FlushAll(); |
| 170 EXPECT(!queue_peer.HasMessage()); | 212 EXPECT(!queue_peer.HasMessage()); |
| 171 | 213 |
| 172 // msg1 and msg2 already delete by FlushAll. | 214 // msg1 and msg2 already delete by FlushAll. |
| 173 } | 215 } |
| 174 | 216 |
| 175 | 217 |
| 176 TEST_CASE(MessageQueue_Flush) { | 218 TEST_CASE(MessageQueue_Flush) { |
| 177 MessageQueue queue; | 219 MessageQueue queue; |
| 178 MessageQueueTestPeer queue_peer(&queue); | 220 MessageQueueTestPeer queue_peer(&queue); |
| 179 Dart_Port port1 = 1; | 221 Dart_Port port1 = 1; |
| 180 Dart_Port port2 = 2; | 222 Dart_Port port2 = 2; |
| 181 | 223 |
| 182 // Add two messages on different ports. | 224 // Add two messages on different ports. |
| 183 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); | 225 Message* msg1 = |
| 226 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority); | |
| 184 queue.Enqueue(msg1); | 227 queue.Enqueue(msg1); |
| 185 PortMessage* msg2 = new PortMessage(port2, 0, AllocMsg("msg2")); | 228 Message* msg2 = |
| 229 new Message(port2, 0, AllocMsg("msg2"), Message::kNormalPriority); | |
| 186 queue.Enqueue(msg2); | 230 queue.Enqueue(msg2); |
| 187 EXPECT(queue_peer.HasMessage()); | 231 EXPECT(queue_peer.HasMessage()); |
| 188 | 232 |
| 189 queue.Flush(port1); | 233 queue.Flush(port1); |
| 190 | 234 |
| 191 // One message is left in the queue. | 235 // One message is left in the queue. |
| 192 EXPECT(queue_peer.HasMessage()); | 236 EXPECT(queue_peer.HasMessage()); |
| 193 PortMessage* msg = queue.Dequeue(0); | 237 Message* msg = queue.Dequeue(0); |
| 194 EXPECT(msg != NULL); | 238 EXPECT(msg != NULL); |
| 195 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); | 239 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); |
| 196 | 240 |
| 197 EXPECT(!queue_peer.HasMessage()); | 241 EXPECT(!queue_peer.HasMessage()); |
| 198 | 242 |
| 199 // msg1 is already deleted by Flush. | 243 // msg1 is already deleted by Flush. |
| 200 delete msg2; | 244 delete msg2; |
| 201 } | 245 } |
| 202 | 246 |
| 203 | 247 |
| 204 TEST_CASE(MessageQueue_Flush_MultipleMessages) { | 248 TEST_CASE(MessageQueue_Flush_MultipleMessages) { |
| 205 MessageQueue queue; | 249 MessageQueue queue; |
| 206 MessageQueueTestPeer queue_peer(&queue); | 250 MessageQueueTestPeer queue_peer(&queue); |
| 207 Dart_Port port1 = 1; | 251 Dart_Port port1 = 1; |
| 208 | 252 |
| 209 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); | 253 Message* msg1 = |
| 254 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority); | |
| 210 queue.Enqueue(msg1); | 255 queue.Enqueue(msg1); |
| 211 PortMessage* msg2 = new PortMessage(port1, 0, AllocMsg("msg2")); | 256 Message* msg2 = |
| 257 new Message(port1, 0, AllocMsg("msg2"), Message::kNormalPriority); | |
| 212 queue.Enqueue(msg2); | 258 queue.Enqueue(msg2); |
| 213 EXPECT(queue_peer.HasMessage()); | 259 EXPECT(queue_peer.HasMessage()); |
| 214 | 260 |
| 215 queue.Flush(port1); | 261 queue.Flush(port1); |
| 216 | 262 |
| 217 // Queue is empty. | 263 // Queue is empty. |
| 218 EXPECT(!queue_peer.HasMessage()); | 264 EXPECT(!queue_peer.HasMessage()); |
| 219 // msg1 and msg2 are already deleted by Flush. | 265 // msg1 and msg2 are already deleted by Flush. |
| 220 } | 266 } |
| 221 | 267 |
| 222 | 268 |
| 223 TEST_CASE(MessageQueue_Flush_EmptyQueue) { | 269 TEST_CASE(MessageQueue_Flush_EmptyQueue) { |
| 224 MessageQueue queue; | 270 MessageQueue queue; |
| 225 MessageQueueTestPeer queue_peer(&queue); | 271 MessageQueueTestPeer queue_peer(&queue); |
| 226 Dart_Port port1 = 1; | 272 Dart_Port port1 = 1; |
| 227 | 273 |
| 228 EXPECT(!queue_peer.HasMessage()); | 274 EXPECT(!queue_peer.HasMessage()); |
| 229 queue.Flush(port1); | 275 queue.Flush(port1); |
| 230 | 276 |
| 231 // Queue is still empty. | 277 // Queue is still empty. |
| 232 EXPECT(!queue_peer.HasMessage()); | 278 EXPECT(!queue_peer.HasMessage()); |
| 233 } | 279 } |
| 234 | 280 |
| 235 | |
| 236 } // namespace dart | 281 } // namespace dart |
| OLD | NEW |