| OLD | NEW | 
|    1 // Copyright (c) 2011, the Dart project authors.  Please see the AUTHORS file |    1 // Copyright (c) 2011, the Dart project authors.  Please see the AUTHORS file | 
|    2 // for details. All rights reserved. Use of this source code is governed by a |    2 // for details. All rights reserved. Use of this source code is governed by a | 
|    3 // BSD-style license that can be found in the LICENSE file. |    3 // BSD-style license that can be found in the LICENSE file. | 
|    4  |    4  | 
|    5 #include "vm/message_handler.h" |    5 #include "vm/message_handler.h" | 
|    6  |    6  | 
|    7 #include "vm/dart.h" |    7 #include "vm/dart.h" | 
|    8 #include "vm/lockers.h" |    8 #include "vm/lockers.h" | 
 |    9 #include "vm/object.h" | 
 |   10 #include "vm/object_store.h" | 
|    9 #include "vm/os.h" |   11 #include "vm/os.h" | 
|   10 #include "vm/port.h" |   12 #include "vm/port.h" | 
|   11 #include "vm/thread_interrupter.h" |   13 #include "vm/thread_interrupter.h" | 
|   12  |   14  | 
|   13  |   15  | 
|   14 namespace dart { |   16 namespace dart { | 
|   15  |   17  | 
|   16 DECLARE_FLAG(bool, trace_isolates); |   18 DECLARE_FLAG(bool, trace_isolates); | 
|   17 DECLARE_FLAG(bool, trace_service_pause_events); |   19 DECLARE_FLAG(bool, trace_service_pause_events); | 
|   18  |   20  | 
|   19 class MessageHandlerTask : public ThreadPool::Task { |   21 class MessageHandlerTask : public ThreadPool::Task { | 
|   20  public: |   22  public: | 
|   21   explicit MessageHandlerTask(MessageHandler* handler) |   23   explicit MessageHandlerTask(MessageHandler* handler) | 
|   22       : handler_(handler) { |   24       : handler_(handler) { | 
|   23     ASSERT(handler != NULL); |   25     ASSERT(handler != NULL); | 
|   24   } |   26   } | 
|   25  |   27  | 
|   26   virtual void Run() { |   28   virtual void Run() { | 
|   27     ASSERT(handler_ != NULL); |   29     ASSERT(handler_ != NULL); | 
|   28     handler_->TaskCallback(); |   30     handler_->TaskCallback(); | 
|   29   } |   31   } | 
|   30  |   32  | 
|   31  private: |   33  private: | 
|   32   MessageHandler* handler_; |   34   MessageHandler* handler_; | 
|   33  |   35  | 
|   34   DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); |   36   DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); | 
|   35 }; |   37 }; | 
|   36  |   38  | 
|   37  |   39  | 
 |   40 // static | 
 |   41 const char* MessageHandler::MessageStatusString(MessageStatus status) { | 
 |   42   switch (status) { | 
 |   43     case kOK: | 
 |   44       return "OK"; | 
 |   45     case kError: | 
 |   46       return "Error"; | 
 |   47     case kRestart: | 
 |   48       return "Restart"; | 
 |   49     case kShutdown: | 
 |   50       return "Shutdown"; | 
 |   51     default: | 
 |   52       UNREACHABLE(); | 
 |   53       return "Illegal"; | 
 |   54   } | 
 |   55 } | 
 |   56  | 
 |   57  | 
