Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Side by Side Diff: runtime/vm/message_handler.cc

Issue 1371193005: VM restart + shutdown fixes (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: more code review Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « runtime/vm/message_handler.h ('k') | runtime/vm/message_handler_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "vm/message_handler.h" 5 #include "vm/message_handler.h"
6 6
7 #include "vm/dart.h" 7 #include "vm/dart.h"
8 #include "vm/lockers.h" 8 #include "vm/lockers.h"
9 #include "vm/object.h"
10 #include "vm/object_store.h"
9 #include "vm/os.h" 11 #include "vm/os.h"
10 #include "vm/port.h" 12 #include "vm/port.h"
11 #include "vm/thread_interrupter.h" 13 #include "vm/thread_interrupter.h"
12 14
13 15
14 namespace dart { 16 namespace dart {
15 17
16 DECLARE_FLAG(bool, trace_isolates); 18 DECLARE_FLAG(bool, trace_isolates);
17 DECLARE_FLAG(bool, trace_service_pause_events); 19 DECLARE_FLAG(bool, trace_service_pause_events);
18 20
19 class MessageHandlerTask : public ThreadPool::Task { 21 class MessageHandlerTask : public ThreadPool::Task {
20 public: 22 public:
21 explicit MessageHandlerTask(MessageHandler* handler) 23 explicit MessageHandlerTask(MessageHandler* handler)
22 : handler_(handler) { 24 : handler_(handler) {
23 ASSERT(handler != NULL); 25 ASSERT(handler != NULL);
24 } 26 }
25 27
26 virtual void Run() { 28 virtual void Run() {
27 ASSERT(handler_ != NULL); 29 ASSERT(handler_ != NULL);
28 handler_->TaskCallback(); 30 handler_->TaskCallback();
29 } 31 }
30 32
31 private: 33 private:
32 MessageHandler* handler_; 34 MessageHandler* handler_;
33 35
34 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); 36 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
35 }; 37 };
36 38
37 39
40 // static
41 const char* MessageHandler::MessageStatusString(MessageStatus status) {
42 switch (status) {
43 case kOK:
44 return "OK";
45 case kError:
46 return "Error";
47 case kRestart:
48 return "Restart";
49 case kShutdown:
50 return "Shutdown";
51 default:
52 UNREACHABLE();
53 return "Illegal";
54 }
55 }
56
57
38 MessageHandler::MessageHandler() 58 MessageHandler::MessageHandler()
39 : queue_(new MessageQueue()), 59 : queue_(new MessageQueue()),
40 oob_queue_(new MessageQueue()), 60 oob_queue_(new MessageQueue()),
41 oob_message_handling_allowed_(true), 61 oob_message_handling_allowed_(true),
42 live_ports_(0), 62 live_ports_(0),
43 paused_(0), 63 paused_(0),
44 pause_on_start_(false), 64 pause_on_start_(false),
45 pause_on_exit_(false), 65 pause_on_exit_(false),
46 paused_on_start_(false), 66 paused_on_start_(false),
47 paused_on_exit_(false), 67 paused_on_exit_(false),
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { 163 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) {
144 // TODO(turnidge): Add assert that monitor_ is held here. 164 // TODO(turnidge): Add assert that monitor_ is held here.
145 Message* message = oob_queue_->Dequeue(); 165 Message* message = oob_queue_->Dequeue();
146 if ((message == NULL) && (min_priority < Message::kOOBPriority)) { 166 if ((message == NULL) && (min_priority < Message::kOOBPriority)) {
147 message = queue_->Dequeue(); 167 message = queue_->Dequeue();
148 } 168 }
149 return message; 169 return message;
150 } 170 }
151 171
152 172
153 bool MessageHandler::HandleMessages(bool allow_normal_messages, 173 void MessageHandler::ClearOOBQueue() {
154 bool allow_multiple_normal_messages) { 174 oob_queue_->Clear();
175 }
176
177
178 MessageHandler::MessageStatus MessageHandler::HandleMessages(
179 bool allow_normal_messages,
180 bool allow_multiple_normal_messages) {
181 // TODO(turnidge): Add assert that monitor_ is held here.
182
155 // If isolate() returns NULL StartIsolateScope does nothing. 183 // If isolate() returns NULL StartIsolateScope does nothing.
156 StartIsolateScope start_isolate(isolate()); 184 StartIsolateScope start_isolate(isolate());
157 185
158 // ThreadInterrupter may have gone to sleep while waiting for 186 // ThreadInterrupter may have gone to sleep while waiting for
159 // an isolate to start handling messages. 187 // an isolate to start handling messages.
160 ThreadInterrupter::WakeUp(); 188 ThreadInterrupter::WakeUp();
161 189
162 // TODO(turnidge): Add assert that monitor_ is held here. 190 MessageStatus max_status = kOK;
163 bool result = true; 191 Message::Priority min_priority = ((allow_normal_messages && !paused())
164 Message::Priority min_priority = (allow_normal_messages && !paused()) ? 192 ? Message::kNormalPriority
165 Message::kNormalPriority : Message::kOOBPriority; 193 : Message::kOOBPriority);
166 Message* message = DequeueMessage(min_priority); 194 Message* message = DequeueMessage(min_priority);
167 while (message != NULL) { 195 while (message != NULL) {
168 intptr_t message_len = message->len(); 196 intptr_t message_len = message->len();
169 if (FLAG_trace_isolates) { 197 if (FLAG_trace_isolates) {
170 OS::Print("[<] Handling message:\n" 198 OS::Print("[<] Handling message:\n"
171 "\tlen: %" Pd "\n" 199 "\tlen: %" Pd "\n"
172 "\thandler: %s\n" 200 "\thandler: %s\n"
173 "\tport: %" Pd64 "\n", 201 "\tport: %" Pd64 "\n",
174 message_len, name(), message->dest_port()); 202 message_len, name(), message->dest_port());
175 } 203 }
176 204
177 // Release the monitor_ temporarily while we handle the message. 205 // Release the monitor_ temporarily while we handle the message.
178 // The monitor was acquired in MessageHandler::TaskCallback(). 206 // The monitor was acquired in MessageHandler::TaskCallback().
179 monitor_.Exit(); 207 monitor_.Exit();
180 Message::Priority saved_priority = message->priority(); 208 Message::Priority saved_priority = message->priority();
181 Dart_Port saved_dest_port = message->dest_port(); 209 Dart_Port saved_dest_port = message->dest_port();
182 result = HandleMessage(message); 210 MessageStatus status = HandleMessage(message);
211 if (status > max_status) {
212 max_status = status;
213 }
183 message = NULL; // May be deleted by now. 214 message = NULL; // May be deleted by now.
184 monitor_.Enter(); 215 monitor_.Enter();
185 if (FLAG_trace_isolates) { 216 if (FLAG_trace_isolates) {
186 OS::Print("[.] Message handled:\n" 217 OS::Print("[.] Message handled (%s):\n"
187 "\tlen: %" Pd "\n" 218 "\tlen: %" Pd "\n"
188 "\thandler: %s\n" 219 "\thandler: %s\n"
189 "\tport: %" Pd64 "\n", 220 "\tport: %" Pd64 "\n",
221 MessageStatusString(status),
190 message_len, name(), saved_dest_port); 222 message_len, name(), saved_dest_port);
191 } 223 }
192 if (!result) { 224 // If we are shutting down, do not process any more messages.
193 // If we hit an error, we're done processing messages. 225 if (status == kShutdown) {
226 ClearOOBQueue();
194 break; 227 break;
195 } 228 }
229
196 // Some callers want to process only one normal message and then quit. At 230 // Some callers want to process only one normal message and then quit. At
197 // the same time it is OK to process multiple OOB messages. 231 // the same time it is OK to process multiple OOB messages.
198 if ((saved_priority == Message::kNormalPriority) && 232 if ((saved_priority == Message::kNormalPriority) &&
199 !allow_multiple_normal_messages) { 233 !allow_multiple_normal_messages) {
200 break; 234 // We processed one normal message. Allow no more.
235 allow_normal_messages = false;
201 } 236 }
202 237
203 // Reevaluate the minimum allowable priority as the paused state might 238 // Reevaluate the minimum allowable priority. The paused state
204 // have changed as part of handling the message. 239 // may have changed as part of handling the message. We may also
205 min_priority = (allow_normal_messages && !paused()) ? 240 // have encountered an error during message processsing.
206 Message::kNormalPriority : Message::kOOBPriority; 241 //
242 // Even if we encounter an error, we still process pending OOB
243 // messages so that we don't lose the message notification.
244 min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
245 ? Message::kNormalPriority
246 : Message::kOOBPriority);
207 message = DequeueMessage(min_priority); 247 message = DequeueMessage(min_priority);
208 } 248 }
209 return result; 249 return max_status;
210 } 250 }
211 251
212 252
213 bool MessageHandler::HandleNextMessage() { 253 MessageHandler::MessageStatus MessageHandler::HandleNextMessage() {
214 // We can only call HandleNextMessage when this handler is not 254 // We can only call HandleNextMessage when this handler is not
215 // assigned to a thread pool. 255 // assigned to a thread pool.
216 MonitorLocker ml(&monitor_); 256 MonitorLocker ml(&monitor_);
217 ASSERT(pool_ == NULL); 257 ASSERT(pool_ == NULL);
218 #if defined(DEBUG) 258 #if defined(DEBUG)
219 CheckAccess(); 259 CheckAccess();
220 #endif 260 #endif
221 return HandleMessages(true, false); 261 return HandleMessages(true, false);
222 } 262 }
223 263
224 264
225 bool MessageHandler::HandleOOBMessages() { 265 MessageHandler::MessageStatus MessageHandler::HandleOOBMessages() {
226 if (!oob_message_handling_allowed_) { 266 if (!oob_message_handling_allowed_) {
227 return true; 267 return kOK;
228 } 268 }
229 MonitorLocker ml(&monitor_); 269 MonitorLocker ml(&monitor_);
230 #if defined(DEBUG) 270 #if defined(DEBUG)
231 CheckAccess(); 271 CheckAccess();
232 #endif 272 #endif
233 return HandleMessages(false, false); 273 return HandleMessages(false, false);
234 } 274 }
235 275
236 276
237 bool MessageHandler::HasOOBMessages() { 277 bool MessageHandler::HasOOBMessages() {
238 MonitorLocker ml(&monitor_); 278 MonitorLocker ml(&monitor_);
239 return !oob_queue_->IsEmpty(); 279 return !oob_queue_->IsEmpty();
240 } 280 }
241 281
242 282
283 static bool ShouldPause(MessageHandler::MessageStatus status) {
284 // If we are restarting or shutting down, we do not want to honor
285 // pause_on_start or pause_on_exit.
286 return (status != MessageHandler::kRestart &&
287 status != MessageHandler::kShutdown);
288 }
289
290
243 void MessageHandler::TaskCallback() { 291 void MessageHandler::TaskCallback() {
244 ASSERT(Isolate::Current() == NULL); 292 ASSERT(Isolate::Current() == NULL);
245 bool ok = true; 293 MessageStatus status = kOK;
246 bool run_end_callback = false; 294 bool run_end_callback = false;
247 { 295 {
296 // We will occasionally release and reacquire this monitor in this
297 // function. Whenever we reacquire the monitor we *must* process
298 // all pending OOB messages, or we may miss a request for vm
299 // shutdown.
248 MonitorLocker ml(&monitor_); 300 MonitorLocker ml(&monitor_);
249 // Initialize the message handler by running its start function,
250 // if we have one. For an isolate, this will run the isolate's
251 // main() function.
252 if (pause_on_start()) { 301 if (pause_on_start()) {
253 if (!paused_on_start_) { 302 if (!paused_on_start_) {
254 // Temporarily drop the lock when calling out to NotifyPauseOnStart. 303 // Temporarily release the monitor when calling out to
255 // This avoids a dead lock that can occur when this message handler 304 // NotifyPauseOnStart. This avoids a dead lock that can occur
256 // tries to post a message while a message is being posted to it. 305 // when this message handler tries to post a message while a
306 // message is being posted to it.
257 paused_on_start_ = true; 307 paused_on_start_ = true;
258 paused_timestamp_ = OS::GetCurrentTimeMillis(); 308 paused_timestamp_ = OS::GetCurrentTimeMillis();
259 monitor_.Exit(); 309 monitor_.Exit();
260 NotifyPauseOnStart(); 310 NotifyPauseOnStart();
261 monitor_.Enter(); 311 monitor_.Enter();
262 } 312 }
263 // More messages may have come in while we released monitor_. 313 // More messages may have come in before we (re)acquired the monitor.
264 HandleMessages(false, false); 314 status = HandleMessages(false, false);
265 if (pause_on_start()) { 315 if (ShouldPause(status) && pause_on_start()) {
266 // Still paused. 316 // Still paused.
267 ASSERT(oob_queue_->IsEmpty()); 317 ASSERT(oob_queue_->IsEmpty());
268 task_ = NULL; // No task in queue. 318 task_ = NULL; // No task in queue.
269 return; 319 return;
270 } else { 320 } else {
271 paused_on_start_ = false; 321 paused_on_start_ = false;
272 paused_timestamp_ = -1; 322 paused_timestamp_ = -1;
273 } 323 }
274 } 324 }
275 325
276 if (start_callback_) { 326 if (status == kOK) {
277 // Release the monitor_ temporarily while we call the start callback. 327 if (start_callback_) {
278 // The monitor was acquired with the MonitorLocker above. 328 // Initialize the message handler by running its start function,
279 monitor_.Exit(); 329 // if we have one. For an isolate, this will run the isolate's
280 ok = start_callback_(callback_data_); 330 // main() function.
281 ASSERT(Isolate::Current() == NULL); 331 //
282 start_callback_ = NULL; 332 // Release the monitor_ temporarily while we call the start callback.
283 monitor_.Enter(); 333 monitor_.Exit();
334 status = start_callback_(callback_data_);
335 ASSERT(Isolate::Current() == NULL);
336 start_callback_ = NULL;
337 monitor_.Enter();
338 }
339
340 // Handle any pending messages for this message handler.
341 if (status != kShutdown) {
342 status = HandleMessages((status == kOK), true);
343 }
284 } 344 }
285 345
286 // Handle any pending messages for this message handler. 346 // The isolate exits when it encounters an error or when it no
287 if (ok) { 347 // longer has live ports.
288 ok = HandleMessages(true, true); 348 if (status != kOK || !HasLivePorts()) {
289 } 349 if (ShouldPause(status) && pause_on_exit()) {
290
291 if (!ok || !HasLivePorts()) {
292 if (pause_on_exit()) {
293 if (!paused_on_exit_) { 350 if (!paused_on_exit_) {
294 if (FLAG_trace_service_pause_events) { 351 if (FLAG_trace_service_pause_events) {
295 OS::PrintErr("Isolate %s paused before exiting. " 352 OS::PrintErr("Isolate %s paused before exiting. "
296 "Use the Observatory to release it.\n", name()); 353 "Use the Observatory to release it.\n", name());
297 } 354 }
298 // Temporarily drop the lock when calling out to NotifyPauseOnExit. 355 // Temporarily release the monitor when calling out to
299 // This avoids a dead lock that can occur when this message handler 356 // NotifyPauseOnExit. This avoids a dead lock that can
300 // tries to post a message while a message is being posted to it. 357 // occur when this message handler tries to post a message
358 // while a message is being posted to it.
301 paused_on_exit_ = true; 359 paused_on_exit_ = true;
302 paused_timestamp_ = OS::GetCurrentTimeMillis(); 360 paused_timestamp_ = OS::GetCurrentTimeMillis();
303 monitor_.Exit(); 361 monitor_.Exit();
304 NotifyPauseOnExit(); 362 NotifyPauseOnExit();
305 monitor_.Enter(); 363 monitor_.Enter();
364
365 // More messages may have come in while we released the monitor.
366 HandleMessages(false, false);
306 } 367 }
307 // More messages may have come in while we released monitor_. 368 if (ShouldPause(status) && pause_on_exit()) {
308 HandleMessages(false, false);
309 if (pause_on_exit()) {
310 // Still paused. 369 // Still paused.
311 ASSERT(oob_queue_->IsEmpty()); 370 ASSERT(oob_queue_->IsEmpty());
312 task_ = NULL; // No task in queue. 371 task_ = NULL; // No task in queue.
313 return; 372 return;
314 } else { 373 } else {
315 paused_on_exit_ = false; 374 paused_on_exit_ = false;
316 paused_timestamp_ = -1; 375 paused_timestamp_ = -1;
317 } 376 }
318 } 377 }
319 if (FLAG_trace_isolates) { 378 if (FLAG_trace_isolates) {
320 OS::Print("[-] Stopping message handler (%s):\n" 379 if (status != kOK && isolate() != NULL) {
321 "\thandler: %s\n", 380 const Error& error =
322 (ok ? "no live ports" : "error"), 381 Error::Handle(isolate()->object_store()->sticky_error());
323 name()); 382 OS::Print("[-] Stopping message handler (%s):\n"
383 "\thandler: %s\n"
384 "\terror: %s\n",
385 MessageStatusString(status), name(), error.ToCString());
386 } else {
387 OS::Print("[-] Stopping message handler (%s):\n"
388 "\thandler: %s\n",
389 MessageStatusString(status), name());
390 }
324 } 391 }
325 pool_ = NULL; 392 pool_ = NULL;
326 run_end_callback = true; 393 run_end_callback = true;
327 } 394 }
328 395
329 // Clear the task_ last. This allows other tasks to potentially start 396 // Clear the task_ last. This allows other tasks to potentially start
330 // for this message handler. 397 // for this message handler.
331 ASSERT(oob_queue_->IsEmpty()); 398 ASSERT(oob_queue_->IsEmpty());
332 task_ = NULL; 399 task_ = NULL;
333 } 400 }
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
408 475
409 476
410 void MessageHandler::AcquireQueues(AcquiredQueues* acquired_queues) { 477 void MessageHandler::AcquireQueues(AcquiredQueues* acquired_queues) {
411 ASSERT(acquired_queues != NULL); 478 ASSERT(acquired_queues != NULL);
412 // No double dipping. 479 // No double dipping.
413 ASSERT(acquired_queues->handler_ == NULL); 480 ASSERT(acquired_queues->handler_ == NULL);
414 acquired_queues->Reset(this); 481 acquired_queues->Reset(this);
415 } 482 }
416 483
417 } // namespace dart 484 } // namespace dart
OLDNEW
« no previous file with comments | « runtime/vm/message_handler.h ('k') | runtime/vm/message_handler_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698