Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(144)

Side by Side Diff: runtime/vm/message_queue_test.cc

Issue 9182001: OOB messages and general message refactor. (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: '' Created 8 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "vm/assert.h" 5 #include "vm/assert.h"
6 #include "vm/message_queue.h" 6 #include "vm/message_queue.h"
7 #include "vm/unit_test.h" 7 #include "vm/unit_test.h"
8 8
9 namespace dart { 9 namespace dart {
10 10
11 11
12 // Provide access to private members of MessageQueue for testing. 12 // Provide access to private members of MessageQueue for testing.
13 class MessageQueueTestPeer { 13 class MessageQueueTestPeer {
14 public: 14 public:
15 explicit MessageQueueTestPeer(MessageQueue* queue) : queue_(queue) {} 15 explicit MessageQueueTestPeer(MessageQueue* queue) : queue_(queue) {}
16 16
17 bool HasMessage() const { return queue_->head_ != NULL; } 17 bool HasMessage() const {
Anton Muhin 2012/01/12 12:58:16 that's a pure paranoia, but technically you're not
turnidge 2012/01/12 19:01:55 Added lock.
18 return (queue_->head_[Message::kNormalPriority] != NULL ||
19 queue_->head_[Message::kOOBPriority] != NULL); }
18 20
19 private: 21 private:
20 MessageQueue* queue_; 22 MessageQueue* queue_;
21 }; 23 };
22 24
23 25
24 static Dart_Message AllocMsg(const char* str) { 26 static uint8_t* AllocMsg(const char* str) {
25 return reinterpret_cast<Dart_Message>(strdup(str)); 27 return reinterpret_cast<uint8_t*>(strdup(str));
26 } 28 }
27 29
28 30
29 TEST_CASE(MessageQueue_BasicOperations) { 31 TEST_CASE(MessageQueue_BasicOperations) {
30 MessageQueue queue; 32 MessageQueue queue;
31 MessageQueueTestPeer queue_peer(&queue); 33 MessageQueueTestPeer queue_peer(&queue);
32 EXPECT(!queue_peer.HasMessage()); 34 EXPECT(!queue_peer.HasMessage());
33 35
34 Dart_Port port = 1; 36 Dart_Port port = 1;
35 37
36 // Add two messages. 38 // Add two messages.
37 PortMessage* msg1 = new PortMessage(port, 0, AllocMsg("msg1")); 39 Message* msg1 =
40 new Message(port, 0, AllocMsg("msg1"), Message::kNormalPriority);
38 queue.Enqueue(msg1); 41 queue.Enqueue(msg1);
39 EXPECT(queue_peer.HasMessage()); 42 EXPECT(queue_peer.HasMessage());
40 43
41 PortMessage* msg2 = new PortMessage(port, 0, AllocMsg("msg2")); 44 Message* msg2 =
45 new Message(port, 0, AllocMsg("msg2"), Message::kNormalPriority);
46
42 queue.Enqueue(msg2); 47 queue.Enqueue(msg2);
43 EXPECT(queue_peer.HasMessage()); 48 EXPECT(queue_peer.HasMessage());
44 49
45 // Remove two messages. 50 // Remove two messages.
46 PortMessage* msg = queue.Dequeue(0); 51 Message* msg = queue.Dequeue(0);
47 EXPECT(msg != NULL); 52 EXPECT(msg != NULL);
48 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data())); 53 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data()));
49 EXPECT(queue_peer.HasMessage()); 54 EXPECT(queue_peer.HasMessage());
50 55
51 msg = queue.Dequeue(0); 56 msg = queue.Dequeue(0);
52 EXPECT(msg != NULL); 57 EXPECT(msg != NULL);
53 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); 58 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data()));
54 EXPECT(!queue_peer.HasMessage()); 59 EXPECT(!queue_peer.HasMessage());
55 60
56 delete msg1; 61 delete msg1;
57 delete msg2; 62 delete msg2;
58 } 63 }
59 64
60 65
66 TEST_CASE(MessageQueue_Priorities) {
67 MessageQueue queue;
68 MessageQueueTestPeer queue_peer(&queue);
69 EXPECT(!queue_peer.HasMessage());
70
71 Dart_Port port = 1;
72
73 // Add two messages.
74 Message* msg1 =
75 new Message(port, 0, AllocMsg("msg1"), Message::kNormalPriority);
76 queue.Enqueue(msg1);
77 EXPECT(queue_peer.HasMessage());
78
79 Message* msg2 =
80 new Message(port, 0, AllocMsg("msg2"), Message::kOOBPriority);
81
82 queue.Enqueue(msg2);
83 EXPECT(queue_peer.HasMessage());
84
85 // The higher priority message is delivered first.
86 Message* msg = queue.Dequeue(0);
87 EXPECT(msg != NULL);
88 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data()));
89 EXPECT(queue_peer.HasMessage());
90
91 msg = queue.Dequeue(0);
92 EXPECT(msg != NULL);
93 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data()));
94 EXPECT(!queue_peer.HasMessage());
95
96 delete msg1;
97 delete msg2;
98 }
99
100
61 // A thread which receives an expected sequence of messages. 101 // A thread which receives an expected sequence of messages.
62 static Monitor* sync = NULL; 102 static Monitor* sync = NULL;
63 static MessageQueue* shared_queue = NULL; 103 static MessageQueue* shared_queue = NULL;
64 void MessageReceiver_start(uword unused) { 104 void MessageReceiver_start(uword unused) {
65 // We only need an isolate here because the MonitorLocker in the 105 // We only need an isolate here because the MonitorLocker in the
66 // MessageQueue expects it, we don't need to initialize the isolate 106 // MessageQueue expects it, we don't need to initialize the isolate
67 // as it does not run any dart code. 107 // as it does not run any dart code.
68 Dart::CreateIsolate(); 108 Dart::CreateIsolate();
69 109
70 // Create a message queue and share it. 110 // Create a message queue and share it.
71 MessageQueue* queue = new MessageQueue(); 111 MessageQueue* queue = new MessageQueue();
72 MessageQueueTestPeer peer(queue); 112 MessageQueueTestPeer peer(queue);
73 shared_queue = queue; 113 shared_queue = queue;
74 114
75 // Tell the other thread that the shared queue is ready. 115 // Tell the other thread that the shared queue is ready.
76 { 116 {
77 MonitorLocker ml(sync); 117 MonitorLocker ml(sync);
78 ml.Notify(); 118 ml.Notify();
79 } 119 }
80 120
81 // Wait for the other thread to fill the queue a bit. 121 // Wait for the other thread to fill the queue a bit.
82 while (!peer.HasMessage()) { 122 while (!peer.HasMessage()) {
83 MonitorLocker ml(sync); 123 MonitorLocker ml(sync);
84 ml.Wait(5); 124 ml.Wait(5);
85 } 125 }
86 126
87 for (int i = 0; i < 3; i++) { 127 for (int i = 0; i < 3; i++) {
88 PortMessage* msg = queue->Dequeue(0); 128 Message* msg = queue->Dequeue(0);
89 EXPECT(msg != NULL); 129 EXPECT(msg != NULL);
90 EXPECT_EQ(i+10, msg->dest_port()); 130 EXPECT_EQ(i+10, msg->dest_port());
91 EXPECT_EQ(i+100, msg->reply_port()); 131 EXPECT_EQ(i+100, msg->reply_port());
92 EXPECT_EQ(i+1000, *(reinterpret_cast<int*>(msg->data()))); 132 EXPECT_EQ(i+1000, *(reinterpret_cast<int*>(msg->data())));
93 delete msg; 133 delete msg;
94 } 134 }
95 for (int i = 0; i < 3; i++) { 135 for (int i = 0; i < 3; i++) {
96 PortMessage* msg = queue->Dequeue(0); 136 Message* msg = queue->Dequeue(0);
97 EXPECT(msg != NULL); 137 EXPECT(msg != NULL);
98 EXPECT_EQ(i+20, msg->dest_port()); 138 EXPECT_EQ(i+20, msg->dest_port());
99 EXPECT_EQ(i+200, msg->reply_port()); 139 EXPECT_EQ(i+200, msg->reply_port());
100 EXPECT_EQ(i+2000, *(reinterpret_cast<int*>(msg->data()))); 140 EXPECT_EQ(i+2000, *(reinterpret_cast<int*>(msg->data())));
101 delete msg; 141 delete msg;
102 } 142 }
103 shared_queue = NULL; 143 shared_queue = NULL;
104 delete queue; 144 delete queue;
105 Dart::ShutdownIsolate(); 145 Dart::ShutdownIsolate();
106 } 146 }
107 147
108 148
109 TEST_CASE(MessageQueue_WaitNotify) { 149 TEST_CASE(MessageQueue_WaitNotify) {
110 sync = new Monitor(); 150 sync = new Monitor();
111 151
112 Thread* thread = new Thread(MessageReceiver_start, 0); 152 Thread* thread = new Thread(MessageReceiver_start, 0);
113 EXPECT(thread != NULL); 153 EXPECT(thread != NULL);
114 154
115 // Wait for the shared queue to be created. 155 // Wait for the shared queue to be created.
116 while (shared_queue == NULL) { 156 while (shared_queue == NULL) {
117 MonitorLocker ml(sync); 157 MonitorLocker ml(sync);
118 ml.Wait(5); 158 ml.Wait(5);
119 } 159 }
120 ASSERT(shared_queue != NULL); 160 ASSERT(shared_queue != NULL);
121 161
122 // Pile up three messages before the other thread runs. 162 // Pile up three messages before the other thread runs.
123 for (int i = 0; i < 3; i++) { 163 for (int i = 0; i < 3; i++) {
124 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); 164 int* data = reinterpret_cast<int*>(malloc(sizeof(*data)));
125 *data = i+1000; 165 *data = i+1000;
126 PortMessage* msg = 166 Message* msg = new Message(i+10, i+100, reinterpret_cast<uint8_t*>(data),
Anton Muhin 2012/01/12 12:58:16 nit: spaces around +'s
turnidge 2012/01/12 19:01:55 Done.
127 new PortMessage(i+10, i+100, reinterpret_cast<Dart_Message>(data)); 167 Message::kNormalPriority);
128 shared_queue->Enqueue(msg); 168 shared_queue->Enqueue(msg);
129 } 169 }
130 170
131 // Wake the other thread and have it start consuming messages. 171 // Wake the other thread and have it start consuming messages.
132 { 172 {
133 MonitorLocker ml(sync); 173 MonitorLocker ml(sync);
134 ml.Notify(); 174 ml.Notify();
135 } 175 }
136 176
137 // Add a few more messages after sleeping to allow the other thread 177 // Add a few more messages after sleeping to allow the other thread
138 // to potentially exercise the blocking code path in Dequeue. 178 // to potentially exercise the blocking code path in Dequeue.
139 OS::Sleep(5); 179 OS::Sleep(5);
140 for (int i = 0; i < 3; i++) { 180 for (int i = 0; i < 3; i++) {
141 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); 181 int* data = reinterpret_cast<int*>(malloc(sizeof(*data)));
142 *data = i+2000; 182 *data = i+2000;
143 PortMessage* msg = 183 Message* msg = new Message(i+20, i+200, reinterpret_cast<uint8_t*>(data),
144 new PortMessage(i+20, i+200, reinterpret_cast<Dart_Message>(data)); 184 Message::kNormalPriority);
145 shared_queue->Enqueue(msg); 185 shared_queue->Enqueue(msg);
146 } 186 }
147 187
148 sync = NULL; 188 sync = NULL;
149 delete sync; 189 delete sync;
150 190
151 // Give the spawned thread enough time to properly exit. 191 // Give the spawned thread enough time to properly exit.
152 OS::Sleep(20); 192 OS::Sleep(20);
153 } 193 }
154 194
155 195
156 TEST_CASE(MessageQueue_FlushAll) { 196 TEST_CASE(MessageQueue_FlushAll) {
157 MessageQueue queue; 197 MessageQueue queue;
158 MessageQueueTestPeer queue_peer(&queue); 198 MessageQueueTestPeer queue_peer(&queue);
159 Dart_Port port1 = 1; 199 Dart_Port port1 = 1;
160 Dart_Port port2 = 2; 200 Dart_Port port2 = 2;
161 201
162 // Add two messages. 202 // Add two messages.
163 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); 203 Message* msg1 =
204 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority);
164 queue.Enqueue(msg1); 205 queue.Enqueue(msg1);
165 PortMessage* msg2 = new PortMessage(port2, 0, AllocMsg("msg2")); 206 Message* msg2 =
207 new Message(port2, 0, AllocMsg("msg2"), Message::kNormalPriority);
166 queue.Enqueue(msg2); 208 queue.Enqueue(msg2);
167 209
168 EXPECT(queue_peer.HasMessage()); 210 EXPECT(queue_peer.HasMessage());
169 queue.FlushAll(); 211 queue.FlushAll();
170 EXPECT(!queue_peer.HasMessage()); 212 EXPECT(!queue_peer.HasMessage());
171 213
172 // msg1 and msg2 already delete by FlushAll. 214 // msg1 and msg2 already delete by FlushAll.
173 } 215 }
174 216
175 217
176 TEST_CASE(MessageQueue_Flush) { 218 TEST_CASE(MessageQueue_Flush) {
177 MessageQueue queue; 219 MessageQueue queue;
178 MessageQueueTestPeer queue_peer(&queue); 220 MessageQueueTestPeer queue_peer(&queue);
179 Dart_Port port1 = 1; 221 Dart_Port port1 = 1;
180 Dart_Port port2 = 2; 222 Dart_Port port2 = 2;
181 223
182 // Add two messages on different ports. 224 // Add two messages on different ports.
183 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); 225 Message* msg1 =
226 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority);
184 queue.Enqueue(msg1); 227 queue.Enqueue(msg1);
185 PortMessage* msg2 = new PortMessage(port2, 0, AllocMsg("msg2")); 228 Message* msg2 =
229 new Message(port2, 0, AllocMsg("msg2"), Message::kNormalPriority);
186 queue.Enqueue(msg2); 230 queue.Enqueue(msg2);
187 EXPECT(queue_peer.HasMessage()); 231 EXPECT(queue_peer.HasMessage());
188 232
189 queue.Flush(port1); 233 queue.Flush(port1);
190 234
191 // One message is left in the queue. 235 // One message is left in the queue.
192 EXPECT(queue_peer.HasMessage()); 236 EXPECT(queue_peer.HasMessage());
193 PortMessage* msg = queue.Dequeue(0); 237 Message* msg = queue.Dequeue(0);
194 EXPECT(msg != NULL); 238 EXPECT(msg != NULL);
195 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); 239 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data()));
196 240
197 EXPECT(!queue_peer.HasMessage()); 241 EXPECT(!queue_peer.HasMessage());
198 242
199 // msg1 is already deleted by Flush. 243 // msg1 is already deleted by Flush.
200 delete msg2; 244 delete msg2;
201 } 245 }
202 246
203 247
204 TEST_CASE(MessageQueue_Flush_MultipleMessages) { 248 TEST_CASE(MessageQueue_Flush_MultipleMessages) {
205 MessageQueue queue; 249 MessageQueue queue;
206 MessageQueueTestPeer queue_peer(&queue); 250 MessageQueueTestPeer queue_peer(&queue);
207 Dart_Port port1 = 1; 251 Dart_Port port1 = 1;
208 252
209 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); 253 Message* msg1 =
254 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority);
210 queue.Enqueue(msg1); 255 queue.Enqueue(msg1);
211 PortMessage* msg2 = new PortMessage(port1, 0, AllocMsg("msg2")); 256 Message* msg2 =
257 new Message(port1, 0, AllocMsg("msg2"), Message::kNormalPriority);
212 queue.Enqueue(msg2); 258 queue.Enqueue(msg2);
213 EXPECT(queue_peer.HasMessage()); 259 EXPECT(queue_peer.HasMessage());
214 260
215 queue.Flush(port1); 261 queue.Flush(port1);
216 262
217 // Queue is empty. 263 // Queue is empty.
218 EXPECT(!queue_peer.HasMessage()); 264 EXPECT(!queue_peer.HasMessage());
219 // msg1 and msg2 are already deleted by Flush. 265 // msg1 and msg2 are already deleted by Flush.
220 } 266 }
221 267
222 268
223 TEST_CASE(MessageQueue_Flush_EmptyQueue) { 269 TEST_CASE(MessageQueue_Flush_EmptyQueue) {
224 MessageQueue queue; 270 MessageQueue queue;
225 MessageQueueTestPeer queue_peer(&queue); 271 MessageQueueTestPeer queue_peer(&queue);
226 Dart_Port port1 = 1; 272 Dart_Port port1 = 1;
227 273
228 EXPECT(!queue_peer.HasMessage()); 274 EXPECT(!queue_peer.HasMessage());
229 queue.Flush(port1); 275 queue.Flush(port1);
230 276
231 // Queue is still empty. 277 // Queue is still empty.
232 EXPECT(!queue_peer.HasMessage()); 278 EXPECT(!queue_peer.HasMessage());
233 } 279 }
234 280
235
236 } // namespace dart 281 } // namespace dart
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698