Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(748)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: move to mojo::edk namespace in preparation for runtim flag Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop.h"
12 #include "mojo/edk/embedder/embedder_internal.h"
13 #include "mojo/edk/embedder/platform_shared_buffer.h"
14 #include "mojo/edk/embedder/platform_support.h"
15 #include "mojo/edk/system/data_pipe.h"
16 #include "mojo/edk/system/memory.h"
17
18 namespace mojo {
19 namespace edk {
20
21 struct SharedMemoryHeader {
22 uint32_t data_size;
23 uint32_t read_buffer_size;
24 };
25
26 void DataPipeConsumerDispatcher::Init(ScopedPlatformHandle message_pipe) {
27 if (message_pipe.is_valid()) {
28 channel_ = RawChannel::Create(message_pipe.Pass());
29 if (!serialized_read_buffer_.empty())
30 channel_->SetInitialReadBufferData(
31 &serialized_read_buffer_[0], serialized_read_buffer_.size());
32 serialized_read_buffer_.clear();
33 internal::g_io_thread_task_runner->PostTask(
34 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this));
35 }
36 }
37
38 void DataPipeConsumerDispatcher::InitOnIO() {
39 base::AutoLock locker(lock());
40 calling_init_ = true;
41 if (channel_)
42 channel_->Init(this);
43 calling_init_ = false;
44 }
45
46 void DataPipeConsumerDispatcher::CloseOnIO() {
47 base::AutoLock locker(lock());
48 if (channel_) {
49 channel_->Shutdown();
50 channel_ = nullptr;
51 }
52 }
53
54 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
55 return Type::DATA_PIPE_CONSUMER;
56 }
57
58 scoped_refptr<DataPipeConsumerDispatcher>
59 DataPipeConsumerDispatcher::Deserialize(
60 const void* source,
61 size_t size,
62 PlatformHandleVector* platform_handles) {
63 MojoCreateDataPipeOptions options;
64 ScopedPlatformHandle shared_memory_handle;
65 size_t shared_memory_size = 0;
66
67 ScopedPlatformHandle platform_handle =
68 DataPipe::Deserialize(source, size, platform_handles, &options,
69 &shared_memory_handle, &shared_memory_size);
70
71 scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options));
72
73 if (shared_memory_size) {
74 scoped_refptr<PlatformSharedBuffer> shared_buffer(
75 internal::g_platform_support->CreateSharedBufferFromHandle(
76 shared_memory_size, shared_memory_handle.Pass()));;
77 scoped_ptr<PlatformSharedBufferMapping> mapping(
78 shared_buffer->Map(0, shared_memory_size));
79 char* buffer = static_cast<char*>(mapping->GetBase());
80 SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer);
81 buffer+= sizeof(SharedMemoryHeader);
82 if (header->data_size) {
83 rv->data_.resize(header->data_size);
84 memcpy(&rv->data_[0], buffer, header->data_size);
85 buffer += header->data_size;
86 }
87 if (header->read_buffer_size) {
88 rv->serialized_read_buffer_.resize(header->read_buffer_size);
89 memcpy(&rv->serialized_read_buffer_[0], buffer, header->read_buffer_size);
90 buffer += header->read_buffer_size;
91 }
92
93 }
94
95 if (platform_handle.is_valid())
96 rv->Init(platform_handle.Pass());
97 return rv;
98 }
99
100 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
101 const MojoCreateDataPipeOptions& options)
102 : options_(options),
103 channel_(nullptr),
104 calling_init_(false),
105 in_two_phase_read_(false),
106 two_phase_max_bytes_read_(0),
107 error_(false),
108 serialized_(false) {
109 }
110
111 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
112 // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
113 DCHECK(!channel_);
114 }
115
116 void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() {
117 lock().AssertAcquired();
118 awakable_list_.CancelAll();
119 }
120
121 void DataPipeConsumerDispatcher::CloseImplNoLock() {
122 lock().AssertAcquired();
123 internal::g_io_thread_task_runner->PostTask(
124 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this));
125 }
126
127 scoped_refptr<Dispatcher>
128 DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
129 lock().AssertAcquired();
130
131 SerializeInternal();
132
133 scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_);
134 rv->channel_ = channel_;
135 channel_ = nullptr;
136 rv->options_ = options_;
137 data_.swap(rv->data_);
138 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
139 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
140 rv->serialized_ = true;
141
142 return scoped_refptr<Dispatcher>(rv.get());
143 }
144
145 MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
146 UserPointer<void> elements,
147 UserPointer<uint32_t> num_bytes,
148 MojoReadDataFlags flags) {
149 lock().AssertAcquired();
150 if (in_two_phase_read_)
151 return MOJO_RESULT_BUSY;
152
153 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
154 if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
155 (flags & MOJO_READ_DATA_FLAG_DISCARD))
156 return MOJO_RESULT_INVALID_ARGUMENT;
157 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
158 DVLOG_IF(2, !elements.IsNull())
159 << "Query mode: ignoring non-null |elements|";
160 num_bytes.Put(data_.size());
161 return MOJO_RESULT_OK;
162 }
163
164 bool discard = false;
165 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
166 // These flags are mutally exclusive.
167 if (flags & MOJO_READ_DATA_FLAG_PEEK)
168 return MOJO_RESULT_INVALID_ARGUMENT;
169 DVLOG_IF(2, !elements.IsNull())
170 << "Discard mode: ignoring non-null |elements|";
171 discard = true;
172 }
173
174 uint32_t max_num_bytes_to_read = num_bytes.Get();
175 if (max_num_bytes_to_read % options_.element_num_bytes != 0)
176 return MOJO_RESULT_INVALID_ARGUMENT;
177
178 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
179 uint32_t min_num_bytes_to_read =
180 all_or_none ? max_num_bytes_to_read : 0;
181
182 if (min_num_bytes_to_read > data_.size())
183 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE;
184
185 uint32_t bytes_to_read = std::min(max_num_bytes_to_read,
186 static_cast<uint32_t>(data_.size()));
187 if (bytes_to_read == 0)
188 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
189
190 if (!discard)
191 elements.PutArray(&data_[0], bytes_to_read);
192 num_bytes.Put(bytes_to_read);
193
194 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
195 if (discard || !peek)
196 data_.erase(data_.begin(), data_.begin() + bytes_to_read);
197
198 return MOJO_RESULT_OK;
199 }
200
201 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
202 UserPointer<const void*> buffer,
203 UserPointer<uint32_t> buffer_num_bytes,
204 MojoReadDataFlags flags) {
205 lock().AssertAcquired();
206 if (in_two_phase_read_)
207 return MOJO_RESULT_BUSY;
208
209 // These flags may not be used in two-phase mode.
210 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
211 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
212 (flags & MOJO_READ_DATA_FLAG_PEEK))
213 return MOJO_RESULT_INVALID_ARGUMENT;
214
215 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
216 uint32_t min_num_bytes_to_read = 0;
217 if (all_or_none) {
218 min_num_bytes_to_read = buffer_num_bytes.Get();
219 if (min_num_bytes_to_read % options_.element_num_bytes != 0)
220 return MOJO_RESULT_INVALID_ARGUMENT;
221 }
222
223 uint32_t max_num_bytes_to_read = data_.size();
224 if (min_num_bytes_to_read > max_num_bytes_to_read)
225 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE;
226 if (max_num_bytes_to_read == 0)
227 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
228
229 in_two_phase_read_ = true;
230 buffer.Put(&data_[0]);
231 buffer_num_bytes.Put(max_num_bytes_to_read);
232 two_phase_max_bytes_read_ = max_num_bytes_to_read;
233
234 return MOJO_RESULT_OK;
235 }
236
237 MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock(
238 uint32_t num_bytes_read) {
239 lock().AssertAcquired();
240 if (!in_two_phase_read_)
241 return MOJO_RESULT_FAILED_PRECONDITION;
242
243 MojoResult rv;
244 if (num_bytes_read > two_phase_max_bytes_read_ ||
245 num_bytes_read % options_.element_num_bytes != 0) {
246 rv = MOJO_RESULT_INVALID_ARGUMENT;
247 } else {
248 rv = MOJO_RESULT_OK;
249 data_.erase(data_.begin(), data_.begin() + num_bytes_read);
250 }
251
252 in_two_phase_read_ = false;
253 two_phase_max_bytes_read_ = 0;
254
255 // If we're now readable, we *became* readable (since we weren't readable
256 // during the two-phase read), so awake consumer awakables.
257 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
258 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
259 awakable_list_.AwakeForStateChange(new_state);
260
261 return rv;
262 }
263
264 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock()
265 const {
266 lock().AssertAcquired();
267
268 HandleSignalsState rv;
269 if (!data_.empty()) {
270 if (!in_two_phase_read_)
271 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
272 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
273 } else if (!error_) {
274 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
275 }
276
277 if (error_)
278 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
279 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
280 return rv;
281 }
282
283 MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock(
284 Awakable* awakable,
285 MojoHandleSignals signals,
286 uint32_t context,
287 HandleSignalsState* signals_state) {
288 lock().AssertAcquired();
289 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
290 if (state.satisfies(signals)) {
291 if (signals_state)
292 *signals_state = state;
293 return MOJO_RESULT_ALREADY_EXISTS;
294 }
295 if (!state.can_satisfy(signals)) {
296 if (signals_state)
297 *signals_state = state;
298 return MOJO_RESULT_FAILED_PRECONDITION;
299 }
300
301 awakable_list_.Add(awakable, signals, context);
302 return MOJO_RESULT_OK;
303 }
304
305 void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock(
306 Awakable* awakable,
307 HandleSignalsState* signals_state) {
308 lock().AssertAcquired();
309 awakable_list_.Remove(awakable);
310 if (signals_state)
311 *signals_state = GetHandleSignalsStateImplNoLock();
312 }
313
314 void DataPipeConsumerDispatcher::StartSerializeImplNoLock(
315 size_t* max_size,
316 size_t* max_platform_handles) {
317 //DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
318
319 if (!serialized_) {
320 // handles the case where we have messages read off rawchannel but not
321 // ready by MojoReadMessage.
322 SerializeInternal();
323 }
324
325 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
326 !data_.empty(),
327 max_size, max_platform_handles);
328 }
329
330 bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock(
331 void* destination,
332 size_t* actual_size,
333 PlatformHandleVector* platform_handles) {
334 //DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
335
336 ScopedPlatformHandle shared_memory_handle;
337 size_t shared_memory_size = data_.size() + serialized_read_buffer_.size();
338 if (shared_memory_size) {
339 shared_memory_size += sizeof(SharedMemoryHeader);
340 SharedMemoryHeader header;
341 header.data_size = data_.size();
342 header.read_buffer_size = serialized_read_buffer_.size();
343
344 scoped_refptr<PlatformSharedBuffer> shared_buffer(
345 internal::g_platform_support->CreateSharedBuffer(
346 shared_memory_size));
347 scoped_ptr<PlatformSharedBufferMapping> mapping(
348 shared_buffer->Map(0, shared_memory_size));
349
350 char* start = static_cast<char*>(mapping->GetBase());
351 memcpy(start, &header, sizeof(SharedMemoryHeader));
352 start += sizeof(SharedMemoryHeader);
353
354
355 if (!data_.empty()) {
356 memcpy(start, &data_[0], data_.size());
357 start += data_.size();
358 }
359
360 if (!serialized_read_buffer_.empty()) {
361 memcpy(start, &serialized_read_buffer_[0],
362 serialized_read_buffer_.size());
363 start += serialized_read_buffer_.size();
364 }
365
366 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
367 }
368
369 DataPipe::EndSerialize(
370 options_,
371 serialized_platform_handle_.Pass(),
372 shared_memory_handle.Pass(),
373 shared_memory_size, destination, actual_size,
374 platform_handles);
375 CloseImplNoLock();
376 return true;
377 }
378
379 void DataPipeConsumerDispatcher::TransportStarted() {
380 started_transport_.Acquire();
381 }
382
383 void DataPipeConsumerDispatcher::TransportEnded() {
384 started_transport_.Release();
385
386 base::AutoLock locker(lock());
387
388 // If transporting of DP failed, we might have got more data and didn't awake
389 // for.
390 // TODO(jam): should we care about only alerting if it was empty before
391 // TransportStarted?
392 if (!data_.empty())
393 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
394 }
395
396 bool DataPipeConsumerDispatcher::IsBusyNoLock() const {
397 lock().AssertAcquired();
398 return in_two_phase_read_;
399 }
400
401 void DataPipeConsumerDispatcher::OnReadMessage(
402 const MessageInTransit::View& message_view,
403 ScopedPlatformHandleVectorPtr platform_handles) {
404 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
405
406 if (started_transport_.Try()) {
407 // we're not in the middle of being sent
408
409 // Can get synchronously called back in Init if there was initial data.
410 scoped_ptr<base::AutoLock> locker;
411 if (!calling_init_) {
412 locker.reset(new base::AutoLock(lock()));
413 }
414
415 size_t old_size = data_.size();
416 data_.resize(old_size + message->num_bytes());
417 memcpy(&data_[old_size], message->bytes(), message->num_bytes());
418 if (!old_size)
419 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
420 started_transport_.Release();
421 } else {
422 size_t old_size = data_.size();
423 data_.resize(old_size + message->num_bytes());
424 memcpy(&data_[old_size], message->bytes(), message->num_bytes());
425 }
426 }
427
428 void DataPipeConsumerDispatcher::OnError(Error error) {
429 switch (error) {
430 case ERROR_READ_SHUTDOWN:
431 // The other side was cleanly closed, so this isn't actually an error.
432 DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)";
433 break;
434 case ERROR_READ_BROKEN:
435 LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)";
436 break;
437 case ERROR_READ_BAD_MESSAGE:
438 // Receiving a bad message means either a bug, data corruption, or
439 // malicious attack (probably due to some other bug).
440 LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad "
441 << "message)";
442 break;
443 case ERROR_READ_UNKNOWN:
444 LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)";
445 break;
446 case ERROR_WRITE:
447 LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages";
448 break;
449 }
450
451 error_ = true;
452 if (started_transport_.Try()) {
453 base::AutoLock locker(lock());
454 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
455 started_transport_.Release();
456
457 base::MessageLoop::current()->PostTask(
458 FROM_HERE,
459 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
460 channel_ = nullptr;
461 } else {
462 // We must be waiting to call ReleaseHandle. It will call Shutdown.
463 }
464 }
465
466 void DataPipeConsumerDispatcher::SerializeInternal() {
467 // need to stop watching handle immediately, even tho not on IO thread, so
468 // that other messages aren't read after this.
469 if (channel_) {
470 serialized_platform_handle_ =
471 channel_->ReleaseHandle(&serialized_read_buffer_);
472
473 channel_ = nullptr;
474 serialized_ = true;
475 }
476 }
477
478 } // namespace edk
479 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698