OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/message_handler.h" | 5 #include "vm/message_handler.h" |
6 | 6 |
7 #include "vm/dart.h" | 7 #include "vm/dart.h" |
8 #include "vm/lockers.h" | 8 #include "vm/lockers.h" |
9 #include "vm/object.h" | 9 #include "vm/object.h" |
10 #include "vm/object_store.h" | 10 #include "vm/object_store.h" |
11 #include "vm/os.h" | 11 #include "vm/os.h" |
12 #include "vm/port.h" | 12 #include "vm/port.h" |
13 #include "vm/thread_interrupter.h" | 13 #include "vm/thread_interrupter.h" |
14 | 14 |
15 | 15 |
16 namespace dart { | 16 namespace dart { |
17 | 17 |
18 DECLARE_FLAG(bool, trace_service_pause_events); | 18 DECLARE_FLAG(bool, trace_service_pause_events); |
19 | 19 |
20 class MessageHandlerTask : public ThreadPool::Task { | 20 class MessageHandlerTask : public ThreadPool::Task { |
21 public: | 21 public: |
22 explicit MessageHandlerTask(MessageHandler* handler) | 22 explicit MessageHandlerTask(MessageHandler* handler) : handler_(handler) { |
23 : handler_(handler) { | |
24 ASSERT(handler != NULL); | 23 ASSERT(handler != NULL); |
25 } | 24 } |
26 | 25 |
27 virtual void Run() { | 26 virtual void Run() { |
28 ASSERT(handler_ != NULL); | 27 ASSERT(handler_ != NULL); |
29 handler_->TaskCallback(); | 28 handler_->TaskCallback(); |
30 } | 29 } |
31 | 30 |
32 private: | 31 private: |
33 MessageHandler* handler_; | 32 MessageHandler* handler_; |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
103 } | 102 } |
104 | 103 |
105 | 104 |
106 void MessageHandler::Run(ThreadPool* pool, | 105 void MessageHandler::Run(ThreadPool* pool, |
107 StartCallback start_callback, | 106 StartCallback start_callback, |
108 EndCallback end_callback, | 107 EndCallback end_callback, |
109 CallbackData data) { | 108 CallbackData data) { |
110 bool task_running; | 109 bool task_running; |
111 MonitorLocker ml(&monitor_); | 110 MonitorLocker ml(&monitor_); |
112 if (FLAG_trace_isolates) { | 111 if (FLAG_trace_isolates) { |
113 OS::Print("[+] Starting message handler:\n" | 112 OS::Print( |
114 "\thandler: %s\n", | 113 "[+] Starting message handler:\n" |
115 name()); | 114 "\thandler: %s\n", |
| 115 name()); |
116 } | 116 } |
117 ASSERT(pool_ == NULL); | 117 ASSERT(pool_ == NULL); |
118 ASSERT(!delete_me_); | 118 ASSERT(!delete_me_); |
119 pool_ = pool; | 119 pool_ = pool; |
120 start_callback_ = start_callback; | 120 start_callback_ = start_callback; |
121 end_callback_ = end_callback; | 121 end_callback_ = end_callback; |
122 callback_data_ = data; | 122 callback_data_ = data; |
123 task_ = new MessageHandlerTask(this); | 123 task_ = new MessageHandlerTask(this); |
124 task_running = pool_->Run(task_); | 124 task_running = pool_->Run(task_); |
125 ASSERT(task_running); | 125 ASSERT(task_running); |
126 } | 126 } |
127 | 127 |
128 | 128 |
129 void MessageHandler::PostMessage(Message* message, bool before_events) { | 129 void MessageHandler::PostMessage(Message* message, bool before_events) { |
130 Message::Priority saved_priority; | 130 Message::Priority saved_priority; |
131 bool task_running = true; | 131 bool task_running = true; |
132 { | 132 { |
133 MonitorLocker ml(&monitor_); | 133 MonitorLocker ml(&monitor_); |
134 if (FLAG_trace_isolates) { | 134 if (FLAG_trace_isolates) { |
135 const char* source_name = "<native code>"; | 135 const char* source_name = "<native code>"; |
136 Isolate* source_isolate = Isolate::Current(); | 136 Isolate* source_isolate = Isolate::Current(); |
137 if (source_isolate) { | 137 if (source_isolate) { |
138 source_name = source_isolate->name(); | 138 source_name = source_isolate->name(); |
139 } | 139 } |
140 OS::Print("[>] Posting message:\n" | 140 OS::Print( |
141 "\tlen: %" Pd "\n" | 141 "[>] Posting message:\n" |
142 "\tsource: %s\n" | 142 "\tlen: %" Pd |
143 "\tdest: %s\n" | 143 "\n" |
144 "\tdest_port: %" Pd64 "\n", | 144 "\tsource: %s\n" |
145 message->len(), source_name, name(), message->dest_port()); | 145 "\tdest: %s\n" |
| 146 "\tdest_port: %" Pd64 "\n", |
| 147 message->len(), source_name, name(), message->dest_port()); |
146 } | 148 } |
147 | 149 |
148 saved_priority = message->priority(); | 150 saved_priority = message->priority(); |
149 if (message->IsOOB()) { | 151 if (message->IsOOB()) { |
150 oob_queue_->Enqueue(message, before_events); | 152 oob_queue_->Enqueue(message, before_events); |
151 } else { | 153 } else { |
152 queue_->Enqueue(message, before_events); | 154 queue_->Enqueue(message, before_events); |
153 } | 155 } |
154 message = NULL; // Do not access message. May have been deleted. | 156 message = NULL; // Do not access message. May have been deleted. |
155 | 157 |
(...skipping 28 matching lines...) Expand all Loading... |
184 MessageHandler::MessageStatus MessageHandler::HandleMessages( | 186 MessageHandler::MessageStatus MessageHandler::HandleMessages( |
185 MonitorLocker* ml, | 187 MonitorLocker* ml, |
186 bool allow_normal_messages, | 188 bool allow_normal_messages, |
187 bool allow_multiple_normal_messages) { | 189 bool allow_multiple_normal_messages) { |
188 // TODO(turnidge): Add assert that monitor_ is held here. | 190 // TODO(turnidge): Add assert that monitor_ is held here. |
189 | 191 |
190 // If isolate() returns NULL StartIsolateScope does nothing. | 192 // If isolate() returns NULL StartIsolateScope does nothing. |
191 StartIsolateScope start_isolate(isolate()); | 193 StartIsolateScope start_isolate(isolate()); |
192 | 194 |
193 MessageStatus max_status = kOK; | 195 MessageStatus max_status = kOK; |
194 Message::Priority min_priority = ((allow_normal_messages && !paused()) | 196 Message::Priority min_priority = |
195 ? Message::kNormalPriority | 197 ((allow_normal_messages && !paused()) ? Message::kNormalPriority |
196 : Message::kOOBPriority); | 198 : Message::kOOBPriority); |
197 Message* message = DequeueMessage(min_priority); | 199 Message* message = DequeueMessage(min_priority); |
198 while (message != NULL) { | 200 while (message != NULL) { |
199 intptr_t message_len = message->len(); | 201 intptr_t message_len = message->len(); |
200 if (FLAG_trace_isolates) { | 202 if (FLAG_trace_isolates) { |
201 OS::Print("[<] Handling message:\n" | 203 OS::Print( |
202 "\tlen: %" Pd "\n" | 204 "[<] Handling message:\n" |
203 "\thandler: %s\n" | 205 "\tlen: %" Pd |
204 "\tport: %" Pd64 "\n", | 206 "\n" |
205 message_len, name(), message->dest_port()); | 207 "\thandler: %s\n" |
| 208 "\tport: %" Pd64 "\n", |
| 209 message_len, name(), message->dest_port()); |
206 } | 210 } |
207 | 211 |
208 // Release the monitor_ temporarily while we handle the message. | 212 // Release the monitor_ temporarily while we handle the message. |
209 // The monitor was acquired in MessageHandler::TaskCallback(). | 213 // The monitor was acquired in MessageHandler::TaskCallback(). |
210 ml->Exit(); | 214 ml->Exit(); |
211 Message::Priority saved_priority = message->priority(); | 215 Message::Priority saved_priority = message->priority(); |
212 Dart_Port saved_dest_port = message->dest_port(); | 216 Dart_Port saved_dest_port = message->dest_port(); |
213 MessageStatus status = HandleMessage(message); | 217 MessageStatus status = HandleMessage(message); |
214 if (status > max_status) { | 218 if (status > max_status) { |
215 max_status = status; | 219 max_status = status; |
216 } | 220 } |
217 message = NULL; // May be deleted by now. | 221 message = NULL; // May be deleted by now. |
218 ml->Enter(); | 222 ml->Enter(); |
219 if (FLAG_trace_isolates) { | 223 if (FLAG_trace_isolates) { |
220 OS::Print("[.] Message handled (%s):\n" | 224 OS::Print( |
221 "\tlen: %" Pd "\n" | 225 "[.] Message handled (%s):\n" |
222 "\thandler: %s\n" | 226 "\tlen: %" Pd |
223 "\tport: %" Pd64 "\n", | 227 "\n" |
224 MessageStatusString(status), | 228 "\thandler: %s\n" |
225 message_len, name(), saved_dest_port); | 229 "\tport: %" Pd64 "\n", |
| 230 MessageStatusString(status), message_len, name(), saved_dest_port); |
226 } | 231 } |
227 // If we are shutting down, do not process any more messages. | 232 // If we are shutting down, do not process any more messages. |
228 if (status == kShutdown) { | 233 if (status == kShutdown) { |
229 ClearOOBQueue(); | 234 ClearOOBQueue(); |
230 break; | 235 break; |
231 } | 236 } |
232 | 237 |
233 // Some callers want to process only one normal message and then quit. At | 238 // Some callers want to process only one normal message and then quit. At |
234 // the same time it is OK to process multiple OOB messages. | 239 // the same time it is OK to process multiple OOB messages. |
235 if ((saved_priority == Message::kNormalPriority) && | 240 if ((saved_priority == Message::kNormalPriority) && |
236 !allow_multiple_normal_messages) { | 241 !allow_multiple_normal_messages) { |
237 // We processed one normal message. Allow no more. | 242 // We processed one normal message. Allow no more. |
238 allow_normal_messages = false; | 243 allow_normal_messages = false; |
239 } | 244 } |
240 | 245 |
241 // Reevaluate the minimum allowable priority. The paused state | 246 // Reevaluate the minimum allowable priority. The paused state |
242 // may have changed as part of handling the message. We may also | 247 // may have changed as part of handling the message. We may also |
243 // have encountered an error during message processsing. | 248 // have encountered an error during message processsing. |
244 // | 249 // |
245 // Even if we encounter an error, we still process pending OOB | 250 // Even if we encounter an error, we still process pending OOB |
246 // messages so that we don't lose the message notification. | 251 // messages so that we don't lose the message notification. |
247 min_priority = (((max_status == kOK) && allow_normal_messages && !paused()) | 252 min_priority = (((max_status == kOK) && allow_normal_messages && !paused()) |
248 ? Message::kNormalPriority | 253 ? Message::kNormalPriority |
249 : Message::kOOBPriority); | 254 : Message::kOOBPriority); |
250 message = DequeueMessage(min_priority); | 255 message = DequeueMessage(min_priority); |
251 } | 256 } |
252 return max_status; | 257 return max_status; |
253 } | 258 } |
254 | 259 |
255 | 260 |
256 MessageHandler::MessageStatus MessageHandler::HandleNextMessage() { | 261 MessageHandler::MessageStatus MessageHandler::HandleNextMessage() { |
257 // We can only call HandleNextMessage when this handler is not | 262 // We can only call HandleNextMessage when this handler is not |
258 // assigned to a thread pool. | 263 // assigned to a thread pool. |
259 MonitorLocker ml(&monitor_); | 264 MonitorLocker ml(&monitor_); |
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
368 status = HandleMessages(&ml, (status == kOK), true); | 373 status = HandleMessages(&ml, (status == kOK), true); |
369 } | 374 } |
370 } | 375 } |
371 | 376 |
372 // The isolate exits when it encounters an error or when it no | 377 // The isolate exits when it encounters an error or when it no |
373 // longer has live ports. | 378 // longer has live ports. |
374 if (status != kOK || !HasLivePorts()) { | 379 if (status != kOK || !HasLivePorts()) { |
375 if (ShouldPauseOnExit(status)) { | 380 if (ShouldPauseOnExit(status)) { |
376 if (!is_paused_on_exit()) { | 381 if (!is_paused_on_exit()) { |
377 if (FLAG_trace_service_pause_events) { | 382 if (FLAG_trace_service_pause_events) { |
378 OS::PrintErr("Isolate %s paused before exiting. " | 383 OS::PrintErr( |
379 "Use the Observatory to release it.\n", name()); | 384 "Isolate %s paused before exiting. " |
| 385 "Use the Observatory to release it.\n", |
| 386 name()); |
380 } | 387 } |
381 PausedOnExitLocked(&ml, true); | 388 PausedOnExitLocked(&ml, true); |
382 // More messages may have come in while we released the monitor. | 389 // More messages may have come in while we released the monitor. |
383 status = HandleMessages(&ml, false, false); | 390 status = HandleMessages(&ml, false, false); |
384 } | 391 } |
385 if (ShouldPauseOnExit(status)) { | 392 if (ShouldPauseOnExit(status)) { |
386 // Still paused. | 393 // Still paused. |
387 ASSERT(oob_queue_->IsEmpty()); | 394 ASSERT(oob_queue_->IsEmpty()); |
388 task_ = NULL; // No task in queue. | 395 task_ = NULL; // No task in queue. |
389 return; | 396 return; |
390 } else { | 397 } else { |
391 PausedOnExitLocked(&ml, false); | 398 PausedOnExitLocked(&ml, false); |
392 } | 399 } |
393 } | 400 } |
394 if (FLAG_trace_isolates) { | 401 if (FLAG_trace_isolates) { |
395 if (status != kOK && thread() != NULL) { | 402 if (status != kOK && thread() != NULL) { |
396 const Error& error = Error::Handle(thread()->sticky_error()); | 403 const Error& error = Error::Handle(thread()->sticky_error()); |
397 OS::Print("[-] Stopping message handler (%s):\n" | 404 OS::Print( |
398 "\thandler: %s\n" | 405 "[-] Stopping message handler (%s):\n" |
399 "\terror: %s\n", | 406 "\thandler: %s\n" |
400 MessageStatusString(status), name(), error.ToCString()); | 407 "\terror: %s\n", |
| 408 MessageStatusString(status), name(), error.ToCString()); |
401 } else { | 409 } else { |
402 OS::Print("[-] Stopping message handler (%s):\n" | 410 OS::Print( |
403 "\thandler: %s\n", | 411 "[-] Stopping message handler (%s):\n" |
404 MessageStatusString(status), name()); | 412 "\thandler: %s\n", |
| 413 MessageStatusString(status), name()); |
405 } | 414 } |
406 } | 415 } |
407 pool_ = NULL; | 416 pool_ = NULL; |
408 run_end_callback = true; | 417 run_end_callback = true; |
409 delete_me = delete_me_; | 418 delete_me = delete_me_; |
410 } | 419 } |
411 | 420 |
412 // Clear the task_ last. This allows other tasks to potentially start | 421 // Clear the task_ last. This allows other tasks to potentially start |
413 // for this message handler. | 422 // for this message handler. |
414 ASSERT(oob_queue_->IsEmpty()); | 423 ASSERT(oob_queue_->IsEmpty()); |
415 task_ = NULL; | 424 task_ = NULL; |
416 } | 425 } |
417 | 426 |
418 // Message handlers either use delete_me or end_callback but not both. | 427 // Message handlers either use delete_me or end_callback but not both. |
419 ASSERT(!delete_me || end_callback_ == NULL); | 428 ASSERT(!delete_me || end_callback_ == NULL); |
420 | 429 |
421 if (run_end_callback && end_callback_ != NULL) { | 430 if (run_end_callback && end_callback_ != NULL) { |
422 end_callback_(callback_data_); | 431 end_callback_(callback_data_); |
423 // The handler may have been deleted after this point. | 432 // The handler may have been deleted after this point. |
424 } | 433 } |
425 if (delete_me) { | 434 if (delete_me) { |
426 delete this; | 435 delete this; |
427 } | 436 } |
428 } | 437 } |
429 | 438 |
430 | 439 |
431 void MessageHandler::ClosePort(Dart_Port port) { | 440 void MessageHandler::ClosePort(Dart_Port port) { |
432 MonitorLocker ml(&monitor_); | 441 MonitorLocker ml(&monitor_); |
433 if (FLAG_trace_isolates) { | 442 if (FLAG_trace_isolates) { |
434 OS::Print("[-] Closing port:\n" | 443 OS::Print( |
435 "\thandler: %s\n" | 444 "[-] Closing port:\n" |
436 "\tport: %" Pd64 "\n" | 445 "\thandler: %s\n" |
437 "\tports: live(%" Pd ")\n", | 446 "\tport: %" Pd64 |
438 name(), port, live_ports_); | 447 "\n" |
| 448 "\tports: live(%" Pd ")\n", |
| 449 name(), port, live_ports_); |
439 } | 450 } |
440 } | 451 } |
441 | 452 |
442 | 453 |
443 void MessageHandler::CloseAllPorts() { | 454 void MessageHandler::CloseAllPorts() { |
444 MonitorLocker ml(&monitor_); | 455 MonitorLocker ml(&monitor_); |
445 if (FLAG_trace_isolates) { | 456 if (FLAG_trace_isolates) { |
446 OS::Print("[-] Closing all ports:\n" | 457 OS::Print( |
447 "\thandler: %s\n", | 458 "[-] Closing all ports:\n" |
448 name()); | 459 "\thandler: %s\n", |
| 460 name()); |
449 } | 461 } |
450 queue_->Clear(); | 462 queue_->Clear(); |
451 oob_queue_->Clear(); | 463 oob_queue_->Clear(); |
452 } | 464 } |
453 | 465 |
454 | 466 |
455 void MessageHandler::RequestDeletion() { | 467 void MessageHandler::RequestDeletion() { |
456 ASSERT(OwnedByPortMap()); | 468 ASSERT(OwnedByPortMap()); |
457 { | 469 { |
458 MonitorLocker ml(&monitor_); | 470 MonitorLocker ml(&monitor_); |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
565 handler_->oob_message_handling_allowed_ = false; | 577 handler_->oob_message_handling_allowed_ = false; |
566 } | 578 } |
567 | 579 |
568 | 580 |
569 MessageHandler::AcquiredQueues::~AcquiredQueues() { | 581 MessageHandler::AcquiredQueues::~AcquiredQueues() { |
570 ASSERT(handler_ != NULL); | 582 ASSERT(handler_ != NULL); |
571 handler_->oob_message_handling_allowed_ = true; | 583 handler_->oob_message_handling_allowed_ = true; |
572 } | 584 } |
573 | 585 |
574 } // namespace dart | 586 } // namespace dart |
OLD | NEW |