Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(292)

Side by Side Diff: mojo/system/message_pipe.cc

Issue 60103005: Mojo: First stab at making MessagePipes work across OS pipes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebased Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/system/message_pipe.h" 5 #include "mojo/system/message_pipe.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "base/stl_util.h" 8 #include "base/stl_util.h"
9 #include "mojo/system/channel.h"
9 #include "mojo/system/local_message_pipe_endpoint.h" 10 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h" 11 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_endpoint.h" 12 #include "mojo/system/message_pipe_endpoint.h"
13 #include "mojo/system/proxy_message_pipe_endpoint.h"
12 14
13 namespace mojo { 15 namespace mojo {
14 namespace system { 16 namespace system {
15 17
16 namespace {
17
18 unsigned DestinationPortFromSourcePort(unsigned port) {
19 DCHECK(port == 0 || port == 1);
20 return port ^ 1;
21 }
22
23 } // namespace
24
25 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0, 18 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0,
26 scoped_ptr<MessagePipeEndpoint> endpoint_1) { 19 scoped_ptr<MessagePipeEndpoint> endpoint_1) {
27 endpoints_[0].reset(endpoint_0.release()); 20 endpoints_[0].reset(endpoint_0.release());
28 endpoints_[1].reset(endpoint_1.release()); 21 endpoints_[1].reset(endpoint_1.release());
29 } 22 }
30 23
31 MessagePipe::MessagePipe() { 24 MessagePipe::MessagePipe() {
32 endpoints_[0].reset(new LocalMessagePipeEndpoint()); 25 endpoints_[0].reset(new LocalMessagePipeEndpoint());
33 endpoints_[1].reset(new LocalMessagePipeEndpoint()); 26 endpoints_[1].reset(new LocalMessagePipeEndpoint());
34 } 27 }
35 28
29 // static
30 unsigned MessagePipe::GetPeerPort(unsigned port) {
31 DCHECK(port == 0 || port == 1);
32 return port ^ 1;
33 }
34
36 void MessagePipe::CancelAllWaiters(unsigned port) { 35 void MessagePipe::CancelAllWaiters(unsigned port) {
37 DCHECK(port == 0 || port == 1); 36 DCHECK(port == 0 || port == 1);
38 37
39 base::AutoLock locker(lock_); 38 base::AutoLock locker(lock_);
40 DCHECK(endpoints_[port].get()); 39 DCHECK(endpoints_[port].get());
41 endpoints_[port]->CancelAllWaiters(); 40 endpoints_[port]->CancelAllWaiters();
42 } 41 }
43 42
44 void MessagePipe::Close(unsigned port) { 43 void MessagePipe::Close(unsigned port) {
45 DCHECK(port == 0 || port == 1); 44 DCHECK(port == 0 || port == 1);
46 45
47 unsigned destination_port = DestinationPortFromSourcePort(port); 46 unsigned destination_port = GetPeerPort(port);
48 47
49 base::AutoLock locker(lock_); 48 base::AutoLock locker(lock_);
50 DCHECK(endpoints_[port].get()); 49 DCHECK(endpoints_[port].get());
51 50
52 endpoints_[port]->Close(); 51 endpoints_[port]->Close();
53 if (endpoints_[destination_port].get()) 52 bool should_destroy_destination = endpoints_[destination_port].get() ?
54 endpoints_[destination_port]->OnPeerClose(); 53 !endpoints_[destination_port]->OnPeerClose() : false;
55 54
56 endpoints_[port].reset(); 55 endpoints_[port].reset();
56 if (should_destroy_destination) {
57 endpoints_[destination_port]->Close();
58 endpoints_[destination_port].reset();
59 }
57 } 60 }
58 61
62 // TODO(vtl): Support sending handles.
59 // TODO(vtl): Handle flags. 63 // TODO(vtl): Handle flags.
60 MojoResult MessagePipe::WriteMessage( 64 MojoResult MessagePipe::WriteMessage(
61 unsigned port, 65 unsigned port,
62 const void* bytes, uint32_t num_bytes, 66 const void* bytes, uint32_t num_bytes,
63 const MojoHandle* handles, uint32_t num_handles, 67 const MojoHandle* /*handles*/, uint32_t /*num_handles*/,
64 MojoWriteMessageFlags flags) { 68 MojoWriteMessageFlags flags) {
65 DCHECK(port == 0 || port == 1); 69 DCHECK(port == 0 || port == 1);
66 70 return EnqueueMessage(
67 unsigned destination_port = DestinationPortFromSourcePort(port); 71 GetPeerPort(port),
68 72 MessageInTransit::Create(
69 base::AutoLock locker(lock_); 73 MessageInTransit::kTypeMessagePipeEndpoint,
70 DCHECK(endpoints_[port].get()); 74 MessageInTransit::kSubtypeMessagePipeEndpointData,
71 75 bytes, num_bytes));
72 // The destination port need not be open, unlike the source port.
73 if (!endpoints_[destination_port].get())
74 return MOJO_RESULT_FAILED_PRECONDITION;
75
76 return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes,
77 handles, num_handles,
78 flags);
79 } 76 }
80 77
81 MojoResult MessagePipe::ReadMessage(unsigned port, 78 MojoResult MessagePipe::ReadMessage(unsigned port,
82 void* bytes, uint32_t* num_bytes, 79 void* bytes, uint32_t* num_bytes,
83 MojoHandle* handles, uint32_t* num_handles, 80 MojoHandle* handles, uint32_t* num_handles,
84 MojoReadMessageFlags flags) { 81 MojoReadMessageFlags flags) {
85 DCHECK(port == 0 || port == 1); 82 DCHECK(port == 0 || port == 1);
86 83
87 base::AutoLock locker(lock_); 84 base::AutoLock locker(lock_);
88 DCHECK(endpoints_[port].get()); 85 DCHECK(endpoints_[port].get());
(...skipping 17 matching lines...) Expand all
106 103
107 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { 104 void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
108 DCHECK(port == 0 || port == 1); 105 DCHECK(port == 0 || port == 1);
109 106
110 base::AutoLock locker(lock_); 107 base::AutoLock locker(lock_);
111 DCHECK(endpoints_[port].get()); 108 DCHECK(endpoints_[port].get());
112 109
113 endpoints_[port]->RemoveWaiter(waiter); 110 endpoints_[port]->RemoveWaiter(waiter);
114 } 111 }
115 112
113 MojoResult MessagePipe::EnqueueMessage(unsigned port,
114 MessageInTransit* message) {
115 DCHECK(port == 0 || port == 1);
116 DCHECK(message);
117
118 if (message->type() == MessageInTransit::kTypeMessagePipe)
119 return HandleControlMessage(port, message);
120
121 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
122
123 base::AutoLock locker(lock_);
124 DCHECK(endpoints_[GetPeerPort(port)].get());
125
126 // The destination port need not be open, unlike the source port.
127 if (!endpoints_[port].get()) {
128 message->Destroy();
129 return MOJO_RESULT_FAILED_PRECONDITION;
130 }
131
132 return endpoints_[port]->EnqueueMessage(message);
133 }
134
135 void MessagePipe::Attach(unsigned port,
136 scoped_refptr<Channel> channel,
137 MessageInTransit::EndpointId local_id) {
138 DCHECK(port == 0 || port == 1);
139 DCHECK(channel.get());
140 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
141
142 base::AutoLock locker(lock_);
143 DCHECK(endpoints_[port].get());
144
145 endpoints_[port]->Attach(channel, local_id);
146 }
147
148 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
149 DCHECK(port == 0 || port == 1);
150 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
151
152 base::AutoLock locker(lock_);
153 DCHECK(endpoints_[port].get());
154
155 if (!endpoints_[port]->Run(remote_id)) {
156 endpoints_[port]->Close();
157 endpoints_[port].reset();
158 }
159 }
160
116 MessagePipe::~MessagePipe() { 161 MessagePipe::~MessagePipe() {
117 // Owned by the dispatchers. The owning dispatchers should only release us via 162 // Owned by the dispatchers. The owning dispatchers should only release us via
118 // their |Close()| method, which should inform us of being closed via our 163 // their |Close()| method, which should inform us of being closed via our
119 // |Close()|. Thus these should already be null. 164 // |Close()|. Thus these should already be null.
120 DCHECK(!endpoints_[0].get()); 165 DCHECK(!endpoints_[0].get());
121 DCHECK(!endpoints_[1].get()); 166 DCHECK(!endpoints_[1].get());
122 } 167 }
123 168
169 MojoResult MessagePipe::HandleControlMessage(unsigned port,
170 MessageInTransit* message) {
171 DCHECK(port == 0 || port == 1);
172 DCHECK(message);
173 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipe);
174
175 MojoResult rv = MOJO_RESULT_OK;
176 switch (message->subtype()) {
177 case MessageInTransit::kSubtypeMessagePipePeerClosed: {
178 unsigned source_port = GetPeerPort(port);
179
180 base::AutoLock locker(lock_);
181 DCHECK(endpoints_[source_port].get());
182
183 endpoints_[source_port]->Close();
184 if (endpoints_[port].get())
185 endpoints_[port]->OnPeerClose();
186
187 endpoints_[source_port].reset();
188 break;
189 }
190 default:
191 LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
192 << message->subtype();
193 rv = MOJO_RESULT_UNKNOWN;
194 break;
195 }
196
197 message->Destroy();
198 return rv;
199 }
200
124 } // namespace system 201 } // namespace system
125 } // namespace mojo 202 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698