OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/message_handler.h" | 5 #include "vm/message_handler.h" |
6 | 6 |
7 #include "vm/dart.h" | 7 #include "vm/dart.h" |
8 #include "vm/lockers.h" | 8 #include "vm/lockers.h" |
| 9 #include "vm/object.h" |
| 10 #include "vm/object_store.h" |
9 #include "vm/os.h" | 11 #include "vm/os.h" |
10 #include "vm/port.h" | 12 #include "vm/port.h" |
11 #include "vm/thread_interrupter.h" | 13 #include "vm/thread_interrupter.h" |
12 | 14 |
13 | 15 |
14 namespace dart { | 16 namespace dart { |
15 | 17 |
16 DECLARE_FLAG(bool, trace_isolates); | 18 DECLARE_FLAG(bool, trace_isolates); |
17 DECLARE_FLAG(bool, trace_service_pause_events); | 19 DECLARE_FLAG(bool, trace_service_pause_events); |
18 | 20 |
19 class MessageHandlerTask : public ThreadPool::Task { | 21 class MessageHandlerTask : public ThreadPool::Task { |
20 public: | 22 public: |
21 explicit MessageHandlerTask(MessageHandler* handler) | 23 explicit MessageHandlerTask(MessageHandler* handler) |
22 : handler_(handler) { | 24 : handler_(handler) { |
23 ASSERT(handler != NULL); | 25 ASSERT(handler != NULL); |
24 } | 26 } |
25 | 27 |
26 virtual void Run() { | 28 virtual void Run() { |
27 ASSERT(handler_ != NULL); | 29 ASSERT(handler_ != NULL); |
28 handler_->TaskCallback(); | 30 handler_->TaskCallback(); |
29 } | 31 } |
30 | 32 |
31 private: | 33 private: |
32 MessageHandler* handler_; | 34 MessageHandler* handler_; |
33 | 35 |
34 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); | 36 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); |
35 }; | 37 }; |
36 | 38 |
37 | 39 |
| 40 // static |
| 41 const char* MessageHandler::MessageStatusString(MessageStatus status) { |
| 42 switch (status) { |
| 43 case kOK: |
| 44 return "OK"; |
| 45 case kError: |
| 46 return "Error"; |
| 47 case kRestart: |
| 48 return "Restart"; |
| 49 case kShutdown: |
| 50 return "Shutdown"; |
| 51 default: |
| 52 UNREACHABLE(); |
| 53 return "Illegal"; |
| 54 } |
| 55 } |
| 56 |
| 57 |
38 MessageHandler::MessageHandler() | 58 MessageHandler::MessageHandler() |
39 : queue_(new MessageQueue()), | 59 : queue_(new MessageQueue()), |
40 oob_queue_(new MessageQueue()), | 60 oob_queue_(new MessageQueue()), |
41 oob_message_handling_allowed_(true), | 61 oob_message_handling_allowed_(true), |
42 live_ports_(0), | 62 live_ports_(0), |
43 paused_(0), | 63 paused_(0), |
44 pause_on_start_(false), | 64 pause_on_start_(false), |
45 pause_on_exit_(false), | 65 pause_on_exit_(false), |
46 paused_on_start_(false), | 66 paused_on_start_(false), |
47 paused_on_exit_(false), | 67 paused_on_exit_(false), |
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
143 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { | 163 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { |
144 // TODO(turnidge): Add assert that monitor_ is held here. | 164 // TODO(turnidge): Add assert that monitor_ is held here. |
145 Message* message = oob_queue_->Dequeue(); | 165 Message* message = oob_queue_->Dequeue(); |
146 if ((message == NULL) && (min_priority < Message::kOOBPriority)) { | 166 if ((message == NULL) && (min_priority < Message::kOOBPriority)) { |
147 message = queue_->Dequeue(); | 167 message = queue_->Dequeue(); |
148 } | 168 } |
149 return message; | 169 return message; |
150 } | 170 } |
151 | 171 |
152 | 172 |
153 bool MessageHandler::HandleMessages(bool allow_normal_messages, | 173 void MessageHandler::ClearOOBQueue() { |
154 bool allow_multiple_normal_messages) { | 174 oob_queue_->Clear(); |
| 175 } |
| 176 |
| 177 |
| 178 MessageHandler::MessageStatus MessageHandler::HandleMessages( |
| 179 bool allow_normal_messages, |
| 180 bool allow_multiple_normal_messages) { |
| 181 // TODO(turnidge): Add assert that monitor_ is held here. |
| 182 |
155 // If isolate() returns NULL StartIsolateScope does nothing. | 183 // If isolate() returns NULL StartIsolateScope does nothing. |
156 StartIsolateScope start_isolate(isolate()); | 184 StartIsolateScope start_isolate(isolate()); |
157 | 185 |
158 // ThreadInterrupter may have gone to sleep while waiting for | 186 // ThreadInterrupter may have gone to sleep while waiting for |
159 // an isolate to start handling messages. | 187 // an isolate to start handling messages. |
160 ThreadInterrupter::WakeUp(); | 188 ThreadInterrupter::WakeUp(); |
161 | 189 |
162 // TODO(turnidge): Add assert that monitor_ is held here. | 190 MessageStatus max_status = kOK; |
163 bool result = true; | 191 Message::Priority min_priority = ((allow_normal_messages && !paused()) |
164 Message::Priority min_priority = (allow_normal_messages && !paused()) ? | 192 ? Message::kNormalPriority |
165 Message::kNormalPriority : Message::kOOBPriority; | 193 : Message::kOOBPriority); |
166 Message* message = DequeueMessage(min_priority); | 194 Message* message = DequeueMessage(min_priority); |
167 while (message != NULL) { | 195 while (message != NULL) { |
168 intptr_t message_len = message->len(); | 196 intptr_t message_len = message->len(); |
169 if (FLAG_trace_isolates) { | 197 if (FLAG_trace_isolates) { |
170 OS::Print("[<] Handling message:\n" | 198 OS::Print("[<] Handling message:\n" |
171 "\tlen: %" Pd "\n" | 199 "\tlen: %" Pd "\n" |
172 "\thandler: %s\n" | 200 "\thandler: %s\n" |
173 "\tport: %" Pd64 "\n", | 201 "\tport: %" Pd64 "\n", |
174 message_len, name(), message->dest_port()); | 202 message_len, name(), message->dest_port()); |
175 } | 203 } |
176 | 204 |
177 // Release the monitor_ temporarily while we handle the message. | 205 // Release the monitor_ temporarily while we handle the message. |
178 // The monitor was acquired in MessageHandler::TaskCallback(). | 206 // The monitor was acquired in MessageHandler::TaskCallback(). |
179 monitor_.Exit(); | 207 monitor_.Exit(); |
180 Message::Priority saved_priority = message->priority(); | 208 Message::Priority saved_priority = message->priority(); |
181 Dart_Port saved_dest_port = message->dest_port(); | 209 Dart_Port saved_dest_port = message->dest_port(); |
182 result = HandleMessage(message); | 210 MessageStatus status = HandleMessage(message); |
| 211 if (status > max_status) { |
| 212 max_status = status; |
| 213 } |
183 message = NULL; // May be deleted by now. | 214 message = NULL; // May be deleted by now. |
184 monitor_.Enter(); | 215 monitor_.Enter(); |
185 if (FLAG_trace_isolates) { | 216 if (FLAG_trace_isolates) { |
186 OS::Print("[.] Message handled:\n" | 217 OS::Print("[.] Message handled (%s):\n" |
187 "\tlen: %" Pd "\n" | 218 "\tlen: %" Pd "\n" |
188 "\thandler: %s\n" | 219 "\thandler: %s\n" |
189 "\tport: %" Pd64 "\n", | 220 "\tport: %" Pd64 "\n", |
| 221 MessageStatusString(status), |
190 message_len, name(), saved_dest_port); | 222 message_len, name(), saved_dest_port); |
191 } | 223 } |
192 if (!result) { | 224 // If we are shutting down, do not process any more messages. |
193 // If we hit an error, we're done processing messages. | 225 if (status == kShutdown) { |
| 226 ClearOOBQueue(); |
194 break; | 227 break; |
195 } | 228 } |
| 229 |
196 // Some callers want to process only one normal message and then quit. At | 230 // Some callers want to process only one normal message and then quit. At |
197 // the same time it is OK to process multiple OOB messages. | 231 // the same time it is OK to process multiple OOB messages. |
198 if ((saved_priority == Message::kNormalPriority) && | 232 if ((saved_priority == Message::kNormalPriority) && |
199 !allow_multiple_normal_messages) { | 233 !allow_multiple_normal_messages) { |
200 break; | 234 // We processed one normal message. Allow no more. |
| 235 allow_normal_messages = false; |
201 } | 236 } |
202 | 237 |
203 // Reevaluate the minimum allowable priority as the paused state might | 238 // Reevaluate the minimum allowable priority. The paused state |
204 // have changed as part of handling the message. | 239 // may have changed as part of handling the message. We may also |
205 min_priority = (allow_normal_messages && !paused()) ? | 240 // have encountered an error during message processsing. |
206 Message::kNormalPriority : Message::kOOBPriority; | 241 // |
| 242 // Even if we encounter an error, we still process pending OOB |
| 243 // messages so that we don't lose the message notification. |
| 244 min_priority = (((max_status == kOK) && allow_normal_messages && !paused()) |
| 245 ? Message::kNormalPriority |
| 246 : Message::kOOBPriority); |
207 message = DequeueMessage(min_priority); | 247 message = DequeueMessage(min_priority); |
208 } | 248 } |
209 return result; | 249 return max_status; |
210 } | 250 } |
211 | 251 |
212 | 252 |
213 bool MessageHandler::HandleNextMessage() { | 253 MessageHandler::MessageStatus MessageHandler::HandleNextMessage() { |
214 // We can only call HandleNextMessage when this handler is not | 254 // We can only call HandleNextMessage when this handler is not |
215 // assigned to a thread pool. | 255 // assigned to a thread pool. |
216 MonitorLocker ml(&monitor_); | 256 MonitorLocker ml(&monitor_); |
217 ASSERT(pool_ == NULL); | 257 ASSERT(pool_ == NULL); |
218 #if defined(DEBUG) | 258 #if defined(DEBUG) |
219 CheckAccess(); | 259 CheckAccess(); |
220 #endif | 260 #endif |
221 return HandleMessages(true, false); | 261 return HandleMessages(true, false); |
222 } | 262 } |
223 | 263 |
224 | 264 |
225 bool MessageHandler::HandleOOBMessages() { | 265 MessageHandler::MessageStatus MessageHandler::HandleOOBMessages() { |
226 if (!oob_message_handling_allowed_) { | 266 if (!oob_message_handling_allowed_) { |
227 return true; | 267 return kOK; |
228 } | 268 } |
229 MonitorLocker ml(&monitor_); | 269 MonitorLocker ml(&monitor_); |
230 #if defined(DEBUG) | 270 #if defined(DEBUG) |
231 CheckAccess(); | 271 CheckAccess(); |
232 #endif | 272 #endif |
233 return HandleMessages(false, false); | 273 return HandleMessages(false, false); |
234 } | 274 } |
235 | 275 |
236 | 276 |
237 bool MessageHandler::HasOOBMessages() { | 277 bool MessageHandler::HasOOBMessages() { |
238 MonitorLocker ml(&monitor_); | 278 MonitorLocker ml(&monitor_); |
239 return !oob_queue_->IsEmpty(); | 279 return !oob_queue_->IsEmpty(); |
240 } | 280 } |
241 | 281 |
242 | 282 |
| 283 static bool ShouldPause(MessageHandler::MessageStatus status) { |
| 284 // If we are restarting or shutting down, we do not want to honor |
| 285 // pause_on_start or pause_on_exit. |
| 286 return (status != MessageHandler::kRestart && |
| 287 status != MessageHandler::kShutdown); |
| 288 } |
| 289 |
| 290 |
243 void MessageHandler::TaskCallback() { | 291 void MessageHandler::TaskCallback() { |
244 ASSERT(Isolate::Current() == NULL); | 292 ASSERT(Isolate::Current() == NULL); |
245 bool ok = true; | 293 MessageStatus status = kOK; |
246 bool run_end_callback = false; | 294 bool run_end_callback = false; |
247 { | 295 { |
| 296 // We will occasionally release and reacquire this monitor in this |
| 297 // function. Whenever we reacquire the monitor we *must* process |
| 298 // all pending OOB messages, or we may miss a request for vm |
| 299 // shutdown. |
248 MonitorLocker ml(&monitor_); | 300 MonitorLocker ml(&monitor_); |
249 // Initialize the message handler by running its start function, | |
250 // if we have one. For an isolate, this will run the isolate's | |
251 // main() function. | |
252 if (pause_on_start()) { | 301 if (pause_on_start()) { |
253 if (!paused_on_start_) { | 302 if (!paused_on_start_) { |
254 // Temporarily drop the lock when calling out to NotifyPauseOnStart. | 303 // Temporarily release the monitor when calling out to |
255 // This avoids a dead lock that can occur when this message handler | 304 // NotifyPauseOnStart. This avoids a dead lock that can occur |
256 // tries to post a message while a message is being posted to it. | 305 // when this message handler tries to post a message while a |
| 306 // message is being posted to it. |
257 paused_on_start_ = true; | 307 paused_on_start_ = true; |
258 paused_timestamp_ = OS::GetCurrentTimeMillis(); | 308 paused_timestamp_ = OS::GetCurrentTimeMillis(); |
259 monitor_.Exit(); | 309 monitor_.Exit(); |
260 NotifyPauseOnStart(); | 310 NotifyPauseOnStart(); |
261 monitor_.Enter(); | 311 monitor_.Enter(); |
262 } | 312 } |
263 // More messages may have come in while we released monitor_. | 313 // More messages may have come in before we (re)acquired the monitor. |
264 HandleMessages(false, false); | 314 status = HandleMessages(false, false); |
265 if (pause_on_start()) { | 315 if (ShouldPause(status) && pause_on_start()) { |
266 // Still paused. | 316 // Still paused. |
267 ASSERT(oob_queue_->IsEmpty()); | 317 ASSERT(oob_queue_->IsEmpty()); |
268 task_ = NULL; // No task in queue. | 318 task_ = NULL; // No task in queue. |
269 return; | 319 return; |
270 } else { | 320 } else { |
271 paused_on_start_ = false; | 321 paused_on_start_ = false; |
272 paused_timestamp_ = -1; | 322 paused_timestamp_ = -1; |
273 } | 323 } |
274 } | 324 } |
275 | 325 |
276 if (start_callback_) { | 326 if (status == kOK) { |
277 // Release the monitor_ temporarily while we call the start callback. | 327 if (start_callback_) { |
278 // The monitor was acquired with the MonitorLocker above. | 328 // Initialize the message handler by running its start function, |
279 monitor_.Exit(); | 329 // if we have one. For an isolate, this will run the isolate's |
280 ok = start_callback_(callback_data_); | 330 // main() function. |
281 ASSERT(Isolate::Current() == NULL); | 331 // |
282 start_callback_ = NULL; | 332 // Release the monitor_ temporarily while we call the start callback. |
283 monitor_.Enter(); | 333 monitor_.Exit(); |
| 334 status = start_callback_(callback_data_); |
| 335 ASSERT(Isolate::Current() == NULL); |
| 336 start_callback_ = NULL; |
| 337 monitor_.Enter(); |
| 338 } |
| 339 |
| 340 // Handle any pending messages for this message handler. |
| 341 if (status != kShutdown) { |
| 342 status = HandleMessages((status == kOK), true); |
| 343 } |
284 } | 344 } |
285 | 345 |
286 // Handle any pending messages for this message handler. | 346 // The isolate exits when it encounters an error or when it no |
287 if (ok) { | 347 // longer has live ports. |
288 ok = HandleMessages(true, true); | 348 if (status != kOK || !HasLivePorts()) { |
289 } | 349 if (ShouldPause(status) && pause_on_exit()) { |
290 | |
291 if (!ok || !HasLivePorts()) { | |
292 if (pause_on_exit()) { | |
293 if (!paused_on_exit_) { | 350 if (!paused_on_exit_) { |
294 if (FLAG_trace_service_pause_events) { | 351 if (FLAG_trace_service_pause_events) { |
295 OS::PrintErr("Isolate %s paused before exiting. " | 352 OS::PrintErr("Isolate %s paused before exiting. " |
296 "Use the Observatory to release it.\n", name()); | 353 "Use the Observatory to release it.\n", name()); |
297 } | 354 } |
298 // Temporarily drop the lock when calling out to NotifyPauseOnExit. | 355 // Temporarily release the monitor when calling out to |
299 // This avoids a dead lock that can occur when this message handler | 356 // NotifyPauseOnExit. This avoids a dead lock that can |
300 // tries to post a message while a message is being posted to it. | 357 // occur when this message handler tries to post a message |
| 358 // while a message is being posted to it. |
301 paused_on_exit_ = true; | 359 paused_on_exit_ = true; |
302 paused_timestamp_ = OS::GetCurrentTimeMillis(); | 360 paused_timestamp_ = OS::GetCurrentTimeMillis(); |
303 monitor_.Exit(); | 361 monitor_.Exit(); |
304 NotifyPauseOnExit(); | 362 NotifyPauseOnExit(); |
305 monitor_.Enter(); | 363 monitor_.Enter(); |
| 364 |
| 365 // More messages may have come in while we released the monitor. |
| 366 HandleMessages(false, false); |
306 } | 367 } |
307 // More messages may have come in while we released monitor_. | 368 if (ShouldPause(status) && pause_on_exit()) { |
308 HandleMessages(false, false); | |
309 if (pause_on_exit()) { | |
310 // Still paused. | 369 // Still paused. |
311 ASSERT(oob_queue_->IsEmpty()); | 370 ASSERT(oob_queue_->IsEmpty()); |
312 task_ = NULL; // No task in queue. | 371 task_ = NULL; // No task in queue. |
313 return; | 372 return; |
314 } else { | 373 } else { |
315 paused_on_exit_ = false; | 374 paused_on_exit_ = false; |
316 paused_timestamp_ = -1; | 375 paused_timestamp_ = -1; |
317 } | 376 } |
318 } | 377 } |
319 if (FLAG_trace_isolates) { | 378 if (FLAG_trace_isolates) { |
320 OS::Print("[-] Stopping message handler (%s):\n" | 379 if (status != kOK && isolate() != NULL) { |
321 "\thandler: %s\n", | 380 const Error& error = |
322 (ok ? "no live ports" : "error"), | 381 Error::Handle(isolate()->object_store()->sticky_error()); |
323 name()); | 382 OS::Print("[-] Stopping message handler (%s):\n" |
| 383 "\thandler: %s\n" |
| 384 "\terror: %s\n", |
| 385 MessageStatusString(status), name(), error.ToCString()); |
| 386 } else { |
| 387 OS::Print("[-] Stopping message handler (%s):\n" |
| 388 "\thandler: %s\n", |
| 389 MessageStatusString(status), name()); |
| 390 } |
324 } | 391 } |
325 pool_ = NULL; | 392 pool_ = NULL; |
326 run_end_callback = true; | 393 run_end_callback = true; |
327 } | 394 } |
328 | 395 |
329 // Clear the task_ last. This allows other tasks to potentially start | 396 // Clear the task_ last. This allows other tasks to potentially start |
330 // for this message handler. | 397 // for this message handler. |
331 ASSERT(oob_queue_->IsEmpty()); | 398 ASSERT(oob_queue_->IsEmpty()); |
332 task_ = NULL; | 399 task_ = NULL; |
333 } | 400 } |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
408 | 475 |
409 | 476 |
410 void MessageHandler::AcquireQueues(AcquiredQueues* acquired_queues) { | 477 void MessageHandler::AcquireQueues(AcquiredQueues* acquired_queues) { |
411 ASSERT(acquired_queues != NULL); | 478 ASSERT(acquired_queues != NULL); |
412 // No double dipping. | 479 // No double dipping. |
413 ASSERT(acquired_queues->handler_ == NULL); | 480 ASSERT(acquired_queues->handler_ == NULL); |
414 acquired_queues->Reset(this); | 481 acquired_queues->Reset(this); |
415 } | 482 } |
416 | 483 |
417 } // namespace dart | 484 } // namespace dart |
OLD | NEW |