Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(694)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: convert remaining MP tests and simplify RawChannel destruction Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop.h"
12 #include "mojo/edk/embedder/embedder_internal.h"
13 #include "mojo/edk/embedder/platform_shared_buffer.h"
14 #include "mojo/edk/embedder/platform_support.h"
15 #include "mojo/edk/system/data_pipe.h"
16 #include "mojo/edk/system/memory.h"
17
18 namespace mojo {
19 namespace system {
20
21 struct SharedMemoryHeader {
22 uint32_t data_size;
23 uint32_t read_buffer_size;
24 };
25
26 void DataPipeConsumerDispatcher::Init(
27 embedder::ScopedPlatformHandle message_pipe) {
28 if (message_pipe.is_valid()) {
29 channel_ = RawChannel::Create(message_pipe.Pass());
30 if (!serialized_read_buffer_.empty())
31 channel_->SetInitialReadBufferData(
32 &serialized_read_buffer_[0], serialized_read_buffer_.size());
33 serialized_read_buffer_.clear();
34 mojo::embedder::internal::g_io_thread_task_runner->PostTask(
35 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this));
36 }
37 }
38
39 void DataPipeConsumerDispatcher::InitOnIO() {
40 base::AutoLock locker(lock());
41 calling_init_ = true;
42 if (channel_)
43 channel_->Init(this);
44 calling_init_ = false;
45 }
46
47 void DataPipeConsumerDispatcher::CloseOnIO() {
48 base::AutoLock locker(lock());
49 if (channel_) {
50 channel_->Shutdown();
51 channel_ = nullptr;
52 }
53 }
54
55 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
56 return Type::DATA_PIPE_CONSUMER;
57 }
58
59 scoped_refptr<DataPipeConsumerDispatcher>
60 DataPipeConsumerDispatcher::Deserialize(
61 const void* source,
62 size_t size,
63 embedder::PlatformHandleVector* platform_handles) {
64 MojoCreateDataPipeOptions options;
65 embedder::ScopedPlatformHandle shared_memory_handle;
66 size_t shared_memory_size = 0;
67
68 embedder::ScopedPlatformHandle platform_handle =
69 DataPipe::Deserialize(source, size, platform_handles, &options,
70 &shared_memory_handle, &shared_memory_size);
71
72 scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options));
73
74 if (shared_memory_size) {
75 scoped_refptr<embedder::PlatformSharedBuffer> shared_buffer(
76 embedder::internal::g_platform_support->CreateSharedBufferFromHandle(
77 shared_memory_size, shared_memory_handle.Pass()));;
78 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping(
79 shared_buffer->Map(0, shared_memory_size));
80 char* buffer = static_cast<char*>(mapping->GetBase());
81 SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer);
82 buffer+= sizeof(SharedMemoryHeader);
83 if (header->data_size) {
84 rv->data_.resize(header->data_size);
85 memcpy(&rv->data_[0], buffer, header->data_size);
86 buffer += header->data_size;
87 }
88 if (header->read_buffer_size) {
89 rv->serialized_read_buffer_.resize(header->read_buffer_size);
90 memcpy(&rv->serialized_read_buffer_[0], buffer, header->read_buffer_size);
91 buffer += header->read_buffer_size;
92 }
93
94 }
95
96 if (platform_handle.is_valid())
97 rv->Init(platform_handle.Pass());
98 return rv;
99 }
100
101 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
102 const MojoCreateDataPipeOptions& options)
103 : options_(options),
104 channel_(nullptr),
105 calling_init_(false),
106 in_two_phase_read_(false),
107 two_phase_max_bytes_read_(0),
108 error_(false),
109 serialized_(false) {
110 }
111
112 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
113 // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
114 DCHECK(!channel_);
115 }
116
117 void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() {
118 lock().AssertAcquired();
119 awakable_list_.CancelAll();
120 }
121
122 void DataPipeConsumerDispatcher::CloseImplNoLock() {
123 lock().AssertAcquired();
124 mojo::embedder::internal::g_io_thread_task_runner->PostTask(
125 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this));
126 }
127
128 scoped_refptr<Dispatcher>
129 DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
130 lock().AssertAcquired();
131
132 SerializeInternal();
133
134 scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_);
135 rv->channel_ = channel_;
136 channel_ = nullptr;
137 rv->options_ = options_;
138 data_.swap(rv->data_);
139 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
140 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
141 rv->serialized_ = true;
142
143 return scoped_refptr<Dispatcher>(rv.get());
144 }
145
146 MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
147 UserPointer<void> elements,
148 UserPointer<uint32_t> num_bytes,
149 MojoReadDataFlags flags) {
150 lock().AssertAcquired();
151 if (in_two_phase_read_)
152 return MOJO_RESULT_BUSY;
153
154 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
155 if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
156 (flags & MOJO_READ_DATA_FLAG_DISCARD))
157 return MOJO_RESULT_INVALID_ARGUMENT;
158 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
159 DVLOG_IF(2, !elements.IsNull())
160 << "Query mode: ignoring non-null |elements|";
161 num_bytes.Put(data_.size());
162 return MOJO_RESULT_OK;
163 }
164
165 bool discard = false;
166 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
167 // These flags are mutally exclusive.
168 if (flags & MOJO_READ_DATA_FLAG_PEEK)
169 return MOJO_RESULT_INVALID_ARGUMENT;
170 DVLOG_IF(2, !elements.IsNull())
171 << "Discard mode: ignoring non-null |elements|";
172 discard = true;
173 }
174
175 uint32_t max_num_bytes_to_read = num_bytes.Get();
176 if (max_num_bytes_to_read % options_.element_num_bytes != 0)
177 return MOJO_RESULT_INVALID_ARGUMENT;
178
179 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
180 uint32_t min_num_bytes_to_read =
181 all_or_none ? max_num_bytes_to_read : 0;
182
183 if (min_num_bytes_to_read > data_.size())
184 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE;
185
186 uint32_t bytes_to_read = std::min(max_num_bytes_to_read,
187 static_cast<uint32_t>(data_.size()));
188 if (bytes_to_read == 0)
189 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
190
191 if (!discard)
192 elements.PutArray(&data_[0], bytes_to_read);
193 num_bytes.Put(bytes_to_read);
194
195 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
196 if (discard || !peek)
197 data_.erase(data_.begin(), data_.begin() + bytes_to_read);
198
199 return MOJO_RESULT_OK;
200 }
201
202 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
203 UserPointer<const void*> buffer,
204 UserPointer<uint32_t> buffer_num_bytes,
205 MojoReadDataFlags flags) {
206 lock().AssertAcquired();
207 if (in_two_phase_read_)
208 return MOJO_RESULT_BUSY;
209
210 // These flags may not be used in two-phase mode.
211 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
212 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
213 (flags & MOJO_READ_DATA_FLAG_PEEK))
214 return MOJO_RESULT_INVALID_ARGUMENT;
215
216 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
217 uint32_t min_num_bytes_to_read = 0;
218 if (all_or_none) {
219 min_num_bytes_to_read = buffer_num_bytes.Get();
220 if (min_num_bytes_to_read % options_.element_num_bytes != 0)
221 return MOJO_RESULT_INVALID_ARGUMENT;
222 }
223
224 uint32_t max_num_bytes_to_read = data_.size();
225 if (min_num_bytes_to_read > max_num_bytes_to_read)
226 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE;
227 if (max_num_bytes_to_read == 0)
228 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
229
230 in_two_phase_read_ = true;
231 buffer.Put(&data_[0]);
232 buffer_num_bytes.Put(max_num_bytes_to_read);
233 two_phase_max_bytes_read_ = max_num_bytes_to_read;
234
235 return MOJO_RESULT_OK;
236 }
237
238 MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock(
239 uint32_t num_bytes_read) {
240 lock().AssertAcquired();
241 if (!in_two_phase_read_)
242 return MOJO_RESULT_FAILED_PRECONDITION;
243
244 MojoResult rv;
245 if (num_bytes_read > two_phase_max_bytes_read_ ||
246 num_bytes_read % options_.element_num_bytes != 0) {
247 rv = MOJO_RESULT_INVALID_ARGUMENT;
248 } else {
249 rv = MOJO_RESULT_OK;
250 data_.erase(data_.begin(), data_.begin() + num_bytes_read);
251 }
252
253 in_two_phase_read_ = false;
254 two_phase_max_bytes_read_ = 0;
255
256 // If we're now readable, we *became* readable (since we weren't readable
257 // during the two-phase read), so awake consumer awakables.
258 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
259 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
260 awakable_list_.AwakeForStateChange(new_state);
261
262 return rv;
263 }
264
265 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock()
266 const {
267 lock().AssertAcquired();
268
269 HandleSignalsState rv;
270 if (!data_.empty()) {
271 if (!in_two_phase_read_)
272 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
273 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
274 } else if (!error_) {
275 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
276 }
277
278 if (error_)
279 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
280 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
281 return rv;
282 }
283
284 MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock(
285 Awakable* awakable,
286 MojoHandleSignals signals,
287 uint32_t context,
288 HandleSignalsState* signals_state) {
289 lock().AssertAcquired();
290 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
291 if (state.satisfies(signals)) {
292 if (signals_state)
293 *signals_state = state;
294 return MOJO_RESULT_ALREADY_EXISTS;
295 }
296 if (!state.can_satisfy(signals)) {
297 if (signals_state)
298 *signals_state = state;
299 return MOJO_RESULT_FAILED_PRECONDITION;
300 }
301
302 awakable_list_.Add(awakable, signals, context);
303 return MOJO_RESULT_OK;
304 }
305
306 void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock(
307 Awakable* awakable,
308 HandleSignalsState* signals_state) {
309 lock().AssertAcquired();
310 awakable_list_.Remove(awakable);
311 if (signals_state)
312 *signals_state = GetHandleSignalsStateImplNoLock();
313 }
314
315 void DataPipeConsumerDispatcher::StartSerializeImplNoLock(
316 size_t* max_size,
317 size_t* max_platform_handles) {
318 //DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
319
320 if (!serialized_) {
321 // handles the case where we have messages read off rawchannel but not
322 // ready by MojoReadMessage.
323 SerializeInternal();
324 }
325
326 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
327 !data_.empty(),
328 max_size, max_platform_handles);
329 }
330
331 bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock(
332 void* destination,
333 size_t* actual_size,
334 embedder::PlatformHandleVector* platform_handles) {
335 //DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
336
337 embedder::ScopedPlatformHandle shared_memory_handle;
338 size_t shared_memory_size = data_.size() + serialized_read_buffer_.size();
339 if (shared_memory_size) {
340 shared_memory_size += sizeof(SharedMemoryHeader);
341 SharedMemoryHeader header;
342 header.data_size = data_.size();
343 header.read_buffer_size = serialized_read_buffer_.size();
344
345 scoped_refptr<embedder::PlatformSharedBuffer> shared_buffer(
346 embedder::internal::g_platform_support->CreateSharedBuffer(
347 shared_memory_size));
348 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping(
349 shared_buffer->Map(0, shared_memory_size));
350
351 char* start = static_cast<char*>(mapping->GetBase());
352 memcpy(start, &header, sizeof(SharedMemoryHeader));
353 start += sizeof(SharedMemoryHeader);
354
355
356 if (!data_.empty()) {
357 memcpy(start, &data_[0], data_.size());
358 start += data_.size();
359 }
360
361 if (!serialized_read_buffer_.empty()) {
362 memcpy(start, &serialized_read_buffer_[0],
363 serialized_read_buffer_.size());
364 start += serialized_read_buffer_.size();
365 }
366
367 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
368 }
369
370 DataPipe::EndSerialize(
371 options_,
372 serialized_platform_handle_.Pass(),
373 shared_memory_handle.Pass(),
374 shared_memory_size, destination, actual_size,
375 platform_handles);
376 CloseImplNoLock();
377 return true;
378 }
379
380 void DataPipeConsumerDispatcher::TransportStarted() {
381 started_transport_.Acquire();
382 }
383
384 void DataPipeConsumerDispatcher::TransportEnded() {
385 started_transport_.Release();
386
387 base::AutoLock locker(lock());
388
389 // If transporting of DP failed, we might have got more data and didn't awake
390 // for.
391 // TODO(jam): should we care about only alerting if it was empty before
392 // TransportStarted?
393 if (!data_.empty())
394 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
395 }
396
397 bool DataPipeConsumerDispatcher::IsBusyNoLock() const {
398 lock().AssertAcquired();
399 return in_two_phase_read_;
400 }
401
402 void DataPipeConsumerDispatcher::OnReadMessage(
403 const MessageInTransit::View& message_view,
404 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
405 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
406
407 if (started_transport_.Try()) {
408 // we're not in the middle of being sent
409
410 // Can get synchronously called back in Init if there was initial data.
411 scoped_ptr<base::AutoLock> locker;
412 if (!calling_init_) {
413 locker.reset(new base::AutoLock(lock()));
414 }
415
416 size_t old_size = data_.size();
417 data_.resize(old_size + message->num_bytes());
418 memcpy(&data_[old_size], message->bytes(), message->num_bytes());
419 if (!old_size)
420 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
421 started_transport_.Release();
422 } else {
423 size_t old_size = data_.size();
424 data_.resize(old_size + message->num_bytes());
425 memcpy(&data_[old_size], message->bytes(), message->num_bytes());
426 }
427 }
428
429 void DataPipeConsumerDispatcher::OnError(Error error) {
430 switch (error) {
431 case ERROR_READ_SHUTDOWN:
432 // The other side was cleanly closed, so this isn't actually an error.
433 DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)";
434 break;
435 case ERROR_READ_BROKEN:
436 LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)";
437 break;
438 case ERROR_READ_BAD_MESSAGE:
439 // Receiving a bad message means either a bug, data corruption, or
440 // malicious attack (probably due to some other bug).
441 LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad "
442 << "message)";
443 break;
444 case ERROR_READ_UNKNOWN:
445 LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)";
446 break;
447 case ERROR_WRITE:
448 LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages";
449 break;
450 }
451
452 error_ = true;
453 if (started_transport_.Try()) {
454 base::AutoLock locker(lock());
455 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
456 started_transport_.Release();
457
458 base::MessageLoop::current()->PostTask(
459 FROM_HERE,
460 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
461 channel_ = nullptr;
462 } else {
463 // We must be waiting to call ReleaseHandle. It will call Shutdown.
464 }
465 }
466
467 void DataPipeConsumerDispatcher::SerializeInternal() {
468 // need to stop watching handle immediately, even tho not on IO thread, so
469 // that other messages aren't read after this.
470 if (channel_) {
471 serialized_platform_handle_ =
472 channel_->ReleaseHandle(&serialized_read_buffer_);
473
474 channel_ = nullptr;
475 serialized_ = true;
476 }
477 }
478
479 } // namespace system
480 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698