Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(53)

Side by Side Diff: runtime/vm/message_handler_test.cc

Issue 11440035: Optimize the message queue for many active ports with few messages. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Merge with caching changes. Created 8 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/vm/message_handler.cc ('k') | runtime/vm/message_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "vm/message_handler.h" 5 #include "vm/message_handler.h"
6 #include "vm/port.h"
6 #include "vm/unit_test.h" 7 #include "vm/unit_test.h"
7 8
8 namespace dart { 9 namespace dart {
9 10
10 class MessageHandlerTestPeer { 11 class MessageHandlerTestPeer {
11 public: 12 public:
12 explicit MessageHandlerTestPeer(MessageHandler* handler) 13 explicit MessageHandlerTestPeer(MessageHandler* handler)
13 : handler_(handler) {} 14 : handler_(handler) {}
14 15
15 void PostMessage(Message* message) { handler_->PostMessage(message); } 16 void PostMessage(Message* message) { handler_->PostMessage(message); }
16 void ClosePort(Dart_Port port) { handler_->ClosePort(port); } 17 void ClosePort(Dart_Port port) { handler_->ClosePort(port); }
17 void CloseAllPorts() { handler_->CloseAllPorts(); } 18 void CloseAllPorts() { handler_->CloseAllPorts(); }
18 19
19 void increment_live_ports() { handler_->increment_live_ports(); } 20 void increment_live_ports() { handler_->increment_live_ports(); }
20 void decrement_live_ports() { handler_->decrement_live_ports(); } 21 void decrement_live_ports() { handler_->decrement_live_ports(); }
21 22
22 MessageQueue* queue() const { return handler_->queue_; } 23 MessageQueue* queue() const { return handler_->queue_; }
23 MessageQueue* oob_queue() const { return handler_->oob_queue_; } 24 MessageQueue* oob_queue() const { return handler_->oob_queue_; }
24 25
25 private: 26 private:
26 MessageHandler* handler_; 27 MessageHandler* handler_;
27 28
28 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTestPeer); 29 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTestPeer);
29 }; 30 };
30 31
31 32
32 class TestMessageHandler : public MessageHandler { 33 class TestMessageHandler : public MessageHandler {
33 public: 34 public:
34 TestMessageHandler() 35 TestMessageHandler()
35 : port_buffer_(strdup("")), 36 : port_buffer_(NULL),
37 port_buffer_size_(0),
36 notify_count_(0), 38 notify_count_(0),
37 message_count_(0), 39 message_count_(0),
40 start_called_(false),
41 end_called_(false),
38 result_(true) { 42 result_(true) {
39 } 43 }
40 44
41 ~TestMessageHandler() { 45 ~TestMessageHandler() {
42 free(port_buffer_); 46 delete[] port_buffer_;
43 } 47 }
44 48
45 void MessageNotify(Message::Priority priority) { 49 void MessageNotify(Message::Priority priority) {
46 notify_count_++; 50 notify_count_++;
47 } 51 }
48 52
49 bool HandleMessage(Message* message) { 53 bool HandleMessage(Message* message) {
50 // For testing purposes, keep a string with a list of the ports 54 // For testing purposes, keep a list of the ports
51 // for all messages we receive. 55 // for all messages we receive.
52 intptr_t len = 56 AddPortToBuffer(message->dest_port());
53 OS::SNPrint(NULL, 0, "%s %"Pd64"",
54 port_buffer_,
55 message->dest_port()) + 1;
56 char* buffer = reinterpret_cast<char*>(malloc(len));
57 OS::SNPrint(buffer, len, "%s %"Pd64"",
58 port_buffer_,
59 message->dest_port());
60 free(port_buffer_);
61 port_buffer_ = buffer;
62 delete message; 57 delete message;
63 message_count_++; 58 message_count_++;
64 return result_; 59 return result_;
65 } 60 }
66 61
67
68 bool Start() { 62 bool Start() {
69 intptr_t len = 63 start_called_ = true;
70 OS::SNPrint(NULL, 0, "%s start", port_buffer_) + 1;
71 char* buffer = reinterpret_cast<char*>(malloc(len));
72 OS::SNPrint(buffer, len, "%s start", port_buffer_);
73 free(port_buffer_);
74 port_buffer_ = buffer;
75 return true; 64 return true;
76 } 65 }
77 66
78
79 void End() { 67 void End() {
80 intptr_t len = 68 end_called_ = true;
81 OS::SNPrint(NULL, 0, "%s end", port_buffer_) + 1; 69 AddPortToBuffer(-2);
82 char* buffer = reinterpret_cast<char*>(malloc(len));
83 OS::SNPrint(buffer, len, "%s end", port_buffer_);
84 free(port_buffer_);
85 port_buffer_ = buffer;
86 } 70 }
87 71
88 72 Dart_Port* port_buffer() const { return port_buffer_; }
89 const char* port_buffer() const { return port_buffer_; }
90 int notify_count() const { return notify_count_; } 73 int notify_count() const { return notify_count_; }
91 int message_count() const { return message_count_; } 74 int message_count() const { return message_count_; }
75 bool start_called() const { return start_called_; }
76 bool end_called() const { return end_called_; }
92 77
93 void set_result(bool result) { result_ = result; } 78 void set_result(bool result) { result_ = result; }
94 79
95 private: 80 private:
96 char* port_buffer_; 81 void AddPortToBuffer(Dart_Port port) {
82 if (port_buffer_ == NULL) {
83 port_buffer_ = new Dart_Port[10];
84 port_buffer_size_ = 10;
85 } else if (message_count_ == port_buffer_size_) {
86 port_buffer_size_ = 2 * port_buffer_size_;
87 delete[] port_buffer_;
88 port_buffer_ = new Dart_Port[port_buffer_size_];
89 }
90 port_buffer_[message_count_] = port;
91 }
92
93 Dart_Port* port_buffer_;
94 int port_buffer_size_;
97 int notify_count_; 95 int notify_count_;
98 int message_count_; 96 int message_count_;
97 bool start_called_;
98 bool end_called_;
99 bool result_; 99 bool result_;
100 100
101 DISALLOW_COPY_AND_ASSIGN(TestMessageHandler); 101 DISALLOW_COPY_AND_ASSIGN(TestMessageHandler);
102 }; 102 };
103 103
104 104
105 bool TestStartFunction(uword data) { 105 bool TestStartFunction(uword data) {
106 return (reinterpret_cast<TestMessageHandler*>(data))->Start(); 106 return (reinterpret_cast<TestMessageHandler*>(data))->Start();
107 } 107 }
108 108
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
146 UNIT_TEST_CASE(MessageHandler_ClosePort) { 146 UNIT_TEST_CASE(MessageHandler_ClosePort) {
147 TestMessageHandler handler; 147 TestMessageHandler handler;
148 MessageHandlerTestPeer handler_peer(&handler); 148 MessageHandlerTestPeer handler_peer(&handler);
149 Message* message1 = new Message(1, 0, NULL, 0, Message::kNormalPriority); 149 Message* message1 = new Message(1, 0, NULL, 0, Message::kNormalPriority);
150 handler_peer.PostMessage(message1); 150 handler_peer.PostMessage(message1);
151 Message* message2 = new Message(2, 0, NULL, 0, Message::kNormalPriority); 151 Message* message2 = new Message(2, 0, NULL, 0, Message::kNormalPriority);
152 handler_peer.PostMessage(message2); 152 handler_peer.PostMessage(message2);
153 153
154 handler_peer.ClosePort(1); 154 handler_peer.ClosePort(1);
155 155
156 // The message on port 1 is dropped from the queue. 156 // Closing the port does not drop the messages from the queue.
157 EXPECT(message1 == handler_peer.queue()->Dequeue());
157 EXPECT(message2 == handler_peer.queue()->Dequeue()); 158 EXPECT(message2 == handler_peer.queue()->Dequeue());
158 EXPECT(NULL == handler_peer.queue()->Dequeue()); 159 delete message1;
159 delete message2; 160 delete message2;
160 } 161 }
161 162
162 163
163 UNIT_TEST_CASE(MessageHandler_CloseAllPorts) { 164 UNIT_TEST_CASE(MessageHandler_CloseAllPorts) {
164 TestMessageHandler handler; 165 TestMessageHandler handler;
165 MessageHandlerTestPeer handler_peer(&handler); 166 MessageHandlerTestPeer handler_peer(&handler);
166 Message* message1 = new Message(1, 0, NULL, 0, Message::kNormalPriority); 167 Message* message1 = new Message(1, 0, NULL, 0, Message::kNormalPriority);
167 handler_peer.PostMessage(message1); 168 handler_peer.PostMessage(message1);
168 Message* message2 = new Message(2, 0, NULL, 0, Message::kNormalPriority); 169 Message* message2 = new Message(2, 0, NULL, 0, Message::kNormalPriority);
169 handler_peer.PostMessage(message2); 170 handler_peer.PostMessage(message2);
170 171
171 handler_peer.CloseAllPorts(); 172 handler_peer.CloseAllPorts();
172 173
173 // All messages are dropped from the queue. 174 // All messages are dropped from the queue.
174 EXPECT(NULL == handler_peer.queue()->Dequeue()); 175 EXPECT(NULL == handler_peer.queue()->Dequeue());
175 } 176 }
176 177
177 178
178 UNIT_TEST_CASE(MessageHandler_HandleNextMessage) { 179 UNIT_TEST_CASE(MessageHandler_HandleNextMessage) {
179 TestMessageHandler handler; 180 TestMessageHandler handler;
180 MessageHandlerTestPeer handler_peer(&handler); 181 MessageHandlerTestPeer handler_peer(&handler);
181 Message* message1 = new Message(1, 0, NULL, 0, Message::kNormalPriority); 182 Dart_Port port1 = PortMap::CreatePort(&handler);
183 Dart_Port port2 = PortMap::CreatePort(&handler);
184 Dart_Port port3 = PortMap::CreatePort(&handler);
185 Message* message1 = new Message(port1, 0, NULL, 0, Message::kNormalPriority);
182 handler_peer.PostMessage(message1); 186 handler_peer.PostMessage(message1);
183 Message* oob_message1 = new Message(3, 0, NULL, 0, Message::kOOBPriority); 187 Message* oob_message1 = new Message(port2, 0, NULL, 0, Message::kOOBPriority);
184 handler_peer.PostMessage(oob_message1); 188 handler_peer.PostMessage(oob_message1);
185 Message* message2 = new Message(2, 0, NULL, 0, Message::kNormalPriority); 189 Message* message2 = new Message(port2, 0, NULL, 0, Message::kNormalPriority);
186 handler_peer.PostMessage(message2); 190 handler_peer.PostMessage(message2);
187 Message* oob_message2 = new Message(4, 0, NULL, 0, Message::kOOBPriority); 191 Message* oob_message2 = new Message(port3, 0, NULL, 0, Message::kOOBPriority);
188 handler_peer.PostMessage(oob_message2); 192 handler_peer.PostMessage(oob_message2);
189 193
190 // We handle both oob messages and a single normal message. 194 // We handle both oob messages and a single normal message.
191 EXPECT(handler.HandleNextMessage()); 195 EXPECT(handler.HandleNextMessage());
192 EXPECT_STREQ(" 3 4 1", handler.port_buffer()); 196 EXPECT_EQ(3, handler.message_count());
193 handler_peer.CloseAllPorts(); 197 Dart_Port* ports = handler.port_buffer();
198 EXPECT_EQ(port2, ports[0]);
199 EXPECT_EQ(port3, ports[1]);
200 EXPECT_EQ(port1, ports[2]);
201 PortMap::ClosePorts(&handler);
194 } 202 }
195 203
196 204
197 UNIT_TEST_CASE(MessageHandler_HandleOOBMessages) { 205 UNIT_TEST_CASE(MessageHandler_HandleOOBMessages) {
198 TestMessageHandler handler; 206 TestMessageHandler handler;
199 MessageHandlerTestPeer handler_peer(&handler); 207 MessageHandlerTestPeer handler_peer(&handler);
200 Message* message1 = new Message(1, 0, NULL, 0, Message::kNormalPriority); 208 Dart_Port port1 = PortMap::CreatePort(&handler);
209 Dart_Port port2 = PortMap::CreatePort(&handler);
210 Dart_Port port3 = PortMap::CreatePort(&handler);
211 Dart_Port port4 = PortMap::CreatePort(&handler);
212 Message* message1 = new Message(port1, 0, NULL, 0, Message::kNormalPriority);
201 handler_peer.PostMessage(message1); 213 handler_peer.PostMessage(message1);
202 Message* message2 = new Message(2, 0, NULL, 0, Message::kNormalPriority); 214 Message* message2 = new Message(port2, 0, NULL, 0, Message::kNormalPriority);
203 handler_peer.PostMessage(message2); 215 handler_peer.PostMessage(message2);
204 Message* oob_message1 = new Message(3, 0, NULL, 0, Message::kOOBPriority); 216 Message* oob_message1 = new Message(port3, 0, NULL, 0, Message::kOOBPriority);
205 handler_peer.PostMessage(oob_message1); 217 handler_peer.PostMessage(oob_message1);
206 Message* oob_message2 = new Message(4, 0, NULL, 0, Message::kOOBPriority); 218 Message* oob_message2 = new Message(port4, 0, NULL, 0, Message::kOOBPriority);
207 handler_peer.PostMessage(oob_message2); 219 handler_peer.PostMessage(oob_message2);
208 220
209 // We handle both oob messages but no normal messages. 221 // We handle both oob messages but no normal messages.
210 EXPECT(handler.HandleOOBMessages()); 222 EXPECT(handler.HandleOOBMessages());
211 EXPECT_STREQ(" 3 4", handler.port_buffer()); 223 EXPECT_EQ(2, handler.message_count());
224 Dart_Port* ports = handler.port_buffer();
225 EXPECT_EQ(port3, ports[0]);
226 EXPECT_EQ(port4, ports[1]);
212 handler_peer.CloseAllPorts(); 227 handler_peer.CloseAllPorts();
213 } 228 }
214 229
215 230
216 struct ThreadStartInfo { 231 struct ThreadStartInfo {
217 MessageHandler* handler; 232 MessageHandler* handler;
233 Dart_Port* ports;
218 int count; 234 int count;
219 }; 235 };
220 236
221 237
222 static void SendMessages(uword param) { 238 static void SendMessages(uword param) {
223 ThreadStartInfo* info = reinterpret_cast<ThreadStartInfo*>(param); 239 ThreadStartInfo* info = reinterpret_cast<ThreadStartInfo*>(param);
224 MessageHandler* handler = info->handler; 240 MessageHandler* handler = info->handler;
225 MessageHandlerTestPeer handler_peer(handler); 241 MessageHandlerTestPeer handler_peer(handler);
226 for (int i = 0; i < info->count; i++) { 242 for (int i = 0; i < info->count; i++) {
227 Message* message = new Message(i + 1, 0, NULL, 0, Message::kNormalPriority); 243 Message* message =
244 new Message(info->ports[i], 0, NULL, 0, Message::kNormalPriority);
228 handler_peer.PostMessage(message); 245 handler_peer.PostMessage(message);
229 } 246 }
230 } 247 }
231 248
232 249
233 UNIT_TEST_CASE(MessageHandler_Run) { 250 UNIT_TEST_CASE(MessageHandler_Run) {
234 ThreadPool pool; 251 ThreadPool pool;
235 TestMessageHandler handler; 252 TestMessageHandler handler;
236 MessageHandlerTestPeer handler_peer(&handler); 253 MessageHandlerTestPeer handler_peer(&handler);
237 int sleep = 0; 254 int sleep = 0;
238 const int kMaxSleep = 20 * 1000; // 20 seconds. 255 const int kMaxSleep = 20 * 1000; // 20 seconds.
239 256
240 EXPECT(!handler.HasLivePorts()); 257 EXPECT(!handler.HasLivePorts());
241 handler_peer.increment_live_ports(); 258 handler_peer.increment_live_ports();
242 259
243 handler.Run(&pool, 260 handler.Run(&pool,
244 TestStartFunction, 261 TestStartFunction,
245 TestEndFunction, 262 TestEndFunction,
246 reinterpret_cast<uword>(&handler)); 263 reinterpret_cast<uword>(&handler));
247 Message* message = new Message(100, 0, NULL, 0, Message::kNormalPriority); 264 Dart_Port port = PortMap::CreatePort(&handler);
265 Message* message = new Message(port, 0, NULL, 0, Message::kNormalPriority);
248 handler_peer.PostMessage(message); 266 handler_peer.PostMessage(message);
249 267
250 // Wait for the first message to be handled. 268 // Wait for the first message to be handled.
251 while (sleep < kMaxSleep && handler.message_count() < 1) { 269 while (sleep < kMaxSleep && handler.message_count() < 1) {
252 OS::Sleep(10); 270 OS::Sleep(10);
253 sleep += 10; 271 sleep += 10;
254 } 272 }
255 EXPECT_STREQ(" start 100", handler.port_buffer()); 273 EXPECT_EQ(1, handler.message_count());
274 EXPECT(handler.start_called());
275 EXPECT(!handler.end_called());
276 Dart_Port* handler_ports = handler.port_buffer();
277 EXPECT_EQ(port, handler_ports[0]);
256 278
257 // Start a thread which sends more messages. 279 // Start a thread which sends more messages.
280 Dart_Port* ports = new Dart_Port[10];
281 for (int i = 0; i < 10; i++) {
282 ports[i] = PortMap::CreatePort(&handler);
283 }
258 ThreadStartInfo info; 284 ThreadStartInfo info;
259 info.handler = &handler; 285 info.handler = &handler;
286 info.ports = ports;
260 info.count = 10; 287 info.count = 10;
261 Thread::Start(SendMessages, reinterpret_cast<uword>(&info)); 288 Thread::Start(SendMessages, reinterpret_cast<uword>(&info));
262 while (sleep < kMaxSleep && handler.message_count() < 11) { 289 while (sleep < kMaxSleep && handler.message_count() < 11) {
263 OS::Sleep(10); 290 OS::Sleep(10);
264 sleep += 10; 291 sleep += 10;
265 } 292 }
266 EXPECT_STREQ(" start 100 1 2 3 4 5 6 7 8 9 10", handler.port_buffer()); 293 EXPECT_EQ(11, handler.message_count());
267 294 EXPECT(handler.start_called());
295 EXPECT(!handler.end_called());
296 EXPECT_EQ(port, handler_ports[0]);
297 for (int i = 1; i < 11; i++) {
298 EXPECT_EQ(ports[i - 1], handler_ports[i]);
299 }
268 handler_peer.decrement_live_ports(); 300 handler_peer.decrement_live_ports();
269 EXPECT(!handler.HasLivePorts()); 301 EXPECT(!handler.HasLivePorts());
302 PortMap::ClosePorts(&handler);
270 } 303 }
271 304
272 } // namespace dart 305 } // namespace dart
OLDNEW
« no previous file with comments | « runtime/vm/message_handler.cc ('k') | runtime/vm/message_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698