OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/system/proxy_message_pipe_endpoint.h" | 5 #include "mojo/system/proxy_message_pipe_endpoint.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
10 #include "mojo/system/channel.h" | |
11 #include "mojo/system/channel_endpoint.h" | 10 #include "mojo/system/channel_endpoint.h" |
12 #include "mojo/system/local_message_pipe_endpoint.h" | 11 #include "mojo/system/local_message_pipe_endpoint.h" |
13 #include "mojo/system/message_pipe_dispatcher.h" | 12 #include "mojo/system/message_pipe_dispatcher.h" |
14 | 13 |
15 namespace mojo { | 14 namespace mojo { |
16 namespace system { | 15 namespace system { |
17 | 16 |
18 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() | 17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() |
19 : local_id_(MessageInTransit::kInvalidEndpointId), | 18 : is_running_(false), is_peer_open_(true) { |
20 remote_id_(MessageInTransit::kInvalidEndpointId), | |
21 is_peer_open_(true) { | |
22 } | 19 } |
23 | 20 |
24 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( | 21 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( |
25 LocalMessagePipeEndpoint* local_message_pipe_endpoint, | 22 LocalMessagePipeEndpoint* local_message_pipe_endpoint, |
26 bool is_peer_open) | 23 bool is_peer_open) |
27 : local_id_(MessageInTransit::kInvalidEndpointId), | 24 : is_running_(false), |
28 remote_id_(MessageInTransit::kInvalidEndpointId), | |
29 is_peer_open_(is_peer_open), | 25 is_peer_open_(is_peer_open), |
30 paused_message_queue_(MessageInTransitQueue::PassContents(), | 26 paused_message_queue_(MessageInTransitQueue::PassContents(), |
31 local_message_pipe_endpoint->message_queue()) { | 27 local_message_pipe_endpoint->message_queue()) { |
32 local_message_pipe_endpoint->Close(); | 28 local_message_pipe_endpoint->Close(); |
33 } | 29 } |
34 | 30 |
35 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { | 31 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { |
36 DCHECK(!is_running()); | 32 DCHECK(!is_running()); |
37 DCHECK(!is_attached()); | 33 DCHECK(!is_attached()); |
38 AssertConsistentState(); | |
39 DCHECK(paused_message_queue_.IsEmpty()); | 34 DCHECK(paused_message_queue_.IsEmpty()); |
40 } | 35 } |
41 | 36 |
42 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const { | 37 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const { |
43 return kTypeProxy; | 38 return kTypeProxy; |
44 } | 39 } |
45 | 40 |
46 bool ProxyMessagePipeEndpoint::OnPeerClose() { | 41 bool ProxyMessagePipeEndpoint::OnPeerClose() { |
47 DCHECK(is_peer_open_); | 42 DCHECK(is_peer_open_); |
48 | 43 |
(...skipping 15 matching lines...) Expand all Loading... |
64 | 59 |
65 return false; | 60 return false; |
66 } | 61 } |
67 | 62 |
68 // Note: We may have to enqueue messages even when our (local) peer isn't open | 63 // Note: We may have to enqueue messages even when our (local) peer isn't open |
69 // -- it may have been written to and closed immediately, before we were ready. | 64 // -- it may have been written to and closed immediately, before we were ready. |
70 // This case is handled in |Run()| (which will call us). | 65 // This case is handled in |Run()| (which will call us). |
71 void ProxyMessagePipeEndpoint::EnqueueMessage( | 66 void ProxyMessagePipeEndpoint::EnqueueMessage( |
72 scoped_ptr<MessageInTransit> message) { | 67 scoped_ptr<MessageInTransit> message) { |
73 if (is_running()) { | 68 if (is_running()) { |
74 message->SerializeAndCloseDispatchers(channel_.get()); | 69 DCHECK(channel_endpoint_.get()); |
75 | 70 LOG_IF(WARNING, !channel_endpoint_->EnqueueMessage(message.Pass())) |
76 message->set_source_id(local_id_); | 71 << "Failed to write enqueue message to channel"; |
77 message->set_destination_id(remote_id_); | |
78 if (!channel_->WriteMessage(message.Pass())) | |
79 LOG(WARNING) << "Failed to write message to channel"; | |
80 } else { | 72 } else { |
81 paused_message_queue_.AddMessage(message.Pass()); | 73 paused_message_queue_.AddMessage(message.Pass()); |
82 } | 74 } |
83 } | 75 } |
84 | 76 |
85 void ProxyMessagePipeEndpoint::Attach(ChannelEndpoint* channel_endpoint, | 77 void ProxyMessagePipeEndpoint::Attach(ChannelEndpoint* channel_endpoint) { |
86 Channel* channel, | |
87 MessageInTransit::EndpointId local_id) { | |
88 DCHECK(channel_endpoint); | 78 DCHECK(channel_endpoint); |
89 DCHECK(channel); | |
90 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); | |
91 | |
92 DCHECK(!is_attached()); | 79 DCHECK(!is_attached()); |
93 | |
94 AssertConsistentState(); | |
95 channel_endpoint_ = channel_endpoint; | 80 channel_endpoint_ = channel_endpoint; |
96 channel_ = channel; | |
97 local_id_ = local_id; | |
98 AssertConsistentState(); | |
99 } | 81 } |
100 | 82 |
101 bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { | 83 bool ProxyMessagePipeEndpoint::Run() { |
102 // Assertions about arguments: | |
103 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); | |
104 | |
105 // Assertions about current state: | 84 // Assertions about current state: |
106 DCHECK(is_attached()); | 85 DCHECK(is_attached()); |
107 DCHECK(!is_running()); | 86 DCHECK(!is_running()); |
108 | 87 |
109 AssertConsistentState(); | 88 is_running_ = true; |
110 remote_id_ = remote_id; | |
111 AssertConsistentState(); | |
112 | 89 |
113 while (!paused_message_queue_.IsEmpty()) | 90 while (!paused_message_queue_.IsEmpty()) { |
114 EnqueueMessage(paused_message_queue_.GetMessage()); | 91 LOG_IF( |
| 92 WARNING, |
| 93 !channel_endpoint_->EnqueueMessage(paused_message_queue_.GetMessage())) |
| 94 << "Failed to write enqueue message to channel"; |
| 95 } |
115 | 96 |
116 if (is_peer_open_) | 97 if (is_peer_open_) |
117 return true; // Stay alive. | 98 return true; // Stay alive. |
118 | 99 |
119 // We were just waiting to die. | 100 // We were just waiting to die. |
120 Detach(); | 101 Detach(); |
121 return false; | 102 return false; |
122 } | 103 } |
123 | 104 |
124 void ProxyMessagePipeEndpoint::OnRemove() { | 105 void ProxyMessagePipeEndpoint::OnRemove() { |
125 Detach(); | 106 Detach(); |
126 } | 107 } |
127 | 108 |
128 void ProxyMessagePipeEndpoint::Detach() { | 109 void ProxyMessagePipeEndpoint::Detach() { |
129 DCHECK(is_attached()); | 110 DCHECK(is_attached()); |
130 | 111 |
131 AssertConsistentState(); | 112 channel_endpoint_->DetachFromMessagePipe(); |
132 channel_->DetachMessagePipeEndpoint(local_id_, remote_id_); | |
133 channel_ = NULL; | |
134 // TODO(vtl): Inform |channel_endpoint_| that we were detached. | |
135 channel_endpoint_ = NULL; | 113 channel_endpoint_ = NULL; |
136 local_id_ = MessageInTransit::kInvalidEndpointId; | 114 is_running_ = false; |
137 remote_id_ = MessageInTransit::kInvalidEndpointId; | |
138 paused_message_queue_.Clear(); | 115 paused_message_queue_.Clear(); |
139 AssertConsistentState(); | |
140 } | 116 } |
141 | 117 |
142 #ifndef NDEBUG | |
143 void ProxyMessagePipeEndpoint::AssertConsistentState() const { | |
144 if (is_attached()) { | |
145 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); | |
146 } else { // Not attached. | |
147 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); | |
148 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); | |
149 } | |
150 } | |
151 #endif | |
152 | |
153 } // namespace system | 118 } // namespace system |
154 } // namespace mojo | 119 } // namespace mojo |
OLD | NEW |