| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/system/proxy_message_pipe_endpoint.h" | 5 #include "mojo/system/proxy_message_pipe_endpoint.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "base/stl_util.h" | 10 #include "base/stl_util.h" |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 48 | 48 |
| 49 void ProxyMessagePipeEndpoint::OnPeerClose() { | 49 void ProxyMessagePipeEndpoint::OnPeerClose() { |
| 50 DCHECK(is_open_); | 50 DCHECK(is_open_); |
| 51 DCHECK(is_peer_open_); | 51 DCHECK(is_peer_open_); |
| 52 | 52 |
| 53 is_peer_open_ = false; | 53 is_peer_open_ = false; |
| 54 MessageInTransit* message = | 54 MessageInTransit* message = |
| 55 MessageInTransit::Create(MessageInTransit::kTypeMessagePipe, | 55 MessageInTransit::Create(MessageInTransit::kTypeMessagePipe, |
| 56 MessageInTransit::kSubtypeMessagePipePeerClosed, | 56 MessageInTransit::kSubtypeMessagePipePeerClosed, |
| 57 NULL, 0); | 57 NULL, 0); |
| 58 if (CanEnqueueMessage(message, NULL) == MOJO_RESULT_OK) { | 58 EnqueueMessageInternal(message, NULL); |
| 59 EnqueueMessage(message, NULL); | |
| 60 } else { | |
| 61 message->Destroy(); | |
| 62 // TODO(vtl): Do something more sensible on error here? | |
| 63 LOG(WARNING) << "Failed to send peer closed control message"; | |
| 64 } | |
| 65 } | 59 } |
| 66 | 60 |
| 67 MojoResult ProxyMessagePipeEndpoint::CanEnqueueMessage( | 61 MojoResult ProxyMessagePipeEndpoint::EnqueueMessage( |
| 68 const MessageInTransit* /*message*/, | 62 MessageInTransit* message, |
| 69 const std::vector<Dispatcher*>* dispatchers) { | 63 const std::vector<Dispatcher*>* dispatchers) { |
| 70 // TODO(vtl): Support sending handles over OS pipes. | 64 DCHECK(!dispatchers || !dispatchers->empty()); |
| 71 if (dispatchers) { | 65 |
| 72 NOTIMPLEMENTED(); | 66 MojoResult result = CanEnqueueDispatchers(dispatchers); |
| 73 return MOJO_RESULT_UNIMPLEMENTED; | 67 if (result != MOJO_RESULT_OK) { |
| 68 message->Destroy(); |
| 69 return result; |
| 74 } | 70 } |
| 71 |
| 72 EnqueueMessageInternal(message, dispatchers); |
| 75 return MOJO_RESULT_OK; | 73 return MOJO_RESULT_OK; |
| 76 } | 74 } |
| 77 | 75 |
| 78 // Note: We may have to enqueue messages even when our (local) peer isn't open | |
| 79 // -- it may have been written to and closed immediately, before we were ready. | |
| 80 // This case is handled in |Run()| (which will call us). | |
| 81 void ProxyMessagePipeEndpoint::EnqueueMessage( | |
| 82 MessageInTransit* message, | |
| 83 std::vector<scoped_refptr<Dispatcher> >* dispatchers) { | |
| 84 DCHECK(is_open_); | |
| 85 | |
| 86 // TODO(vtl) | |
| 87 DCHECK(!dispatchers || dispatchers->empty()); | |
| 88 | |
| 89 if (is_running()) { | |
| 90 message->set_source_id(local_id_); | |
| 91 message->set_destination_id(remote_id_); | |
| 92 // TODO(vtl): Figure out error handling here (where it's rather late) -- | |
| 93 // maybe move whatever checks we can into |CanEnqueueMessage()|. | |
| 94 if (!channel_->WriteMessage(message)) | |
| 95 LOG(WARNING) << "Failed to write message to channel"; | |
| 96 } else { | |
| 97 paused_message_queue_.push_back(message); | |
| 98 } | |
| 99 } | |
| 100 | |
| 101 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel, | 76 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel, |
| 102 MessageInTransit::EndpointId local_id) { | 77 MessageInTransit::EndpointId local_id) { |
| 103 DCHECK(channel.get()); | 78 DCHECK(channel.get()); |
| 104 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); | 79 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
| 105 | 80 |
| 106 DCHECK(!is_attached()); | 81 DCHECK(!is_attached()); |
| 107 | 82 |
| 108 AssertConsistentState(); | 83 AssertConsistentState(); |
| 109 channel_ = channel; | 84 channel_ = channel; |
| 110 local_id_ = local_id; | 85 local_id_ = local_id; |
| 111 AssertConsistentState(); | 86 AssertConsistentState(); |
| 112 } | 87 } |
| 113 | 88 |
| 114 void ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { | 89 void ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { |
| 115 // Assertions about arguments: | 90 // Assertions about arguments: |
| 116 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); | 91 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); |
| 117 | 92 |
| 118 // Assertions about current state: | 93 // Assertions about current state: |
| 119 DCHECK(is_attached()); | 94 DCHECK(is_attached()); |
| 120 DCHECK(!is_running()); | 95 DCHECK(!is_running()); |
| 121 | 96 |
| 122 AssertConsistentState(); | 97 AssertConsistentState(); |
| 123 remote_id_ = remote_id; | 98 remote_id_ = remote_id; |
| 124 AssertConsistentState(); | 99 AssertConsistentState(); |
| 125 | 100 |
| 126 for (std::deque<MessageInTransit*>::iterator it = | 101 for (std::deque<MessageInTransit*>::iterator it = |
| 127 paused_message_queue_.begin(); | 102 paused_message_queue_.begin(); it != paused_message_queue_.end(); |
| 128 it != paused_message_queue_.end(); | 103 ++it) |
| 129 ++it) { | 104 EnqueueMessageInternal(*it, NULL); |
| 130 if (CanEnqueueMessage(*it, NULL) == MOJO_RESULT_OK) { | 105 paused_message_queue_.clear(); |
| 131 EnqueueMessage(*it, NULL); | 106 } |
| 132 } else { | 107 |
| 133 (*it)->Destroy(); | 108 MojoResult ProxyMessagePipeEndpoint::CanEnqueueDispatchers( |
| 134 // TODO(vtl): Do something more sensible on error here? | 109 const std::vector<Dispatcher*>* dispatchers) { |
| 135 LOG(WARNING) << "Failed to send message"; | 110 // TODO(vtl): Support sending handles over OS pipes. |
| 136 // TODO(vtl): Abort? | 111 if (dispatchers) { |
| 137 } | 112 NOTIMPLEMENTED(); |
| 113 return MOJO_RESULT_UNIMPLEMENTED; |
| 138 } | 114 } |
| 139 paused_message_queue_.clear(); | 115 return MOJO_RESULT_OK; |
| 116 } |
| 117 |
| 118 // Note: We may have to enqueue messages even when our (local) peer isn't open |
| 119 // -- it may have been written to and closed immediately, before we were ready. |
| 120 // This case is handled in |Run()| (which will call us). |
| 121 void ProxyMessagePipeEndpoint::EnqueueMessageInternal( |
| 122 MessageInTransit* message, |
| 123 const std::vector<Dispatcher*>* dispatchers) { |
| 124 DCHECK(is_open_); |
| 125 |
| 126 DCHECK(!dispatchers || !dispatchers->empty()); |
| 127 // TODO(vtl): We don't actually support sending dispatchers yet. We shouldn't |
| 128 // get here due to other checks. |
| 129 DCHECK(!dispatchers) << "Not yet implemented"; |
| 130 |
| 131 if (is_running()) { |
| 132 message->set_source_id(local_id_); |
| 133 message->set_destination_id(remote_id_); |
| 134 // If it fails at this point, the message gets dropped. (This is no |
| 135 // different from any other in-transit errors.) |
| 136 // Note: |WriteMessage()| will destroy the message even on failure. |
| 137 if (!channel_->WriteMessage(message)) |
| 138 LOG(WARNING) << "Failed to write message to channel"; |
| 139 } else { |
| 140 paused_message_queue_.push_back(message); |
| 141 } |
| 140 } | 142 } |
| 141 | 143 |
| 142 #ifndef NDEBUG | 144 #ifndef NDEBUG |
| 143 void ProxyMessagePipeEndpoint::AssertConsistentState() const { | 145 void ProxyMessagePipeEndpoint::AssertConsistentState() const { |
| 144 if (is_attached()) { | 146 if (is_attached()) { |
| 145 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); | 147 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); |
| 146 } else { // Not attached. | 148 } else { // Not attached. |
| 147 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); | 149 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); |
| 148 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); | 150 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); |
| 149 } | 151 } |
| 150 } | 152 } |
| 151 #endif | 153 #endif |
| 152 | 154 |
| 153 } // namespace system | 155 } // namespace system |
| 154 } // namespace mojo | 156 } // namespace mojo |
| OLD | NEW |