|   38 MessageHandler::MessageHandler() |   58 MessageHandler::MessageHandler() | 
|   39     : queue_(new MessageQueue()), |   59     : queue_(new MessageQueue()), | 
|   40       oob_queue_(new MessageQueue()), |   60       oob_queue_(new MessageQueue()), | 
|   41       oob_message_handling_allowed_(true), |   61       oob_message_handling_allowed_(true), | 
|   42       live_ports_(0), |   62       live_ports_(0), | 
|   43       paused_(0), |   63       paused_(0), | 
|   44       pause_on_start_(false), |   64       pause_on_start_(false), | 
|   45       pause_on_exit_(false), |   65       pause_on_exit_(false), | 
|   46       paused_on_start_(false), |   66       paused_on_start_(false), | 
|   47       paused_on_exit_(false), |   67       paused_on_exit_(false), | 
| (...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
|  143 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { |  163 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { | 
|  144   // TODO(turnidge): Add assert that monitor_ is held here. |  164   // TODO(turnidge): Add assert that monitor_ is held here. | 
|  145   Message* message = oob_queue_->Dequeue(); |  165   Message* message = oob_queue_->Dequeue(); | 
|  146   if ((message == NULL) && (min_priority < Message::kOOBPriority)) { |  166   if ((message == NULL) && (min_priority < Message::kOOBPriority)) { | 
|  147     message = queue_->Dequeue(); |  167     message = queue_->Dequeue(); | 
|  148   } |  168   } | 
|  149   return message; |  169   return message; | 
|  150 } |  170 } | 
|  151  |  171  | 
|  152  |  172  | 
|  153 bool MessageHandler::HandleMessages(bool allow_normal_messages, |  173 void MessageHandler::ClearOOBQueue() { | 
|  154                                     bool allow_multiple_normal_messages) { |  174   oob_queue_->Clear(); | 
 |  175 } | 
 |  176  | 
 |  177  | 
 |  178 MessageHandler::MessageStatus MessageHandler::HandleMessages( | 
 |  179     bool allow_normal_messages, | 
 |  180     bool allow_multiple_normal_messages) { | 
 |  181   // TODO(turnidge): Add assert that monitor_ is held here. | 
 |  182  | 
|  155   // If isolate() returns NULL StartIsolateScope does nothing. |  183   // If isolate() returns NULL StartIsolateScope does nothing. | 
|  156   StartIsolateScope start_isolate(isolate()); |  184   StartIsolateScope start_isolate(isolate()); | 
|  157  |  185  | 
|  158   // ThreadInterrupter may have gone to sleep while waiting for |  186   // ThreadInterrupter may have gone to sleep while waiting for | 
|  159   // an isolate to start handling messages. |  187   // an isolate to start handling messages. | 
|  160   ThreadInterrupter::WakeUp(); |  188   ThreadInterrupter::WakeUp(); | 
|  161  |  189  | 
|  162   // TODO(turnidge): Add assert that monitor_ is held here. |  190   MessageStatus max_status = kOK; | 
|  163   bool result = true; |  191   Message::Priority min_priority = ((allow_normal_messages && !paused()) | 
|  164   Message::Priority min_priority = (allow_normal_messages && !paused()) ? |  192                                     ? Message::kNormalPriority | 
|  165       Message::kNormalPriority : Message::kOOBPriority; |  193                                     : Message::kOOBPriority); | 
|  166   Message* message = DequeueMessage(min_priority); |  194   Message* message = DequeueMessage(min_priority); | 
|  167   while (message != NULL) { |  195   while (message != NULL) { | 
|  168     intptr_t message_len = message->len(); |  196     intptr_t message_len = message->len(); | 
|  169     if (FLAG_trace_isolates) { |  197     if (FLAG_trace_isolates) { | 
|  170       OS::Print("[<] Handling message:\n" |  198       OS::Print("[<] Handling message:\n" | 
|  171                 "\tlen:        %" Pd "\n" |  199                 "\tlen:        %" Pd "\n" | 
|  172                 "\thandler:    %s\n" |  200                 "\thandler:    %s\n" | 
|  173                 "\tport:       %" Pd64 "\n", |  201                 "\tport:       %" Pd64 "\n", | 
|  174                 message_len, name(), message->dest_port()); |  202                 message_len, name(), message->dest_port()); | 
|  175     } |  203     } | 
|  176  |  204  | 
|  177     // Release the monitor_ temporarily while we handle the message. |  205     // Release the monitor_ temporarily while we handle the message. | 
|  178     // The monitor was acquired in MessageHandler::TaskCallback(). |  206     // The monitor was acquired in MessageHandler::TaskCallback(). | 
|  179     monitor_.Exit(); |  207     monitor_.Exit(); | 
|  180     Message::Priority saved_priority = message->priority(); |  208     Message::Priority saved_priority = message->priority(); | 
|  181     Dart_Port saved_dest_port = message->dest_port(); |  209     Dart_Port saved_dest_port = message->dest_port(); | 
|  182     result = HandleMessage(message); |  210     MessageStatus status = HandleMessage(message); | 
 |  211     if (status > max_status) { | 
 |  212       max_status = status; | 
 |  213     } | 
|  183     message = NULL;  // May be deleted by now. |  214     message = NULL;  // May be deleted by now. | 
|  184     monitor_.Enter(); |  215     monitor_.Enter(); | 
|  185     if (FLAG_trace_isolates) { |  216     if (FLAG_trace_isolates) { | 
|  186       OS::Print("[.] Message handled:\n" |  217       OS::Print("[.] Message handled (%s):\n" | 
|  187                 "\tlen:        %" Pd "\n" |  218                 "\tlen:        %" Pd "\n" | 
|  188                 "\thandler:    %s\n" |  219                 "\thandler:    %s\n" | 
|  189                 "\tport:       %" Pd64 "\n", |  220                 "\tport:       %" Pd64 "\n", | 
 |  221                 MessageStatusString(status), | 
|  190                 message_len, name(), saved_dest_port); |  222                 message_len, name(), saved_dest_port); | 
|  191     } |  223     } | 
|  192     if (!result) { |  224     // If we are shutting down, do not process any more messages. | 
|  193       // If we hit an error, we're done processing messages. |  225     if (status == kShutdown) { | 
 |  226       ClearOOBQueue(); | 
|  194       break; |  227       break; | 
|  195     } |  228     } | 
 |  229  | 
|  196     // Some callers want to process only one normal message and then quit. At |  230     // Some callers want to process only one normal message and then quit. At | 
|  197     // the same time it is OK to process multiple OOB messages. |  231     // the same time it is OK to process multiple OOB messages. | 
|  198     if ((saved_priority == Message::kNormalPriority) && |  232     if ((saved_priority == Message::kNormalPriority) && | 
|  199         !allow_multiple_normal_messages) { |  233         !allow_multiple_normal_messages) { | 
|  200       break; |  234       // We processed one normal message.  Allow no more. | 
 |  235       allow_normal_messages = false; | 
|  201     } |  236     } | 
|  202  |  237  | 
|  203     // Reevaluate the minimum allowable priority as the paused state might |  238     // Reevaluate the minimum allowable priority.  The paused state | 
|  204     // have changed as part of handling the message. |  239     // may have changed as part of handling the message.  We may also | 
|  205     min_priority = (allow_normal_messages && !paused()) ? |  240     // have encountered an error during message processsing. | 
|  206         Message::kNormalPriority : Message::kOOBPriority; |  241     // | 
 |  242     // Even if we encounter an error, we still process pending OOB | 
 |  243     // messages so that we don't lose the message notification. | 
 |  244     min_priority = (((max_status == kOK) && allow_normal_messages && !paused()) | 
 |  245                     ? Message::kNormalPriority | 
 |  246                     : Message::kOOBPriority); | 
|  207     message = DequeueMessage(min_priority); |  247     message = DequeueMessage(min_priority); | 
|  208   } |  248   } | 
|  209   return result; |  249   return max_status; | 
|  210 } |  250 } | 
|  211  |  251  | 
|  212  |  252  | 
|  213 bool MessageHandler::HandleNextMessage() { |  253 MessageHandler::MessageStatus MessageHandler::HandleNextMessage() { | 
|  214   // We can only call HandleNextMessage when this handler is not |  254   // We can only call HandleNextMessage when this handler is not | 
|  215   // assigned to a thread pool. |  255   // assigned to a thread pool. | 
|  216   MonitorLocker ml(&monitor_); |  256   MonitorLocker ml(&monitor_); | 
|  217   ASSERT(pool_ == NULL); |  257   ASSERT(pool_ == NULL); | 
|  218 #if defined(DEBUG) |  258 #if defined(DEBUG) | 
|  219   CheckAccess(); |  259   CheckAccess(); | 
|  220 #endif |  260 #endif | 
|  221   return HandleMessages(true, false); |  261   return HandleMessages(true, false); | 
|  222 } |  262 } | 
|  223  |  263  | 
|  224  |  264  | 
|  225 bool MessageHandler::HandleOOBMessages() { |  265 MessageHandler::MessageStatus MessageHandler::HandleOOBMessages() { | 
|  226   if (!oob_message_handling_allowed_) { |  266   if (!oob_message_handling_allowed_) { | 
|  227     return true; |  267     return kOK; | 
|  228   } |  268   } | 
|  229   MonitorLocker ml(&monitor_); |  269   MonitorLocker ml(&monitor_); | 
|  230 #if defined(DEBUG) |  270 #if defined(DEBUG) | 
|  231   CheckAccess(); |  271   CheckAccess(); | 
|  232 #endif |  272 #endif | 
|  233   return HandleMessages(false, false); |  273   return HandleMessages(false, false); | 
|  234 } |  274 } | 
|  235  |  275  | 
|  236  |  276  | 
|  237 bool MessageHandler::HasOOBMessages() { |  277 bool MessageHandler::HasOOBMessages() { | 
|  238   MonitorLocker ml(&monitor_); |  278   MonitorLocker ml(&monitor_); | 
|  239   return !oob_queue_->IsEmpty(); |  279   return !oob_queue_->IsEmpty(); | 
|  240 } |  280 } | 
|  241  |  281  | 
|  242  |  282  | 
 |  283 static bool ShouldPause(MessageHandler::MessageStatus status) { | 
 |  284   // If we are restarting or shutting down, we do not want to honor | 
 |  285   // pause_on_start or pause_on_exit. | 
 |  286   return (status != MessageHandler::kRestart && | 
 |  287           status != MessageHandler::kShutdown); | 
 |  288 } | 
 |  289  | 
 |  290  | 
