Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: more cleanup Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6
7 #include "base/bind.h"
8 #include "base/logging.h"
9 #include "base/message_loop/message_loop.h"
10 #include "mojo/edk/embedder/embedder_internal.h"
11 #include "mojo/edk/system/configuration.h"
12 #include "mojo/edk/system/data_pipe.h"
13
14 namespace mojo {
15 namespace edk {
16
17 void DataPipeProducerDispatcher::Init(ScopedPlatformHandle message_pipe) {
18 if (message_pipe.is_valid()) {
19 channel_ = RawChannel::Create(message_pipe.Pass());
20 internal::g_io_thread_task_runner->PostTask(
21 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
22 }
23 }
24
25 void DataPipeProducerDispatcher::InitOnIO() {
26 base::AutoLock locker(lock());
27 if (channel_)
28 channel_->Init(this);
29 }
30
31 void DataPipeProducerDispatcher::CloseOnIO() {
32 base::AutoLock locker(lock());
33 if (channel_) {
34 channel_->Shutdown();
35 channel_ = nullptr;
36 }
37 }
38
39 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
40 return Type::DATA_PIPE_PRODUCER;
41 }
42
43 scoped_refptr<DataPipeProducerDispatcher>
44 DataPipeProducerDispatcher::Deserialize(
45 const void* source,
46 size_t size,
47 PlatformHandleVector* platform_handles) {
48 MojoCreateDataPipeOptions options;
49 ScopedPlatformHandle platform_handle =
50 DataPipe::Deserialize(source, size, platform_handles, &options,
51 nullptr, 0);
52
53 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
54 if (platform_handle.is_valid())
55 rv->Init(platform_handle.Pass());
56 return rv;
57 }
58
59 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
60 const MojoCreateDataPipeOptions& options)
61 : options_(options), channel_(nullptr), error_(false) {
62 }
63
64 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
65 // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
66 DCHECK(!channel_);
67 }
68
69 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
70 lock().AssertAcquired();
71 awakable_list_.CancelAll();
72 }
73
74 void DataPipeProducerDispatcher::CloseImplNoLock() {
75 lock().AssertAcquired();
76 internal::g_io_thread_task_runner->PostTask(
77 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this));
78 }
79
80 scoped_refptr<Dispatcher>
81 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
82 lock().AssertAcquired();
83
84 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_);
85 rv->channel_ = channel_;
86 channel_ = nullptr;
87 rv->options_ = options_;
88 return scoped_refptr<Dispatcher>(rv.get());
89 }
90
91 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
92 const void* elements,
93 uint32_t* num_bytes,
94 MojoWriteDataFlags flags) {
95 lock().AssertAcquired();
96 if (InTwoPhaseWrite())
97 return MOJO_RESULT_BUSY;
98 if (error_)
99 return MOJO_RESULT_FAILED_PRECONDITION;
100 if (*num_bytes % options_.element_num_bytes != 0)
101 return MOJO_RESULT_INVALID_ARGUMENT;
102 if (*num_bytes == 0)
103 return MOJO_RESULT_OK; // Nothing to do.
104
105 // For now, we ignore options.capacity_num_bytes as a total of all pending
106 // writes (and just treat it per message). We will implement that later if
107 // we need to. All current uses want all their data to be sent, and it's not
108 // clear that this backpressure should be done at the mojo layer or at a
109 // higher application layer.
110 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
111 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
112 if (min_num_bytes_to_write > options_.capacity_num_bytes) {
113 // Don't return "should wait" since you can't wait for a specified amount of
114 // data.
115 return MOJO_RESULT_OUT_OF_RANGE;
116 }
117
118 uint32_t num_bytes_to_write =
119 std::min(*num_bytes, options_.capacity_num_bytes);
120 if (num_bytes_to_write == 0)
121 return MOJO_RESULT_SHOULD_WAIT;
122
123 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
124
125 *num_bytes = num_bytes_to_write;
126 WriteDataIntoMessages(elements, num_bytes_to_write);
127
128 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
129 if (!new_state.equals(old_state))
130 awakable_list_.AwakeForStateChange(new_state);
131 return MOJO_RESULT_OK;
132 }
133
134 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
135 void** buffer,
136 uint32_t* buffer_num_bytes,
137 MojoWriteDataFlags flags) {
138 lock().AssertAcquired();
139 if (InTwoPhaseWrite())
140 return MOJO_RESULT_BUSY;
141 if (error_)
142 return MOJO_RESULT_FAILED_PRECONDITION;
143
144 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
145 uint32_t min_num_bytes_to_write = 0;
146 if (all_or_none) {
147 min_num_bytes_to_write = *buffer_num_bytes;
148 if (min_num_bytes_to_write % options_.element_num_bytes != 0)
149 return MOJO_RESULT_INVALID_ARGUMENT;
150 if (min_num_bytes_to_write > options_.capacity_num_bytes) {
151 // Don't return "should wait" since you can't wait for a specified amount
152 // of data.
153 return MOJO_RESULT_OUT_OF_RANGE;
154 }
155 }
156
157 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes.
158 if (*buffer_num_bytes == 0)
159 *buffer_num_bytes = options_.capacity_num_bytes;
160
161 two_phase_data_.resize(*buffer_num_bytes);
162 *buffer = &two_phase_data_[0];
163
164 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes
165 // we can construct a MessageInTransit here. But then we need to make
166 // MessageInTransit support changing its data size later.
167
168 return MOJO_RESULT_OK;
169 }
170
171 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
172 uint32_t num_bytes_written) {
173 lock().AssertAcquired();
174 if (!InTwoPhaseWrite())
175 return MOJO_RESULT_FAILED_PRECONDITION;
176
177 // Note: Allow successful completion of the two-phase write even if the other
178 // side has been closed.
179 MojoResult rv = MOJO_RESULT_OK;
180 if (num_bytes_written > two_phase_data_.size() ||
181 num_bytes_written % options_.element_num_bytes != 0) {
182 rv = MOJO_RESULT_INVALID_ARGUMENT;
183 } else if (channel_) {
184 WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written);
185 }
186
187 // Two-phase write ended even on failure.
188 two_phase_data_.clear();
189 // If we're now writable, we *became* writable (since we weren't writable
190 // during the two-phase write), so awake producer awakables.
191 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
192 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
193 awakable_list_.AwakeForStateChange(new_state);
194
195 return rv;
196 }
197
198 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
199 const {
200 lock().AssertAcquired();
201
202 HandleSignalsState rv;
203 if (!error_) {
204 if (!InTwoPhaseWrite())
205 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
206 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
207 } else {
208 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
209 }
210 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
211 return rv;
212 }
213
214 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
215 Awakable* awakable,
216 MojoHandleSignals signals,
217 uint32_t context,
218 HandleSignalsState* signals_state) {
219 lock().AssertAcquired();
220 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
221 if (state.satisfies(signals)) {
222 if (signals_state)
223 *signals_state = state;
224 return MOJO_RESULT_ALREADY_EXISTS;
225 }
226 if (!state.can_satisfy(signals)) {
227 if (signals_state)
228 *signals_state = state;
229 return MOJO_RESULT_FAILED_PRECONDITION;
230 }
231
232 awakable_list_.Add(awakable, signals, context);
233 return MOJO_RESULT_OK;
234 }
235
236 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
237 Awakable* awakable,
238 HandleSignalsState* signals_state) {
239 lock().AssertAcquired();
240 awakable_list_.Remove(awakable);
241 if (signals_state)
242 *signals_state = GetHandleSignalsStateImplNoLock();
243 }
244
245 void DataPipeProducerDispatcher::StartSerializeImplNoLock(
246 size_t* max_size,
247 size_t* max_platform_handles) {
248 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
249
250 if (channel_) {
251 std::vector<char> temp;
252 serialized_platform_handle_ = channel_->ReleaseHandle(&temp);
253 channel_ = nullptr;
254 DCHECK(temp.empty());
255 }
256 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
257 false, max_size, max_platform_handles);
258 }
259
260 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
261 void* destination,
262 size_t* actual_size,
263 PlatformHandleVector* platform_handles) {
264 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
265
266 DataPipe::EndSerialize(
267 options_,
268 serialized_platform_handle_.Pass(),
269 ScopedPlatformHandle(), 0,
270 destination, actual_size, platform_handles);
271 CloseImplNoLock();
272 return true;
273 }
274
275 void DataPipeProducerDispatcher::TransportStarted() {
276 started_transport_.Acquire();
277 }
278
279 void DataPipeProducerDispatcher::TransportEnded() {
280 started_transport_.Release();
281 }
282
283 bool DataPipeProducerDispatcher::IsBusyNoLock() const {
284 lock().AssertAcquired();
285 return InTwoPhaseWrite();
286 }
287
288 void DataPipeProducerDispatcher::OnReadMessage(
289 const MessageInTransit::View& message_view,
290 ScopedPlatformHandleVectorPtr platform_handles) {
291 NOTREACHED();
292 }
293
294 void DataPipeProducerDispatcher::OnError(Error error) {
295 switch (error) {
296 case ERROR_READ_SHUTDOWN:
297 case ERROR_READ_BROKEN:
298 case ERROR_READ_BAD_MESSAGE:
299 case ERROR_READ_UNKNOWN:
300 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't read messages";
301 break;
302 case ERROR_WRITE:
303 // Write errors are slightly notable: they probably shouldn't happen under
304 // normal operation (but maybe the other side crashed).
305 LOG(WARNING) << "DataPipeProducerDispatcher write error";
306 break;
307 }
308
309 error_ = true;
310 if (started_transport_.Try()) {
311 base::AutoLock locker(lock());
312 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
313
314 base::MessageLoop::current()->PostTask(
315 FROM_HERE,
316 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
317 channel_ = nullptr;
318 started_transport_.Release();
319 } else {
320 // We must be waiting to call ReleaseHandle. It will call Shutdown.
321 }
322 }
323
324 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
325 return !two_phase_data_.empty();
326 }
327
328 bool DataPipeProducerDispatcher::WriteDataIntoMessages(
329 const void* elements,
330 uint32_t num_bytes) {
331 // The maximum amount of data to send per message (make it a multiple of the
332 // element size.
333 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
334 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
335 DCHECK_GT(max_message_num_bytes, 0u);
336
337 uint32_t offset = 0;
338 while (offset < num_bytes) {
339 uint32_t message_num_bytes =
340 std::min(static_cast<uint32_t>(max_message_num_bytes),
341 num_bytes - offset);
342 scoped_ptr<MessageInTransit> message(new MessageInTransit(
343 MessageInTransit::Type::MESSAGE, message_num_bytes,
344 static_cast<const char*>(elements) + offset));
345 if (!channel_->WriteMessage(message.Pass())) {
346 error_ = true;
347 return false;
348 }
349
350 offset += message_num_bytes;
351 }
352
353 return true;
354 }
355
356 } // namespace edk
357 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698