Chromium Code Reviews| Index: runtime/vm/message_queue.cc |
| =================================================================== |
| --- runtime/vm/message_queue.cc (revision 3210) |
| +++ runtime/vm/message_queue.cc (working copy) |
| @@ -6,93 +6,123 @@ |
| namespace dart { |
| +MessageQueue::MessageQueue() { |
| + for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
| + head_[p] = NULL; |
| + tail_[p] = NULL; |
| + } |
| +} |
| + |
| + |
| MessageQueue::~MessageQueue() { |
| // Ensure that all pending messages have been released. |
| - ASSERT(head_ == NULL); |
| +#if defined(DEBUG) |
| + for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
| + ASSERT(head_[p] == NULL); |
| + } |
| +#endif |
| } |
| -void MessageQueue::Enqueue(PortMessage* msg) { |
| +void MessageQueue::Enqueue(Message* msg) { |
| // TODO(turnidge): Can't use MonitorLocker here because |
| // MonitorLocker is a StackResource, which requires a current |
| // isolate. Should MonitorLocker really be a StackResource? |
| monitor_.Enter(); |
| + |
| + Message::Priority p = msg->priority(); |
| // Make sure messages are not reused. |
| ASSERT(msg->next_ == NULL); |
| - if (head_ == NULL) { |
| + if (head_[p] == NULL) { |
| // Only element in the queue. |
| - head_ = msg; |
| - tail_ = msg; |
| + head_[p] = msg; |
| + tail_[p] = msg; |
| // We only need to notify if the queue was empty. |
| monitor_.Notify(); |
| } else { |
| - ASSERT(tail_ != NULL); |
| + ASSERT(tail_[p] != NULL); |
| // Append at the tail. |
| - tail_->next_ = msg; |
| - tail_ = msg; |
| + tail_[p]->next_ = msg; |
| + tail_[p] = msg; |
| } |
| monitor_.Exit(); |
| } |
| -PortMessage* MessageQueue::Dequeue(int64_t millis) { |
| +Message* MessageQueue::DequeueNoWait() { |
|
Anton Muhin
2012/01/12 12:58:16
you don't lock on monitor_, is it ok?
turnidge
2012/01/12 19:01:55
It is not okay. Fixed.
|
| + // Look for the highest priority available message. |
| + for (int p = Message::kNumPriorities-1; p >= Message::kFirstPriority; p--) { |
| + Message* result = head_[p]; |
| + if (result != NULL) { |
| + head_[p] = result->next_; |
| + // The following update to tail_ is not strictly needed. |
|
Anton Muhin
2012/01/12 12:58:16
maybe move under DEBUG?
turnidge
2012/01/12 19:01:55
It's not expensive and it makes me feel happier.
|
| + if (head_[p] == NULL) { |
| + tail_[p] = NULL; |
| + } |
| +#if defined(DEBUG) |
| + result->next_ = result; // Make sure to trigger ASSERT in Enqueue. |
| +#endif // DEBUG |
| + return result; |
| + } |
| + } |
| + return NULL; |
| +} |
| + |
| + |
| +Message* MessageQueue::Dequeue(int64_t millis) { |
| ASSERT(millis >= 0); |
| MonitorLocker ml(&monitor_); |
| - PortMessage* result = head_; |
| + |
| + Message* result = DequeueNoWait(); |
| if (result == NULL) { |
| + // No message available at any priority. |
| ml.Wait(millis); |
| - result = head_; |
| + result = DequeueNoWait(); |
| } |
| - if (result != NULL) { |
| - head_ = result->next_; |
| - // The following update to tail_ is not strictly needed. |
| - if (head_ == NULL) { |
| - tail_ = NULL; |
| - } |
| -#if defined(DEBUG) |
| - result->next_ = result; // Make sure to trigger ASSERT in Enqueue. |
| -#endif // DEBUG |
| - } |
| return result; |
| } |
| void MessageQueue::Flush(Dart_Port port) { |
| MonitorLocker ml(&monitor_); |
| - PortMessage* cur = head_; |
| - PortMessage* prev = NULL; |
| - while (cur != NULL) { |
| - PortMessage* next = cur->next_; |
| - // If the message matches, then remove it from the queue and delete it. |
| - if (cur->dest_port() == port) { |
| - if (prev != NULL) { |
| - prev->next_ = next; |
| + for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
| + Message* cur = head_[p]; |
| + Message* prev = NULL; |
| + while (cur != NULL) { |
| + Message* next = cur->next_; |
| + // If the message matches, then remove it from the queue and delete it. |
| + if (cur->dest_port() == port) { |
| + if (prev != NULL) { |
| + prev->next_ = next; |
| + } else { |
| + head_[p] = next; |
| + } |
| + delete cur; |
| } else { |
| - head_ = next; |
| + // Move prev forward. |
| + prev = cur; |
| } |
| - delete cur; |
| - } else { |
| - // Move prev forward. |
| - prev = cur; |
| + // Advance to the next message in the queue. |
| + cur = next; |
| } |
| - // Advance to the next message in the queue. |
| - cur = next; |
| + tail_[p] = prev; |
| } |
| - tail_ = prev; |
| } |
| void MessageQueue::FlushAll() { |
| MonitorLocker ml(&monitor_); |
| - PortMessage* cur = head_; |
| - head_ = NULL; |
| - tail_ = NULL; |
| - while (cur != NULL) { |
| - PortMessage* next = cur->next_; |
| - delete cur; |
| - cur = next; |
| + for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
| + Message* cur = head_[p]; |
| + head_[p] = NULL; |
| + tail_[p] = NULL; |
| + while (cur != NULL) { |
| + Message* next = cur->next_; |
| + delete cur; |
| + cur = next; |
| + } |
| } |
| } |