OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/system/proxy_message_pipe_endpoint.h" | 5 #include "mojo/system/proxy_message_pipe_endpoint.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
10 #include "mojo/system/channel.h" | 10 #include "mojo/system/channel.h" |
11 #include "mojo/system/local_message_pipe_endpoint.h" | 11 #include "mojo/system/local_message_pipe_endpoint.h" |
12 #include "mojo/system/message_pipe_dispatcher.h" | 12 #include "mojo/system/message_pipe_dispatcher.h" |
13 | 13 |
14 namespace mojo { | 14 namespace mojo { |
15 namespace system { | 15 namespace system { |
16 | 16 |
17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() | 17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() |
18 : local_id_(MessageInTransit::kInvalidEndpointId), | 18 : local_id_(MessageInTransit::kInvalidEndpointId), |
19 remote_id_(MessageInTransit::kInvalidEndpointId), | 19 remote_id_(MessageInTransit::kInvalidEndpointId), |
20 is_open_(true), | |
21 is_peer_open_(true) { | 20 is_peer_open_(true) { |
22 } | 21 } |
23 | 22 |
24 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( | 23 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( |
25 LocalMessagePipeEndpoint* local_message_pipe_endpoint, | 24 LocalMessagePipeEndpoint* local_message_pipe_endpoint, |
26 bool is_peer_open) | 25 bool is_peer_open) |
27 : local_id_(MessageInTransit::kInvalidEndpointId), | 26 : local_id_(MessageInTransit::kInvalidEndpointId), |
28 remote_id_(MessageInTransit::kInvalidEndpointId), | 27 remote_id_(MessageInTransit::kInvalidEndpointId), |
29 is_open_(true), | |
30 is_peer_open_(is_peer_open), | 28 is_peer_open_(is_peer_open), |
31 paused_message_queue_(MessageInTransitQueue::PassContents(), | 29 paused_message_queue_(MessageInTransitQueue::PassContents(), |
32 local_message_pipe_endpoint->message_queue()) { | 30 local_message_pipe_endpoint->message_queue()) { |
33 local_message_pipe_endpoint->Close(); | 31 local_message_pipe_endpoint->Close(); |
34 } | 32 } |
35 | 33 |
36 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { | 34 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { |
37 DCHECK(!is_running()); | 35 DCHECK(!is_running()); |
38 DCHECK(!is_attached()); | 36 DCHECK(!is_attached()); |
39 AssertConsistentState(); | 37 AssertConsistentState(); |
40 DCHECK(paused_message_queue_.IsEmpty()); | 38 DCHECK(paused_message_queue_.IsEmpty()); |
41 } | 39 } |
42 | 40 |
43 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const { | 41 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const { |
44 return kTypeProxy; | 42 return kTypeProxy; |
45 } | 43 } |
46 | 44 |
47 void ProxyMessagePipeEndpoint::Close() { | 45 bool ProxyMessagePipeEndpoint::OnPeerClose() { |
48 DCHECK(is_open_); | |
49 is_open_ = false; | |
50 | |
51 DCHECK(is_attached()); | |
52 channel_->DetachMessagePipeEndpoint(local_id_); | |
53 channel_ = NULL; | |
54 local_id_ = MessageInTransit::kInvalidEndpointId; | |
55 remote_id_ = MessageInTransit::kInvalidEndpointId; | |
56 paused_message_queue_.Clear(); | |
57 } | |
58 | |
59 void ProxyMessagePipeEndpoint::OnPeerClose() { | |
60 DCHECK(is_open_); | |
61 DCHECK(is_peer_open_); | 46 DCHECK(is_peer_open_); |
62 | 47 |
63 is_peer_open_ = false; | 48 is_peer_open_ = false; |
64 EnqueueMessage(make_scoped_ptr( | 49 |
65 new MessageInTransit(MessageInTransit::kTypeMessagePipe, | 50 // If our outgoing message queue isn't empty, we shouldn't be destroyed yet. |
66 MessageInTransit::kSubtypeMessagePipePeerClosed, | 51 if (!paused_message_queue_.IsEmpty()) |
67 0, 0, NULL))); | 52 return true; |
| 53 |
| 54 if (is_attached()) { |
| 55 if (!is_running()) { |
| 56 // If we're not running yet, we can't be destroyed yet, because we're |
| 57 // still waiting for the "run" message from the other side. |
| 58 return true; |
| 59 } |
| 60 |
| 61 Detach(); |
| 62 } |
| 63 |
| 64 return false; |
68 } | 65 } |
69 | 66 |
70 // Note: We may have to enqueue messages even when our (local) peer isn't open | 67 // Note: We may have to enqueue messages even when our (local) peer isn't open |
71 // -- it may have been written to and closed immediately, before we were ready. | 68 // -- it may have been written to and closed immediately, before we were ready. |
72 // This case is handled in |Run()| (which will call us). | 69 // This case is handled in |Run()| (which will call us). |
73 void ProxyMessagePipeEndpoint::EnqueueMessage( | 70 void ProxyMessagePipeEndpoint::EnqueueMessage( |
74 scoped_ptr<MessageInTransit> message) { | 71 scoped_ptr<MessageInTransit> message) { |
75 DCHECK(is_open_); | |
76 | |
77 if (is_running()) { | 72 if (is_running()) { |
78 message->SerializeAndCloseDispatchers(channel_.get()); | 73 message->SerializeAndCloseDispatchers(channel_.get()); |
79 | 74 |
80 message->set_source_id(local_id_); | 75 message->set_source_id(local_id_); |
81 message->set_destination_id(remote_id_); | 76 message->set_destination_id(remote_id_); |
82 if (!channel_->WriteMessage(message.Pass())) | 77 if (!channel_->WriteMessage(message.Pass())) |
83 LOG(WARNING) << "Failed to write message to channel"; | 78 LOG(WARNING) << "Failed to write message to channel"; |
84 } else { | 79 } else { |
85 paused_message_queue_.AddMessage(message.Pass()); | 80 paused_message_queue_.AddMessage(message.Pass()); |
86 } | 81 } |
87 } | 82 } |
88 | 83 |
89 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel, | 84 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel, |
90 MessageInTransit::EndpointId local_id) { | 85 MessageInTransit::EndpointId local_id) { |
91 DCHECK(channel.get()); | 86 DCHECK(channel.get()); |
92 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); | 87 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
93 | 88 |
94 DCHECK(!is_attached()); | 89 DCHECK(!is_attached()); |
95 | 90 |
96 AssertConsistentState(); | 91 AssertConsistentState(); |
97 channel_ = channel; | 92 channel_ = channel; |
98 local_id_ = local_id; | 93 local_id_ = local_id; |
99 AssertConsistentState(); | 94 AssertConsistentState(); |
100 } | 95 } |
101 | 96 |
102 void ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { | 97 bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { |
103 // Assertions about arguments: | 98 // Assertions about arguments: |
104 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); | 99 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); |
105 | 100 |
106 // Assertions about current state: | 101 // Assertions about current state: |
107 DCHECK(is_attached()); | 102 DCHECK(is_attached()); |
108 DCHECK(!is_running()); | 103 DCHECK(!is_running()); |
109 | 104 |
110 AssertConsistentState(); | 105 AssertConsistentState(); |
111 remote_id_ = remote_id; | 106 remote_id_ = remote_id; |
112 AssertConsistentState(); | 107 AssertConsistentState(); |
113 | 108 |
114 while (!paused_message_queue_.IsEmpty()) | 109 while (!paused_message_queue_.IsEmpty()) |
115 EnqueueMessage(paused_message_queue_.GetMessage()); | 110 EnqueueMessage(paused_message_queue_.GetMessage()); |
| 111 |
| 112 if (is_peer_open_) |
| 113 return true; // Stay alive. |
| 114 |
| 115 // We were just waiting to die. |
| 116 Detach(); |
| 117 return false; |
| 118 } |
| 119 |
| 120 void ProxyMessagePipeEndpoint::OnRemove() { |
| 121 Detach(); |
| 122 } |
| 123 |
| 124 void ProxyMessagePipeEndpoint::Detach() { |
| 125 DCHECK(is_attached()); |
| 126 |
| 127 AssertConsistentState(); |
| 128 channel_->DetachMessagePipeEndpoint(local_id_, remote_id_); |
| 129 channel_ = NULL; |
| 130 local_id_ = MessageInTransit::kInvalidEndpointId; |
| 131 remote_id_ = MessageInTransit::kInvalidEndpointId; |
| 132 paused_message_queue_.Clear(); |
| 133 AssertConsistentState(); |
116 } | 134 } |
117 | 135 |
118 #ifndef NDEBUG | 136 #ifndef NDEBUG |
119 void ProxyMessagePipeEndpoint::AssertConsistentState() const { | 137 void ProxyMessagePipeEndpoint::AssertConsistentState() const { |
120 if (is_attached()) { | 138 if (is_attached()) { |
121 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); | 139 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); |
122 } else { // Not attached. | 140 } else { // Not attached. |
123 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); | 141 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); |
124 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); | 142 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); |
125 } | 143 } |
126 } | 144 } |
127 #endif | 145 #endif |
128 | 146 |
129 } // namespace system | 147 } // namespace system |
130 } // namespace mojo | 148 } // namespace mojo |
OLD | NEW |