Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(127)

Side by Side Diff: mojo/system/proxy_message_pipe_endpoint.cc

Issue 240133005: Mojo: Make some attempts towards fixing remote message pipe closure. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/proxy_message_pipe_endpoint.h" 5 #include "mojo/system/proxy_message_pipe_endpoint.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "mojo/system/channel.h" 10 #include "mojo/system/channel.h"
11 #include "mojo/system/local_message_pipe_endpoint.h" 11 #include "mojo/system/local_message_pipe_endpoint.h"
12 #include "mojo/system/message_pipe_dispatcher.h" 12 #include "mojo/system/message_pipe_dispatcher.h"
13 13
14 namespace mojo { 14 namespace mojo {
15 namespace system { 15 namespace system {
16 16
17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint() 17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
18 : local_id_(MessageInTransit::kInvalidEndpointId), 18 : local_id_(MessageInTransit::kInvalidEndpointId),
19 remote_id_(MessageInTransit::kInvalidEndpointId), 19 remote_id_(MessageInTransit::kInvalidEndpointId),
20 is_open_(true),
21 is_peer_open_(true) { 20 is_peer_open_(true) {
22 } 21 }
23 22
24 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint( 23 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
25 LocalMessagePipeEndpoint* local_message_pipe_endpoint, 24 LocalMessagePipeEndpoint* local_message_pipe_endpoint,
26 bool is_peer_open) 25 bool is_peer_open)
27 : local_id_(MessageInTransit::kInvalidEndpointId), 26 : local_id_(MessageInTransit::kInvalidEndpointId),
28 remote_id_(MessageInTransit::kInvalidEndpointId), 27 remote_id_(MessageInTransit::kInvalidEndpointId),
29 is_open_(true),
30 is_peer_open_(is_peer_open), 28 is_peer_open_(is_peer_open),
31 paused_message_queue_(MessageInTransitQueue::PassContents(), 29 paused_message_queue_(MessageInTransitQueue::PassContents(),
32 local_message_pipe_endpoint->message_queue()) { 30 local_message_pipe_endpoint->message_queue()) {
33 local_message_pipe_endpoint->Close(); 31 local_message_pipe_endpoint->Close();
34 } 32 }
35 33
36 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() { 34 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
37 DCHECK(!is_running()); 35 DCHECK(!is_running());
38 DCHECK(!is_attached()); 36 DCHECK(!is_attached());
39 AssertConsistentState(); 37 AssertConsistentState();
40 DCHECK(paused_message_queue_.IsEmpty()); 38 DCHECK(paused_message_queue_.IsEmpty());
41 } 39 }
42 40
43 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const { 41 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const {
44 return kTypeProxy; 42 return kTypeProxy;
45 } 43 }
46 44
47 void ProxyMessagePipeEndpoint::Close() { 45 bool ProxyMessagePipeEndpoint::OnPeerClose() {
48 DCHECK(is_open_);
49 is_open_ = false;
50
51 DCHECK(is_attached());
52 channel_->DetachMessagePipeEndpoint(local_id_);
53 channel_ = NULL;
54 local_id_ = MessageInTransit::kInvalidEndpointId;
55 remote_id_ = MessageInTransit::kInvalidEndpointId;
56 paused_message_queue_.Clear();
57 }
58
59 void ProxyMessagePipeEndpoint::OnPeerClose() {
60 DCHECK(is_open_);
61 DCHECK(is_peer_open_); 46 DCHECK(is_peer_open_);
62 47
63 is_peer_open_ = false; 48 is_peer_open_ = false;
64 EnqueueMessage(make_scoped_ptr( 49
65 new MessageInTransit(MessageInTransit::kTypeMessagePipe, 50 // If our outgoing message queue isn't empty, we shouldn't be destroyed yet.
66 MessageInTransit::kSubtypeMessagePipePeerClosed, 51 if (!paused_message_queue_.IsEmpty())
67 0, 0, NULL))); 52 return true;
53
54 if (is_attached()) {
55 if (!is_running()) {
56 // If we're not running yet, we can't be destroyed yet, because we're
57 // still waiting for the "run" message from the other side.
58 return true;
59 }
60
61 Detach();
62 }
63
64 return false;
68 } 65 }
69 66
70 // Note: We may have to enqueue messages even when our (local) peer isn't open 67 // Note: We may have to enqueue messages even when our (local) peer isn't open
71 // -- it may have been written to and closed immediately, before we were ready. 68 // -- it may have been written to and closed immediately, before we were ready.
72 // This case is handled in |Run()| (which will call us). 69 // This case is handled in |Run()| (which will call us).
73 void ProxyMessagePipeEndpoint::EnqueueMessage( 70 void ProxyMessagePipeEndpoint::EnqueueMessage(
74 scoped_ptr<MessageInTransit> message) { 71 scoped_ptr<MessageInTransit> message) {
75 DCHECK(is_open_);
76
77 if (is_running()) { 72 if (is_running()) {
78 message->SerializeAndCloseDispatchers(channel_.get()); 73 message->SerializeAndCloseDispatchers(channel_.get());
79 74
80 message->set_source_id(local_id_); 75 message->set_source_id(local_id_);
81 message->set_destination_id(remote_id_); 76 message->set_destination_id(remote_id_);
82 if (!channel_->WriteMessage(message.Pass())) 77 if (!channel_->WriteMessage(message.Pass()))
83 LOG(WARNING) << "Failed to write message to channel"; 78 LOG(WARNING) << "Failed to write message to channel";
84 } else { 79 } else {
85 paused_message_queue_.AddMessage(message.Pass()); 80 paused_message_queue_.AddMessage(message.Pass());
86 } 81 }
87 } 82 }
88 83
89 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel, 84 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel,
90 MessageInTransit::EndpointId local_id) { 85 MessageInTransit::EndpointId local_id) {
91 DCHECK(channel.get()); 86 DCHECK(channel.get());
92 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); 87 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
93 88
94 DCHECK(!is_attached()); 89 DCHECK(!is_attached());
95 90
96 AssertConsistentState(); 91 AssertConsistentState();
97 channel_ = channel; 92 channel_ = channel;
98 local_id_ = local_id; 93 local_id_ = local_id;
99 AssertConsistentState(); 94 AssertConsistentState();
100 } 95 }
101 96
102 void ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) { 97 bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) {
103 // Assertions about arguments: 98 // Assertions about arguments:
104 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); 99 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
105 100
106 // Assertions about current state: 101 // Assertions about current state:
107 DCHECK(is_attached()); 102 DCHECK(is_attached());
108 DCHECK(!is_running()); 103 DCHECK(!is_running());
109 104
110 AssertConsistentState(); 105 AssertConsistentState();
111 remote_id_ = remote_id; 106 remote_id_ = remote_id;
112 AssertConsistentState(); 107 AssertConsistentState();
113 108
114 while (!paused_message_queue_.IsEmpty()) 109 while (!paused_message_queue_.IsEmpty())
115 EnqueueMessage(paused_message_queue_.GetMessage()); 110 EnqueueMessage(paused_message_queue_.GetMessage());
111
112 if (is_peer_open_)
113 return true; // Stay alive.
114
115 // We were just waiting to die.
116 Detach();
117 return false;
118 }
119
120 void ProxyMessagePipeEndpoint::OnRemove() {
121 Detach();
122 }
123
124 void ProxyMessagePipeEndpoint::Detach() {
125 DCHECK(is_attached());
126
127 AssertConsistentState();
128 channel_->DetachMessagePipeEndpoint(local_id_, remote_id_);
129 channel_ = NULL;
130 local_id_ = MessageInTransit::kInvalidEndpointId;
131 remote_id_ = MessageInTransit::kInvalidEndpointId;
132 paused_message_queue_.Clear();
133 AssertConsistentState();
116 } 134 }
117 135
118 #ifndef NDEBUG 136 #ifndef NDEBUG
119 void ProxyMessagePipeEndpoint::AssertConsistentState() const { 137 void ProxyMessagePipeEndpoint::AssertConsistentState() const {
120 if (is_attached()) { 138 if (is_attached()) {
121 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId); 139 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
122 } else { // Not attached. 140 } else { // Not attached.
123 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId); 141 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
124 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId); 142 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
125 } 143 }
126 } 144 }
127 #endif 145 #endif
128 146
129 } // namespace system 147 } // namespace system
130 } // namespace mojo 148 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698