|  243 void MessageHandler::TaskCallback() { |  291 void MessageHandler::TaskCallback() { | 
|  244   ASSERT(Isolate::Current() == NULL); |  292   ASSERT(Isolate::Current() == NULL); | 
|  245   bool ok = true; |  293   MessageStatus status = kOK; | 
|  246   bool run_end_callback = false; |  294   bool run_end_callback = false; | 
|  247   { |  295   { | 
 |  296     // We will occasionally release and reacquire this monitor in this | 
 |  297     // function. Whenever we reacquire the monitor we *must* process | 
 |  298     // all pending OOB messages, or we may miss a request for vm | 
 |  299     // shutdown. | 
|  248     MonitorLocker ml(&monitor_); |  300     MonitorLocker ml(&monitor_); | 
|  249     // Initialize the message handler by running its start function, |  | 
|  250     // if we have one.  For an isolate, this will run the isolate's |  | 
|  251     // main() function. |  | 
|  252     if (pause_on_start()) { |  301     if (pause_on_start()) { | 
|  253       if (!paused_on_start_) { |  302       if (!paused_on_start_) { | 
|  254         // Temporarily drop the lock when calling out to NotifyPauseOnStart. |  303         // Temporarily release the monitor when calling out to | 
|  255         // This avoids a dead lock that can occur when this message handler |  304         // NotifyPauseOnStart.  This avoids a dead lock that can occur | 
|  256         // tries to post a message while a message is being posted to it. |  305         // when this message handler tries to post a message while a | 
 |  306         // message is being posted to it. | 
|  257         paused_on_start_ = true; |  307         paused_on_start_ = true; | 
|  258         paused_timestamp_ = OS::GetCurrentTimeMillis(); |  308         paused_timestamp_ = OS::GetCurrentTimeMillis(); | 
|  259         monitor_.Exit(); |  309         monitor_.Exit(); | 
|  260         NotifyPauseOnStart(); |  310         NotifyPauseOnStart(); | 
|  261         monitor_.Enter(); |  311         monitor_.Enter(); | 
|  262       } |  312       } | 
|  263       // More messages may have come in while we released monitor_. |  313       // More messages may have come in before we (re)acquired the monitor. | 
|  264       HandleMessages(false, false); |  314       status = HandleMessages(false, false); | 
|  265       if (pause_on_start()) { |  315       if (ShouldPause(status) && pause_on_start()) { | 
|  266         // Still paused. |  316         // Still paused. | 
|  267         ASSERT(oob_queue_->IsEmpty()); |  317         ASSERT(oob_queue_->IsEmpty()); | 
|  268         task_ = NULL;  // No task in queue. |  318         task_ = NULL;  // No task in queue. | 
|  269         return; |  319         return; | 
|  270       } else { |  320       } else { | 
|  271         paused_on_start_ = false; |  321         paused_on_start_ = false; | 
|  272         paused_timestamp_ = -1; |  322         paused_timestamp_ = -1; | 
|  273       } |  323       } | 
|  274     } |  324     } | 
|  275  |  325  | 
|  276     if (start_callback_) { |  326     if (status == kOK) { | 
|  277       // Release the monitor_ temporarily while we call the start callback. |  327       if (start_callback_) { | 
|  278       // The monitor was acquired with the MonitorLocker above. |  328         // Initialize the message handler by running its start function, | 
|  279       monitor_.Exit(); |  329         // if we have one.  For an isolate, this will run the isolate's | 
|  280       ok = start_callback_(callback_data_); |  330         // main() function. | 
|  281       ASSERT(Isolate::Current() == NULL); |  331         // | 
|  282       start_callback_ = NULL; |  332         // Release the monitor_ temporarily while we call the start callback. | 
|  283       monitor_.Enter(); |  333         monitor_.Exit(); | 
 |  334         status = start_callback_(callback_data_); | 
 |  335         ASSERT(Isolate::Current() == NULL); | 
 |  336         start_callback_ = NULL; | 
 |  337         monitor_.Enter(); | 
 |  338       } | 
 |  339  | 
 |  340       // Handle any pending messages for this message handler. | 
 |  341       if (status != kShutdown) { | 
 |  342         status = HandleMessages((status == kOK), true); | 
 |  343       } | 
|  284     } |  344     } | 
|  285  |  345  | 
|  286     // Handle any pending messages for this message handler. |  346     // The isolate exits when it encounters an error or when it no | 
|  287     if (ok) { |  347     // longer has live ports. | 
|  288       ok = HandleMessages(true, true); |  348     if (status != kOK || !HasLivePorts()) { | 
|  289     } |  349       if (ShouldPause(status) && pause_on_exit()) { | 
|  290  |  | 
|  291     if (!ok || !HasLivePorts()) { |  | 
|  292       if (pause_on_exit()) { |  | 
|  293         if (!paused_on_exit_) { |  350         if (!paused_on_exit_) { | 
|  294           if (FLAG_trace_service_pause_events) { |  351           if (FLAG_trace_service_pause_events) { | 
|  295             OS::PrintErr("Isolate %s paused before exiting. " |  352             OS::PrintErr("Isolate %s paused before exiting. " | 
|  296                          "Use the Observatory to release it.\n", name()); |  353                          "Use the Observatory to release it.\n", name()); | 
|  297           } |  354           } | 
|  298           // Temporarily drop the lock when calling out to NotifyPauseOnExit. |  355           // Temporarily release the monitor when calling out to | 
|  299           // This avoids a dead lock that can occur when this message handler |  356           // NotifyPauseOnExit.  This avoids a dead lock that can | 
|  300           // tries to post a message while a message is being posted to it. |  357           // occur when this message handler tries to post a message | 
 |  358           // while a message is being posted to it. | 
|  301           paused_on_exit_ = true; |  359           paused_on_exit_ = true; | 
|  302           paused_timestamp_ = OS::GetCurrentTimeMillis(); |  360           paused_timestamp_ = OS::GetCurrentTimeMillis(); | 
|  303           monitor_.Exit(); |  361           monitor_.Exit(); | 
|  304           NotifyPauseOnExit(); |  362           NotifyPauseOnExit(); | 
|  305           monitor_.Enter(); |  363           monitor_.Enter(); | 
 |  364  | 
 |  365           // More messages may have come in while we released the monitor. | 
 |  366           HandleMessages(false, false); | 
|  306         } |  367         } | 
|  307         // More messages may have come in while we released monitor_. |  368         if (ShouldPause(status) && pause_on_exit()) { | 
|  308         HandleMessages(false, false); |  | 
|  309         if (pause_on_exit()) { |  | 
|  310           // Still paused. |  369           // Still paused. | 
|  311           ASSERT(oob_queue_->IsEmpty()); |  370           ASSERT(oob_queue_->IsEmpty()); | 
|  312           task_ = NULL;  // No task in queue. |  371           task_ = NULL;  // No task in queue. | 
|  313           return; |  372           return; | 
|  314         } else { |  373         } else { | 
|  315           paused_on_exit_ = false; |  374           paused_on_exit_ = false; | 
|  316           paused_timestamp_ = -1; |  375           paused_timestamp_ = -1; | 
|  317         } |  376         } | 
|  318       } |  377       } | 
|  319       if (FLAG_trace_isolates) { |  378       if (FLAG_trace_isolates) { | 
|  320         OS::Print("[-] Stopping message handler (%s):\n" |  379         if (status != kOK && isolate() != NULL) { | 
|  321                   "\thandler:    %s\n", |  380           const Error& error = | 
|  322                   (ok ? "no live ports" : "error"), |  381               Error::Handle(isolate()->object_store()->sticky_error()); | 
|  323                   name()); |  382           OS::Print("[-] Stopping message handler (%s):\n" | 
 |  383                     "\thandler:    %s\n" | 
 |  384                     "\terror:    %s\n", | 
 |  385                     MessageStatusString(status), name(), error.ToCString()); | 
 |  386         } else { | 
 |  387           OS::Print("[-] Stopping message handler (%s):\n" | 
 |  388                     "\thandler:    %s\n", | 
 |  389                     MessageStatusString(status), name()); | 
 |  390         } | 
|  324       } |  391       } | 
|  325       pool_ = NULL; |  392       pool_ = NULL; | 
|  326       run_end_callback = true; |  393       run_end_callback = true; | 
|  327     } |  394     } | 
|  328  |  395  | 
|  329     // Clear the task_ last.  This allows other tasks to potentially start |  396     // Clear the task_ last.  This allows other tasks to potentially start | 
|  330     // for this message handler. |  397     // for this message handler. | 
|  331     ASSERT(oob_queue_->IsEmpty()); |  398     ASSERT(oob_queue_->IsEmpty()); | 
|  332     task_ = NULL; |  399     task_ = NULL; | 
|  333   } |  400   } | 
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
|  408  |  475  | 
|  409  |  476  | 
|  410 void MessageHandler::AcquireQueues(AcquiredQueues* acquired_queues) { |  477 void MessageHandler::AcquireQueues(AcquiredQueues* acquired_queues) { | 
|  411   ASSERT(acquired_queues != NULL); |  478   ASSERT(acquired_queues != NULL); | 
|  412   // No double dipping. |  479   // No double dipping. | 
|  413   ASSERT(acquired_queues->handler_ == NULL); |  480   ASSERT(acquired_queues->handler_ == NULL); | 
|  414   acquired_queues->Reset(this); |  481   acquired_queues->Reset(this); | 
|  415 } |  482 } | 
|  416  |  483  | 
|  417 }  // namespace dart |  484 }  // namespace dart | 
| OLD | NEW |