Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(110)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: more cleanup Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop.h"
12 #include "mojo/edk/embedder/embedder_internal.h"
13 #include "mojo/edk/embedder/platform_shared_buffer.h"
14 #include "mojo/edk/embedder/platform_support.h"
15 #include "mojo/edk/system/data_pipe.h"
16
17 namespace mojo {
18 namespace edk {
19
20 struct SharedMemoryHeader {
21 uint32_t data_size;
22 uint32_t read_buffer_size;
23 };
24
25 void DataPipeConsumerDispatcher::Init(ScopedPlatformHandle message_pipe) {
26 if (message_pipe.is_valid()) {
27 channel_ = RawChannel::Create(message_pipe.Pass());
28 if (!serialized_read_buffer_.empty())
29 channel_->SetInitialReadBufferData(
30 &serialized_read_buffer_[0], serialized_read_buffer_.size());
31 serialized_read_buffer_.clear();
32 internal::g_io_thread_task_runner->PostTask(
33 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this));
34 }
35 }
36
37 void DataPipeConsumerDispatcher::InitOnIO() {
38 base::AutoLock locker(lock());
39 calling_init_ = true;
40 if (channel_)
41 channel_->Init(this);
42 calling_init_ = false;
43 }
44
45 void DataPipeConsumerDispatcher::CloseOnIO() {
46 base::AutoLock locker(lock());
47 if (channel_) {
48 channel_->Shutdown();
49 channel_ = nullptr;
50 }
51 }
52
53 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
54 return Type::DATA_PIPE_CONSUMER;
55 }
56
57 scoped_refptr<DataPipeConsumerDispatcher>
58 DataPipeConsumerDispatcher::Deserialize(
59 const void* source,
60 size_t size,
61 PlatformHandleVector* platform_handles) {
62 MojoCreateDataPipeOptions options;
63 ScopedPlatformHandle shared_memory_handle;
64 size_t shared_memory_size = 0;
65
66 ScopedPlatformHandle platform_handle =
67 DataPipe::Deserialize(source, size, platform_handles, &options,
68 &shared_memory_handle, &shared_memory_size);
69
70 scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options));
71
72 if (shared_memory_size) {
73 scoped_refptr<PlatformSharedBuffer> shared_buffer(
74 internal::g_platform_support->CreateSharedBufferFromHandle(
75 shared_memory_size, shared_memory_handle.Pass()));;
76 scoped_ptr<PlatformSharedBufferMapping> mapping(
77 shared_buffer->Map(0, shared_memory_size));
78 char* buffer = static_cast<char*>(mapping->GetBase());
79 SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer);
80 buffer += sizeof(SharedMemoryHeader);
81 if (header->data_size) {
82 rv->data_.resize(header->data_size);
83 memcpy(&rv->data_[0], buffer, header->data_size);
84 buffer += header->data_size;
85 }
86 if (header->read_buffer_size) {
87 rv->serialized_read_buffer_.resize(header->read_buffer_size);
88 memcpy(&rv->serialized_read_buffer_[0], buffer, header->read_buffer_size);
89 buffer += header->read_buffer_size;
90 }
91
92 }
93
94 if (platform_handle.is_valid())
95 rv->Init(platform_handle.Pass());
96 return rv;
97 }
98
99 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
100 const MojoCreateDataPipeOptions& options)
101 : options_(options),
102 channel_(nullptr),
103 calling_init_(false),
104 in_two_phase_read_(false),
105 two_phase_max_bytes_read_(0),
106 error_(false),
107 serialized_(false) {
108 }
109
110 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
111 // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
112 DCHECK(!channel_);
113 }
114
115 void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() {
116 lock().AssertAcquired();
117 awakable_list_.CancelAll();
118 }
119
120 void DataPipeConsumerDispatcher::CloseImplNoLock() {
121 lock().AssertAcquired();
122 internal::g_io_thread_task_runner->PostTask(
123 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this));
124 }
125
126 scoped_refptr<Dispatcher>
127 DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
128 lock().AssertAcquired();
129
130 SerializeInternal();
131
132 scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_);
133 rv->channel_ = channel_;
134 channel_ = nullptr;
135 rv->options_ = options_;
136 data_.swap(rv->data_);
137 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
138 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
139 rv->serialized_ = true;
140
141 return scoped_refptr<Dispatcher>(rv.get());
142 }
143
144 MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
145 void* elements,
146 uint32_t* num_bytes,
147 MojoReadDataFlags flags) {
148 lock().AssertAcquired();
149 if (in_two_phase_read_)
150 return MOJO_RESULT_BUSY;
151
152 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
153 if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
154 (flags & MOJO_READ_DATA_FLAG_DISCARD))
155 return MOJO_RESULT_INVALID_ARGUMENT;
156 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
157 DVLOG_IF(2, elements)
158 << "Query mode: ignoring non-null |elements|";
159 *num_bytes = static_cast<uint32_t>(data_.size());
160 return MOJO_RESULT_OK;
161 }
162
163 bool discard = false;
164 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
165 // These flags are mutally exclusive.
166 if (flags & MOJO_READ_DATA_FLAG_PEEK)
167 return MOJO_RESULT_INVALID_ARGUMENT;
168 DVLOG_IF(2, elements)
169 << "Discard mode: ignoring non-null |elements|";
170 discard = true;
171 }
172
173 uint32_t max_num_bytes_to_read = *num_bytes;
174 if (max_num_bytes_to_read % options_.element_num_bytes != 0)
175 return MOJO_RESULT_INVALID_ARGUMENT;
176
177 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
178 uint32_t min_num_bytes_to_read =
179 all_or_none ? max_num_bytes_to_read : 0;
180
181 if (min_num_bytes_to_read > data_.size())
182 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE;
183
184 uint32_t bytes_to_read = std::min(max_num_bytes_to_read,
185 static_cast<uint32_t>(data_.size()));
186 if (bytes_to_read == 0)
187 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
188
189 if (!discard)
190 memcpy(elements, &data_[0], bytes_to_read);
191 *num_bytes = bytes_to_read;
192
193 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
194 if (discard || !peek)
195 data_.erase(data_.begin(), data_.begin() + bytes_to_read);
196
197 return MOJO_RESULT_OK;
198 }
199
200 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
201 const void** buffer,
202 uint32_t* buffer_num_bytes,
203 MojoReadDataFlags flags) {
204 lock().AssertAcquired();
205 if (in_two_phase_read_)
206 return MOJO_RESULT_BUSY;
207
208 // These flags may not be used in two-phase mode.
209 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
210 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
211 (flags & MOJO_READ_DATA_FLAG_PEEK))
212 return MOJO_RESULT_INVALID_ARGUMENT;
213
214 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
215 uint32_t min_num_bytes_to_read = 0;
216 if (all_or_none) {
217 min_num_bytes_to_read = *buffer_num_bytes;
218 if (min_num_bytes_to_read % options_.element_num_bytes != 0)
219 return MOJO_RESULT_INVALID_ARGUMENT;
220 }
221
222 uint32_t max_num_bytes_to_read = static_cast<uint32_t>(data_.size());
223 if (min_num_bytes_to_read > max_num_bytes_to_read)
224 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE;
225 if (max_num_bytes_to_read == 0)
226 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
227
228 in_two_phase_read_ = true;
229 *buffer = &data_[0];
230 *buffer_num_bytes = max_num_bytes_to_read;
231 two_phase_max_bytes_read_ = max_num_bytes_to_read;
232
233 return MOJO_RESULT_OK;
234 }
235
236 MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock(
237 uint32_t num_bytes_read) {
238 lock().AssertAcquired();
239 if (!in_two_phase_read_)
240 return MOJO_RESULT_FAILED_PRECONDITION;
241
242 MojoResult rv;
243 if (num_bytes_read > two_phase_max_bytes_read_ ||
244 num_bytes_read % options_.element_num_bytes != 0) {
245 rv = MOJO_RESULT_INVALID_ARGUMENT;
246 } else {
247 rv = MOJO_RESULT_OK;
248 data_.erase(data_.begin(), data_.begin() + num_bytes_read);
249 }
250
251 in_two_phase_read_ = false;
252 two_phase_max_bytes_read_ = 0;
253
254 // If we're now readable, we *became* readable (since we weren't readable
255 // during the two-phase read), so awake consumer awakables.
256 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
257 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
258 awakable_list_.AwakeForStateChange(new_state);
259
260 return rv;
261 }
262
263 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock()
264 const {
265 lock().AssertAcquired();
266
267 HandleSignalsState rv;
268 if (!data_.empty()) {
269 if (!in_two_phase_read_)
270 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
271 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
272 } else if (!error_) {
273 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
274 }
275
276 if (error_)
277 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
278 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
279 return rv;
280 }
281
282 MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock(
283 Awakable* awakable,
284 MojoHandleSignals signals,
285 uint32_t context,
286 HandleSignalsState* signals_state) {
287 lock().AssertAcquired();
288 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
289 if (state.satisfies(signals)) {
290 if (signals_state)
291 *signals_state = state;
292 return MOJO_RESULT_ALREADY_EXISTS;
293 }
294 if (!state.can_satisfy(signals)) {
295 if (signals_state)
296 *signals_state = state;
297 return MOJO_RESULT_FAILED_PRECONDITION;
298 }
299
300 awakable_list_.Add(awakable, signals, context);
301 return MOJO_RESULT_OK;
302 }
303
304 void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock(
305 Awakable* awakable,
306 HandleSignalsState* signals_state) {
307 lock().AssertAcquired();
308 awakable_list_.Remove(awakable);
309 if (signals_state)
310 *signals_state = GetHandleSignalsStateImplNoLock();
311 }
312
313 void DataPipeConsumerDispatcher::StartSerializeImplNoLock(
314 size_t* max_size,
315 size_t* max_platform_handles) {
316 if (!serialized_) {
317 // Handles the case where we have messages read off RawChannel but not ready
318 // by MojoReadMessage.
319 SerializeInternal();
320 }
321
322 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
323 !data_.empty(),
324 max_size, max_platform_handles);
325 }
326
327 bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock(
328 void* destination,
329 size_t* actual_size,
330 PlatformHandleVector* platform_handles) {
331 ScopedPlatformHandle shared_memory_handle;
332 size_t shared_memory_size = data_.size() + serialized_read_buffer_.size();
333 if (shared_memory_size) {
334 shared_memory_size += sizeof(SharedMemoryHeader);
335 SharedMemoryHeader header;
336 header.data_size = static_cast<uint32_t>(data_.size());
337 header.read_buffer_size =
338 static_cast<uint32_t>(serialized_read_buffer_.size());
339
340 scoped_refptr<PlatformSharedBuffer> shared_buffer(
341 internal::g_platform_support->CreateSharedBuffer(
342 shared_memory_size));
343 scoped_ptr<PlatformSharedBufferMapping> mapping(
344 shared_buffer->Map(0, shared_memory_size));
345
346 char* start = static_cast<char*>(mapping->GetBase());
347 memcpy(start, &header, sizeof(SharedMemoryHeader));
348 start += sizeof(SharedMemoryHeader);
349
350 if (!data_.empty()) {
351 memcpy(start, &data_[0], data_.size());
352 start += data_.size();
353 }
354
355 if (!serialized_read_buffer_.empty()) {
356 memcpy(start, &serialized_read_buffer_[0],
357 serialized_read_buffer_.size());
358 start += serialized_read_buffer_.size();
359 }
360
361 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
362 }
363
364 DataPipe::EndSerialize(
365 options_,
366 serialized_platform_handle_.Pass(),
367 shared_memory_handle.Pass(),
368 shared_memory_size, destination, actual_size,
369 platform_handles);
370 CloseImplNoLock();
371 return true;
372 }
373
374 void DataPipeConsumerDispatcher::TransportStarted() {
375 started_transport_.Acquire();
376 }
377
378 void DataPipeConsumerDispatcher::TransportEnded() {
379 started_transport_.Release();
380
381 base::AutoLock locker(lock());
382
383 // If transporting of DP failed, we might have got more data and didn't awake
384 // for.
385 // TODO(jam): should we care about only alerting if it was empty before
386 // TransportStarted?
387 if (!data_.empty())
388 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
389 }
390
391 bool DataPipeConsumerDispatcher::IsBusyNoLock() const {
392 lock().AssertAcquired();
393 return in_two_phase_read_;
394 }
395
396 void DataPipeConsumerDispatcher::OnReadMessage(
397 const MessageInTransit::View& message_view,
398 ScopedPlatformHandleVectorPtr platform_handles) {
399 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
400
401 if (started_transport_.Try()) {
402 // We're not in the middle of being sent.
403
404 // Can get synchronously called back in Init if there was initial data.
405 scoped_ptr<base::AutoLock> locker;
406 if (!calling_init_) {
407 locker.reset(new base::AutoLock(lock()));
408 }
409
410 size_t old_size = data_.size();
411 data_.resize(old_size + message->num_bytes());
412 memcpy(&data_[old_size], message->bytes(), message->num_bytes());
413 if (!old_size)
414 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
415 started_transport_.Release();
416 } else {
417 size_t old_size = data_.size();
418 data_.resize(old_size + message->num_bytes());
419 memcpy(&data_[old_size], message->bytes(), message->num_bytes());
420 }
421 }
422
423 void DataPipeConsumerDispatcher::OnError(Error error) {
424 switch (error) {
425 case ERROR_READ_SHUTDOWN:
426 // The other side was cleanly closed, so this isn't actually an error.
427 DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)";
428 break;
429 case ERROR_READ_BROKEN:
430 LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)";
431 break;
432 case ERROR_READ_BAD_MESSAGE:
433 // Receiving a bad message means either a bug, data corruption, or
434 // malicious attack (probably due to some other bug).
435 LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad "
436 << "message)";
437 break;
438 case ERROR_READ_UNKNOWN:
439 LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)";
440 break;
441 case ERROR_WRITE:
442 LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages";
443 break;
444 }
445
446 error_ = true;
447 if (started_transport_.Try()) {
448 base::AutoLock locker(lock());
449 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
450 started_transport_.Release();
451
452 base::MessageLoop::current()->PostTask(
453 FROM_HERE,
454 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
455 channel_ = nullptr;
456 } else {
457 // We must be waiting to call ReleaseHandle. It will call Shutdown.
458 }
459 }
460
461 void DataPipeConsumerDispatcher::SerializeInternal() {
462 // need to stop watching handle immediately, even tho not on IO thread, so
463 // that other messages aren't read after this.
464 if (channel_) {
465 serialized_platform_handle_ =
466 channel_->ReleaseHandle(&serialized_read_buffer_);
467
468 channel_ = nullptr;
469 serialized_ = true;
470 }
471 }
472
473 } // namespace edk
474 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698