| Index: mojo/system/proxy_message_pipe_endpoint.cc
|
| diff --git a/mojo/system/proxy_message_pipe_endpoint.cc b/mojo/system/proxy_message_pipe_endpoint.cc
|
| index d45405ff0f40694641102d7a928db9366b143917..9cd6040651555103910de254f3f22bb02ed8fdd7 100644
|
| --- a/mojo/system/proxy_message_pipe_endpoint.cc
|
| +++ b/mojo/system/proxy_message_pipe_endpoint.cc
|
| @@ -55,47 +55,22 @@ void ProxyMessagePipeEndpoint::OnPeerClose() {
|
| MessageInTransit::Create(MessageInTransit::kTypeMessagePipe,
|
| MessageInTransit::kSubtypeMessagePipePeerClosed,
|
| NULL, 0);
|
| - if (CanEnqueueMessage(message, NULL) == MOJO_RESULT_OK) {
|
| - EnqueueMessage(message, NULL);
|
| - } else {
|
| - message->Destroy();
|
| - // TODO(vtl): Do something more sensible on error here?
|
| - LOG(WARNING) << "Failed to send peer closed control message";
|
| - }
|
| + EnqueueMessageInternal(message, NULL);
|
| }
|
|
|
| -MojoResult ProxyMessagePipeEndpoint::CanEnqueueMessage(
|
| - const MessageInTransit* /*message*/,
|
| - const std::vector<Dispatcher*>* dispatchers) {
|
| - // TODO(vtl): Support sending handles over OS pipes.
|
| - if (dispatchers) {
|
| - NOTIMPLEMENTED();
|
| - return MOJO_RESULT_UNIMPLEMENTED;
|
| - }
|
| - return MOJO_RESULT_OK;
|
| -}
|
| -
|
| -// Note: We may have to enqueue messages even when our (local) peer isn't open
|
| -// -- it may have been written to and closed immediately, before we were ready.
|
| -// This case is handled in |Run()| (which will call us).
|
| -void ProxyMessagePipeEndpoint::EnqueueMessage(
|
| +MojoResult ProxyMessagePipeEndpoint::EnqueueMessage(
|
| MessageInTransit* message,
|
| - std::vector<scoped_refptr<Dispatcher> >* dispatchers) {
|
| - DCHECK(is_open_);
|
| -
|
| - // TODO(vtl)
|
| - DCHECK(!dispatchers || dispatchers->empty());
|
| + const std::vector<Dispatcher*>* dispatchers) {
|
| + DCHECK(!dispatchers || !dispatchers->empty());
|
|
|
| - if (is_running()) {
|
| - message->set_source_id(local_id_);
|
| - message->set_destination_id(remote_id_);
|
| - // TODO(vtl): Figure out error handling here (where it's rather late) --
|
| - // maybe move whatever checks we can into |CanEnqueueMessage()|.
|
| - if (!channel_->WriteMessage(message))
|
| - LOG(WARNING) << "Failed to write message to channel";
|
| - } else {
|
| - paused_message_queue_.push_back(message);
|
| + MojoResult result = CanEnqueueDispatchers(dispatchers);
|
| + if (result != MOJO_RESULT_OK) {
|
| + message->Destroy();
|
| + return result;
|
| }
|
| +
|
| + EnqueueMessageInternal(message, dispatchers);
|
| + return MOJO_RESULT_OK;
|
| }
|
|
|
| void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel,
|
| @@ -124,21 +99,48 @@ void ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) {
|
| AssertConsistentState();
|
|
|
| for (std::deque<MessageInTransit*>::iterator it =
|
| - paused_message_queue_.begin();
|
| - it != paused_message_queue_.end();
|
| - ++it) {
|
| - if (CanEnqueueMessage(*it, NULL) == MOJO_RESULT_OK) {
|
| - EnqueueMessage(*it, NULL);
|
| - } else {
|
| - (*it)->Destroy();
|
| - // TODO(vtl): Do something more sensible on error here?
|
| - LOG(WARNING) << "Failed to send message";
|
| - // TODO(vtl): Abort?
|
| - }
|
| - }
|
| + paused_message_queue_.begin(); it != paused_message_queue_.end();
|
| + ++it)
|
| + EnqueueMessageInternal(*it, NULL);
|
| paused_message_queue_.clear();
|
| }
|
|
|
| +MojoResult ProxyMessagePipeEndpoint::CanEnqueueDispatchers(
|
| + const std::vector<Dispatcher*>* dispatchers) {
|
| + // TODO(vtl): Support sending handles over OS pipes.
|
| + if (dispatchers) {
|
| + NOTIMPLEMENTED();
|
| + return MOJO_RESULT_UNIMPLEMENTED;
|
| + }
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +// Note: We may have to enqueue messages even when our (local) peer isn't open
|
| +// -- it may have been written to and closed immediately, before we were ready.
|
| +// This case is handled in |Run()| (which will call us).
|
| +void ProxyMessagePipeEndpoint::EnqueueMessageInternal(
|
| + MessageInTransit* message,
|
| + const std::vector<Dispatcher*>* dispatchers) {
|
| + DCHECK(is_open_);
|
| +
|
| + DCHECK(!dispatchers || !dispatchers->empty());
|
| + // TODO(vtl): We don't actually support sending dispatchers yet. We shouldn't
|
| + // get here due to other checks.
|
| + DCHECK(!dispatchers) << "Not yet implemented";
|
| +
|
| + if (is_running()) {
|
| + message->set_source_id(local_id_);
|
| + message->set_destination_id(remote_id_);
|
| + // If it fails at this point, the message gets dropped. (This is no
|
| + // different from any other in-transit errors.)
|
| + // Note: |WriteMessage()| will destroy the message even on failure.
|
| + if (!channel_->WriteMessage(message))
|
| + LOG(WARNING) << "Failed to write message to channel";
|
| + } else {
|
| + paused_message_queue_.push_back(message);
|
| + }
|
| +}
|
| +
|
| #ifndef NDEBUG
|
| void ProxyMessagePipeEndpoint::AssertConsistentState() const {
|
| if (is_attached()) {
|
|
|