Index: runtime/vm/message_handler.cc |
diff --git a/runtime/vm/message_handler.cc b/runtime/vm/message_handler.cc |
index ef8a71ef1c16ace0d6526e3dd56936bf2958039f..47ab4b9db7d4ef49dde0dc97e56e774319076820 100644 |
--- a/runtime/vm/message_handler.cc |
+++ b/runtime/vm/message_handler.cc |
@@ -6,6 +6,8 @@ |
#include "vm/dart.h" |
#include "vm/lockers.h" |
+#include "vm/object.h" |
+#include "vm/object_store.h" |
#include "vm/os.h" |
#include "vm/port.h" |
#include "vm/thread_interrupter.h" |
@@ -35,6 +37,24 @@ class MessageHandlerTask : public ThreadPool::Task { |
}; |
+// static |
+const char* MessageHandler::MessageStatusString(MessageStatus status) { |
+ switch (status) { |
+ case kOK: |
+ return "OK"; |
+ case kError: |
+ return "Error"; |
+ case kRestart: |
+ return "Restart"; |
+ case kShutdown: |
+ return "Shutdown"; |
+ default: |
+ UNREACHABLE(); |
+ return "Illegal"; |
+ } |
+} |
+ |
+ |
MessageHandler::MessageHandler() |
: queue_(new MessageQueue()), |
oob_queue_(new MessageQueue()), |
@@ -150,8 +170,16 @@ Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { |
} |
-bool MessageHandler::HandleMessages(bool allow_normal_messages, |
- bool allow_multiple_normal_messages) { |
+void MessageHandler::ClearOOBQueue() { |
+ oob_queue_->Clear(); |
+} |
+ |
+ |
+MessageHandler::MessageStatus MessageHandler::HandleMessages( |
+ bool allow_normal_messages, |
+ bool allow_multiple_normal_messages) { |
+ // TODO(turnidge): Add assert that monitor_ is held here. |
+ |
// If isolate() returns NULL StartIsolateScope does nothing. |
StartIsolateScope start_isolate(isolate()); |
@@ -159,10 +187,10 @@ bool MessageHandler::HandleMessages(bool allow_normal_messages, |
// an isolate to start handling messages. |
ThreadInterrupter::WakeUp(); |
- // TODO(turnidge): Add assert that monitor_ is held here. |
- bool result = true; |
- Message::Priority min_priority = (allow_normal_messages && !paused()) ? |
- Message::kNormalPriority : Message::kOOBPriority; |
+ MessageStatus max_status = kOK; |
+ Message::Priority min_priority = ((allow_normal_messages && !paused()) |
+ ? Message::kNormalPriority |
+ : Message::kOOBPriority); |
Message* message = DequeueMessage(min_priority); |
while (message != NULL) { |
intptr_t message_len = message->len(); |
@@ -179,38 +207,50 @@ bool MessageHandler::HandleMessages(bool allow_normal_messages, |
monitor_.Exit(); |
Message::Priority saved_priority = message->priority(); |
Dart_Port saved_dest_port = message->dest_port(); |
- result = HandleMessage(message); |
+ MessageStatus status = HandleMessage(message); |
+ if (status > max_status) { |
+ max_status = status; |
+ } |
message = NULL; // May be deleted by now. |
monitor_.Enter(); |
if (FLAG_trace_isolates) { |
- OS::Print("[.] Message handled:\n" |
+ OS::Print("[.] Message handled (%s):\n" |
"\tlen: %" Pd "\n" |
"\thandler: %s\n" |
"\tport: %" Pd64 "\n", |
+ MessageStatusString(status), |
message_len, name(), saved_dest_port); |
} |
- if (!result) { |
- // If we hit an error, we're done processing messages. |
+ // If we are shutting down, do not process any more messages. |
+ if (status == kShutdown) { |
+ ClearOOBQueue(); |
break; |
} |
+ |
// Some callers want to process only one normal message and then quit. At |
// the same time it is OK to process multiple OOB messages. |
if ((saved_priority == Message::kNormalPriority) && |
!allow_multiple_normal_messages) { |
- break; |
+ // We processed one normal message. Allow no more. |
+ allow_normal_messages = false; |
} |
- // Reevaluate the minimum allowable priority as the paused state might |
- // have changed as part of handling the message. |
- min_priority = (allow_normal_messages && !paused()) ? |
- Message::kNormalPriority : Message::kOOBPriority; |
+ // Reevaluate the minimum allowable priority. The paused state |
+ // may have changed as part of handling the message. We may also |
+ // have encountered an error during message processsing. |
+ // |
+ // Even if we encounter an error, we still process pending OOB |
+ // messages so that we don't lose the message notification. |
+ min_priority = (((max_status == kOK) && allow_normal_messages && !paused()) |
+ ? Message::kNormalPriority |
+ : Message::kOOBPriority); |
message = DequeueMessage(min_priority); |
} |
- return result; |
+ return max_status; |
} |
-bool MessageHandler::HandleNextMessage() { |
+MessageHandler::MessageStatus MessageHandler::HandleNextMessage() { |
// We can only call HandleNextMessage when this handler is not |
// assigned to a thread pool. |
MonitorLocker ml(&monitor_); |
@@ -222,9 +262,9 @@ bool MessageHandler::HandleNextMessage() { |
} |
-bool MessageHandler::HandleOOBMessages() { |
+MessageHandler::MessageStatus MessageHandler::HandleOOBMessages() { |
if (!oob_message_handling_allowed_) { |
- return true; |
+ return kOK; |
} |
MonitorLocker ml(&monitor_); |
#if defined(DEBUG) |
@@ -240,29 +280,39 @@ bool MessageHandler::HasOOBMessages() { |
} |
+static bool ShouldPause(MessageHandler::MessageStatus status) { |
+ // If we are restarting or shutting down, we do not want to honor |
+ // pause_on_start or pause_on_exit. |
+ return (status != MessageHandler::kRestart && |
+ status != MessageHandler::kShutdown); |
+} |
+ |
+ |
void MessageHandler::TaskCallback() { |
ASSERT(Isolate::Current() == NULL); |
- bool ok = true; |
+ MessageStatus status = kOK; |
bool run_end_callback = false; |
{ |
+ // We will occasionally release and reacquire this monitor in this |
+ // function. Whenever we reacquire the monitor we *must* process |
+ // all pending OOB messages, or we may miss a request for vm |
+ // shutdown. |
MonitorLocker ml(&monitor_); |
- // Initialize the message handler by running its start function, |
- // if we have one. For an isolate, this will run the isolate's |
- // main() function. |
if (pause_on_start()) { |
if (!paused_on_start_) { |
- // Temporarily drop the lock when calling out to NotifyPauseOnStart. |
- // This avoids a dead lock that can occur when this message handler |
- // tries to post a message while a message is being posted to it. |
+ // Temporarily release the monitor when calling out to |
+ // NotifyPauseOnStart. This avoids a dead lock that can occur |
+ // when this message handler tries to post a message while a |
+ // message is being posted to it. |
paused_on_start_ = true; |
paused_timestamp_ = OS::GetCurrentTimeMillis(); |
monitor_.Exit(); |
NotifyPauseOnStart(); |
monitor_.Enter(); |
} |
- // More messages may have come in while we released monitor_. |
- HandleMessages(false, false); |
- if (pause_on_start()) { |
+ // More messages may have come in before we (re)acquired the monitor. |
+ status = HandleMessages(false, false); |
+ if (ShouldPause(status) && pause_on_start()) { |
// Still paused. |
ASSERT(oob_queue_->IsEmpty()); |
task_ = NULL; // No task in queue. |
@@ -273,40 +323,49 @@ void MessageHandler::TaskCallback() { |
} |
} |
- if (start_callback_) { |
- // Release the monitor_ temporarily while we call the start callback. |
- // The monitor was acquired with the MonitorLocker above. |
- monitor_.Exit(); |
- ok = start_callback_(callback_data_); |
- ASSERT(Isolate::Current() == NULL); |
- start_callback_ = NULL; |
- monitor_.Enter(); |
- } |
+ if (status == kOK) { |
+ if (start_callback_) { |
+ // Initialize the message handler by running its start function, |
+ // if we have one. For an isolate, this will run the isolate's |
+ // main() function. |
+ // |
+ // Release the monitor_ temporarily while we call the start callback. |
+ monitor_.Exit(); |
+ status = start_callback_(callback_data_); |
+ ASSERT(Isolate::Current() == NULL); |
+ start_callback_ = NULL; |
+ monitor_.Enter(); |
+ } |
- // Handle any pending messages for this message handler. |
- if (ok) { |
- ok = HandleMessages(true, true); |
+ // Handle any pending messages for this message handler. |
+ if (status != kShutdown) { |
+ status = HandleMessages((status == kOK), true); |
+ } |
} |
- if (!ok || !HasLivePorts()) { |
- if (pause_on_exit()) { |
+ // The isolate exits when it encounters an error or when it no |
+ // longer has live ports. |
+ if (status != kOK || !HasLivePorts()) { |
+ if (ShouldPause(status) && pause_on_exit()) { |
if (!paused_on_exit_) { |
if (FLAG_trace_service_pause_events) { |
OS::PrintErr("Isolate %s paused before exiting. " |
"Use the Observatory to release it.\n", name()); |
} |
- // Temporarily drop the lock when calling out to NotifyPauseOnExit. |
- // This avoids a dead lock that can occur when this message handler |
- // tries to post a message while a message is being posted to it. |
+ // Temporarily release the monitor when calling out to |
+ // NotifyPauseOnExit. This avoids a dead lock that can |
+ // occur when this message handler tries to post a message |
+ // while a message is being posted to it. |
paused_on_exit_ = true; |
paused_timestamp_ = OS::GetCurrentTimeMillis(); |
monitor_.Exit(); |
NotifyPauseOnExit(); |
monitor_.Enter(); |
+ |
+ // More messages may have come in while we released the monitor. |
+ HandleMessages(false, false); |
} |
- // More messages may have come in while we released monitor_. |
- HandleMessages(false, false); |
- if (pause_on_exit()) { |
+ if (ShouldPause(status) && pause_on_exit()) { |
// Still paused. |
ASSERT(oob_queue_->IsEmpty()); |
task_ = NULL; // No task in queue. |
@@ -317,10 +376,18 @@ void MessageHandler::TaskCallback() { |
} |
} |
if (FLAG_trace_isolates) { |
- OS::Print("[-] Stopping message handler (%s):\n" |
- "\thandler: %s\n", |
- (ok ? "no live ports" : "error"), |
- name()); |
+ if (status != kOK && isolate() != NULL) { |
+ const Error& error = |
+ Error::Handle(isolate()->object_store()->sticky_error()); |
+ OS::Print("[-] Stopping message handler (%s):\n" |
+ "\thandler: %s\n" |
+ "\terror: %s\n", |
+ MessageStatusString(status), name(), error.ToCString()); |
+ } else { |
+ OS::Print("[-] Stopping message handler (%s):\n" |
+ "\thandler: %s\n", |
+ MessageStatusString(status), name()); |
+ } |
} |
pool_ = NULL; |
run_end_callback = true; |