Index: mojo/system/local_message_pipe_endpoint.cc |
diff --git a/mojo/system/local_message_pipe_endpoint.cc b/mojo/system/local_message_pipe_endpoint.cc |
index 2886a1d66537c6d189f6a54917e2ba346c6613bd..f95a2ece5ae20131438c8882c60fe26cba911871 100644 |
--- a/mojo/system/local_message_pipe_endpoint.cc |
+++ b/mojo/system/local_message_pipe_endpoint.cc |
@@ -7,11 +7,29 @@ |
#include <string.h> |
#include "base/logging.h" |
+#include "mojo/system/dispatcher.h" |
#include "mojo/system/message_in_transit.h" |
namespace mojo { |
namespace system { |
+LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry() |
+ : message(NULL) { |
+} |
+ |
+// See comment in header file. |
+LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry( |
+ const MessageQueueEntry& other) |
+ : message(NULL) { |
+ DCHECK(!other.message); |
+ DCHECK(other.dispatchers.empty()); |
+} |
+ |
+LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() { |
+ if (message) |
+ message->Destroy(); |
+} |
+ |
LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() |
: is_open_(true), |
is_peer_open_(true) { |
@@ -24,11 +42,6 @@ LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { |
void LocalMessagePipeEndpoint::Close() { |
DCHECK(is_open_); |
is_open_ = false; |
- for (std::deque<MessageInTransit*>::iterator it = message_queue_.begin(); |
- it != message_queue_.end(); |
- ++it) { |
- (*it)->Destroy(); |
- } |
message_queue_.clear(); |
} |
@@ -51,26 +64,35 @@ bool LocalMessagePipeEndpoint::OnPeerClose() { |
return true; |
} |
-MojoResult LocalMessagePipeEndpoint::EnqueueMessage( |
- MessageInTransit* message, |
+MojoResult LocalMessagePipeEndpoint::CanEnqueueMessage( |
+ const MessageInTransit* /*message*/, |
const std::vector<Dispatcher*>* dispatchers) { |
- DCHECK(is_open_); |
- DCHECK(is_peer_open_); |
- |
// TODO(vtl) |
if (dispatchers) { |
- message->Destroy(); |
+ NOTIMPLEMENTED(); |
return MOJO_RESULT_UNIMPLEMENTED; |
} |
+ return MOJO_RESULT_OK; |
+} |
+ |
+void LocalMessagePipeEndpoint::EnqueueMessage( |
+ MessageInTransit* message, |
+ std::vector<scoped_refptr<Dispatcher> >* dispatchers) { |
+ DCHECK(is_open_); |
+ DCHECK(is_peer_open_); |
+ |
+ // TODO(vtl) |
+ DCHECK(!dispatchers || dispatchers->empty()); |
bool was_empty = message_queue_.empty(); |
- message_queue_.push_back(message); |
+ message_queue_.push_back(MessageQueueEntry()); |
+ message_queue_.back().message = message; |
+ if (dispatchers) |
+ message_queue_.back().dispatchers.swap(*dispatchers); |
if (was_empty) { |
waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), |
SatisfiableFlags()); |
} |
- |
- return MOJO_RESULT_OK; |
} |
void LocalMessagePipeEndpoint::CancelAllWaiters() { |
@@ -87,8 +109,6 @@ MojoResult LocalMessagePipeEndpoint::ReadMessage( |
DCHECK(is_open_); |
const uint32_t max_bytes = num_bytes ? *num_bytes : 0; |
- // TODO(vtl): We'll need this later: |
- // const uint32_t max_handles = num_handles ? *num_handles : 0; |
if (message_queue_.empty()) { |
return is_peer_open_ ? MOJO_RESULT_NOT_FOUND : |
@@ -98,7 +118,7 @@ MojoResult LocalMessagePipeEndpoint::ReadMessage( |
// TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop |
// and release the lock immediately. |
bool not_enough_space = false; |
- MessageInTransit* const message = message_queue_.front(); |
+ MessageInTransit* const message = message_queue_.front().message; |
if (num_bytes) |
*num_bytes = message->data_size(); |
if (message->data_size() <= max_bytes) |
@@ -108,7 +128,6 @@ MojoResult LocalMessagePipeEndpoint::ReadMessage( |
if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { |
message_queue_.pop_front(); |
- message->Destroy(); |
// Now it's empty, thus no longer readable. |
if (message_queue_.empty()) { |