Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(372)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: some review comments Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop.h"
12 #include "mojo/edk/embedder/embedder_internal.h"
13 #include "mojo/edk/embedder/platform_shared_buffer.h"
14 #include "mojo/edk/embedder/platform_support.h"
15 #include "mojo/edk/system/data_pipe.h"
16
17 namespace mojo {
18 namespace edk {
19
20 struct SharedMemoryHeader {
21 uint32_t data_size;
22 uint32_t read_buffer_size;
23 };
24
25 void DataPipeConsumerDispatcher::Init(ScopedPlatformHandle message_pipe) {
26 if (message_pipe.is_valid()) {
27 channel_ = RawChannel::Create(message_pipe.Pass());
28 if (!serialized_read_buffer_.empty())
29 channel_->SetInitialReadBufferData(
30 &serialized_read_buffer_[0], serialized_read_buffer_.size());
31 serialized_read_buffer_.clear();
32 internal::g_io_thread_task_runner->PostTask(
33 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this));
34 }
35 }
36
37 void DataPipeConsumerDispatcher::InitOnIO() {
38 base::AutoLock locker(lock());
39 calling_init_ = true;
40 if (channel_)
41 channel_->Init(this);
42 calling_init_ = false;
43 }
44
45 void DataPipeConsumerDispatcher::CloseOnIO() {
46 base::AutoLock locker(lock());
47 if (channel_) {
48 channel_->Shutdown();
49 channel_ = nullptr;
50 }
51 }
52
53 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
54 return Type::DATA_PIPE_CONSUMER;
55 }
56
57 scoped_refptr<DataPipeConsumerDispatcher>
58 DataPipeConsumerDispatcher::Deserialize(
59 const void* source,
60 size_t size,
61 PlatformHandleVector* platform_handles) {
62 MojoCreateDataPipeOptions options;
63 ScopedPlatformHandle shared_memory_handle;
64 size_t shared_memory_size = 0;
65
66 ScopedPlatformHandle platform_handle =
67 DataPipe::Deserialize(source, size, platform_handles, &options,
68 &shared_memory_handle, &shared_memory_size);
69
70 scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options));
71
72 if (shared_memory_size) {
73 scoped_refptr<PlatformSharedBuffer> shared_buffer(
74 internal::g_platform_support->CreateSharedBufferFromHandle(
75 shared_memory_size, shared_memory_handle.Pass()));;
76 scoped_ptr<PlatformSharedBufferMapping> mapping(
77 shared_buffer->Map(0, shared_memory_size));
78 char* buffer = static_cast<char*>(mapping->GetBase());
79 SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer);
80 buffer+= sizeof(SharedMemoryHeader);
81 if (header->data_size) {
82 rv->data_.resize(header->data_size);
83 memcpy(&rv->data_[0], buffer, header->data_size);
84 buffer += header->data_size;
85 }
86 if (header->read_buffer_size) {
87 rv->serialized_read_buffer_.resize(header->read_buffer_size);
88 memcpy(&rv->serialized_read_buffer_[0], buffer, header->read_buffer_size);
89 buffer += header->read_buffer_size;
90 }
91
92 }
93
94 if (platform_handle.is_valid())
95 rv->Init(platform_handle.Pass());
96 return rv;
97 }
98
99 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
100 const MojoCreateDataPipeOptions& options)
101 : options_(options),
102 channel_(nullptr),
103 calling_init_(false),
104 in_two_phase_read_(false),
105 two_phase_max_bytes_read_(0),
106 error_(false),
107 serialized_(false) {
108 }
109
110 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
111 // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
112 DCHECK(!channel_);
113 }
114
115 void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() {
116 lock().AssertAcquired();
117 awakable_list_.CancelAll();
118 }
119
120 void DataPipeConsumerDispatcher::CloseImplNoLock() {
121 lock().AssertAcquired();
122 internal::g_io_thread_task_runner->PostTask(
123 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this));
124 }
125
126 scoped_refptr<Dispatcher>
127 DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
128 lock().AssertAcquired();
129
130 SerializeInternal();
131
132 scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_);
133 rv->channel_ = channel_;
134 channel_ = nullptr;
135 rv->options_ = options_;
136 data_.swap(rv->data_);
137 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
138 rv->serialized_platform_handle_ = serialized_platform_handle_.Pass();
139 rv->serialized_ = true;
140
141 return scoped_refptr<Dispatcher>(rv.get());
142 }
143
144 MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
145 void* elements,
146 uint32_t* num_bytes,
147 MojoReadDataFlags flags) {
148 lock().AssertAcquired();
149 if (in_two_phase_read_)
150 return MOJO_RESULT_BUSY;
151
152 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
153 if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
154 (flags & MOJO_READ_DATA_FLAG_DISCARD))
155 return MOJO_RESULT_INVALID_ARGUMENT;
156 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
157 DVLOG_IF(2, elements)
158 << "Query mode: ignoring non-null |elements|";
159 *num_bytes = static_cast<uint32_t>(data_.size());
160 return MOJO_RESULT_OK;
161 }
162
163 bool discard = false;
164 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
165 // These flags are mutally exclusive.
166 if (flags & MOJO_READ_DATA_FLAG_PEEK)
167 return MOJO_RESULT_INVALID_ARGUMENT;
168 DVLOG_IF(2, elements)
169 << "Discard mode: ignoring non-null |elements|";
170 discard = true;
171 }
172
173 uint32_t max_num_bytes_to_read = *num_bytes;
174 if (max_num_bytes_to_read % options_.element_num_bytes != 0)
175 return MOJO_RESULT_INVALID_ARGUMENT;
176
177 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
178 uint32_t min_num_bytes_to_read =
179 all_or_none ? max_num_bytes_to_read : 0;
180
181 if (min_num_bytes_to_read > data_.size())
182 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE;
183
184 uint32_t bytes_to_read = std::min(max_num_bytes_to_read,
185 static_cast<uint32_t>(data_.size()));
186 if (bytes_to_read == 0)
187 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
188
189 if (!discard)
190 memcpy(elements, &data_[0], bytes_to_read);
191 *num_bytes = bytes_to_read;
192
193 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
194 if (discard || !peek)
195 data_.erase(data_.begin(), data_.begin() + bytes_to_read);
196
197 return MOJO_RESULT_OK;
198 }
199
200 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
201 const void** buffer,
202 uint32_t* buffer_num_bytes,
203 MojoReadDataFlags flags) {
204 lock().AssertAcquired();
205 if (in_two_phase_read_)
206 return MOJO_RESULT_BUSY;
207
208 // These flags may not be used in two-phase mode.
209 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
210 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
211 (flags & MOJO_READ_DATA_FLAG_PEEK))
212 return MOJO_RESULT_INVALID_ARGUMENT;
213
214 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
215 uint32_t min_num_bytes_to_read = 0;
216 if (all_or_none) {
217 min_num_bytes_to_read = *buffer_num_bytes;
218 if (min_num_bytes_to_read % options_.element_num_bytes != 0)
219 return MOJO_RESULT_INVALID_ARGUMENT;
220 }
221
222 uint32_t max_num_bytes_to_read = static_cast<uint32_t>(data_.size());
223 if (min_num_bytes_to_read > max_num_bytes_to_read)
224 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE;
225 if (max_num_bytes_to_read == 0)
226 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
227
228 in_two_phase_read_ = true;
229 *buffer = &data_[0];
230 *buffer_num_bytes = max_num_bytes_to_read;
231 two_phase_max_bytes_read_ = max_num_bytes_to_read;
232
233 return MOJO_RESULT_OK;
234 }
235
236 MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock(
237 uint32_t num_bytes_read) {
238 lock().AssertAcquired();
239 if (!in_two_phase_read_)
240 return MOJO_RESULT_FAILED_PRECONDITION;
241
242 MojoResult rv;
243 if (num_bytes_read > two_phase_max_bytes_read_ ||
244 num_bytes_read % options_.element_num_bytes != 0) {
245 rv = MOJO_RESULT_INVALID_ARGUMENT;
246 } else {
247 rv = MOJO_RESULT_OK;
248 data_.erase(data_.begin(), data_.begin() + num_bytes_read);
249 }
250
251 in_two_phase_read_ = false;
252 two_phase_max_bytes_read_ = 0;
253
254 // If we're now readable, we *became* readable (since we weren't readable
255 // during the two-phase read), so awake consumer awakables.
256 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
257 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
258 awakable_list_.AwakeForStateChange(new_state);
259
260 return rv;
261 }
262
263 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock()
264 const {
265 lock().AssertAcquired();
266
267 HandleSignalsState rv;
268 if (!data_.empty()) {
269 if (!in_two_phase_read_)
270 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
271 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
272 } else if (!error_) {
273 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
274 }
275
276 if (error_)
277 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
278 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
279 return rv;
280 }
281
282 MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock(
283 Awakable* awakable,
284 MojoHandleSignals signals,
285 uint32_t context,
286 HandleSignalsState* signals_state) {
287 lock().AssertAcquired();
288 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
289 if (state.satisfies(signals)) {
290 if (signals_state)
291 *signals_state = state;
292 return MOJO_RESULT_ALREADY_EXISTS;
293 }
294 if (!state.can_satisfy(signals)) {
295 if (signals_state)
296 *signals_state = state;
297 return MOJO_RESULT_FAILED_PRECONDITION;
298 }
299
300 awakable_list_.Add(awakable, signals, context);
301 return MOJO_RESULT_OK;
302 }
303
304 void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock(
305 Awakable* awakable,
306 HandleSignalsState* signals_state) {
307 lock().AssertAcquired();
308 awakable_list_.Remove(awakable);
309 if (signals_state)
310 *signals_state = GetHandleSignalsStateImplNoLock();
311 }
312
313 void DataPipeConsumerDispatcher::StartSerializeImplNoLock(
314 size_t* max_size,
315 size_t* max_platform_handles) {
316 //DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
317
318 if (!serialized_) {
319 // handles the case where we have messages read off rawchannel but not
320 // ready by MojoReadMessage.
321 SerializeInternal();
322 }
323
324 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
325 !data_.empty(),
326 max_size, max_platform_handles);
327 }
328
329 bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock(
330 void* destination,
331 size_t* actual_size,
332 PlatformHandleVector* platform_handles) {
333 //DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
334
335 ScopedPlatformHandle shared_memory_handle;
336 size_t shared_memory_size = data_.size() + serialized_read_buffer_.size();
337 if (shared_memory_size) {
338 shared_memory_size += sizeof(SharedMemoryHeader);
339 SharedMemoryHeader header;
340 header.data_size = static_cast<uint32_t>(data_.size());
341 header.read_buffer_size =
342 static_cast<uint32_t>(serialized_read_buffer_.size());
343
344 scoped_refptr<PlatformSharedBuffer> shared_buffer(
345 internal::g_platform_support->CreateSharedBuffer(
346 shared_memory_size));
347 scoped_ptr<PlatformSharedBufferMapping> mapping(
348 shared_buffer->Map(0, shared_memory_size));
349
350 char* start = static_cast<char*>(mapping->GetBase());
351 memcpy(start, &header, sizeof(SharedMemoryHeader));
352 start += sizeof(SharedMemoryHeader);
353
354 if (!data_.empty()) {
355 memcpy(start, &data_[0], data_.size());
356 start += data_.size();
357 }
358
359 if (!serialized_read_buffer_.empty()) {
360 memcpy(start, &serialized_read_buffer_[0],
361 serialized_read_buffer_.size());
362 start += serialized_read_buffer_.size();
363 }
364
365 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
366 }
367
368 DataPipe::EndSerialize(
369 options_,
370 serialized_platform_handle_.Pass(),
371 shared_memory_handle.Pass(),
372 shared_memory_size, destination, actual_size,
373 platform_handles);
374 CloseImplNoLock();
375 return true;
376 }
377
378 void DataPipeConsumerDispatcher::TransportStarted() {
379 started_transport_.Acquire();
380 }
381
382 void DataPipeConsumerDispatcher::TransportEnded() {
383 started_transport_.Release();
384
385 base::AutoLock locker(lock());
386
387 // If transporting of DP failed, we might have got more data and didn't awake
388 // for.
389 // TODO(jam): should we care about only alerting if it was empty before
390 // TransportStarted?
391 if (!data_.empty())
392 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
393 }
394
395 bool DataPipeConsumerDispatcher::IsBusyNoLock() const {
396 lock().AssertAcquired();
397 return in_two_phase_read_;
398 }
399
400 void DataPipeConsumerDispatcher::OnReadMessage(
401 const MessageInTransit::View& message_view,
402 ScopedPlatformHandleVectorPtr platform_handles) {
403 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
404
405 if (started_transport_.Try()) {
406 // we're not in the middle of being sent
407
408 // Can get synchronously called back in Init if there was initial data.
409 scoped_ptr<base::AutoLock> locker;
410 if (!calling_init_) {
411 locker.reset(new base::AutoLock(lock()));
412 }
413
414 size_t old_size = data_.size();
415 data_.resize(old_size + message->num_bytes());
416 memcpy(&data_[old_size], message->bytes(), message->num_bytes());
417 if (!old_size)
418 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
419 started_transport_.Release();
420 } else {
421 size_t old_size = data_.size();
422 data_.resize(old_size + message->num_bytes());
423 memcpy(&data_[old_size], message->bytes(), message->num_bytes());
424 }
425 }
426
427 void DataPipeConsumerDispatcher::OnError(Error error) {
428 switch (error) {
429 case ERROR_READ_SHUTDOWN:
430 // The other side was cleanly closed, so this isn't actually an error.
431 DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)";
432 break;
433 case ERROR_READ_BROKEN:
434 LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)";
435 break;
436 case ERROR_READ_BAD_MESSAGE:
437 // Receiving a bad message means either a bug, data corruption, or
438 // malicious attack (probably due to some other bug).
439 LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad "
440 << "message)";
441 break;
442 case ERROR_READ_UNKNOWN:
443 LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)";
444 break;
445 case ERROR_WRITE:
446 LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages";
447 break;
448 }
449
450 error_ = true;
451 if (started_transport_.Try()) {
452 base::AutoLock locker(lock());
453 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
454 started_transport_.Release();
455
456 base::MessageLoop::current()->PostTask(
457 FROM_HERE,
458 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
459 channel_ = nullptr;
460 } else {
461 // We must be waiting to call ReleaseHandle. It will call Shutdown.
462 }
463 }
464
465 void DataPipeConsumerDispatcher::SerializeInternal() {
466 // need to stop watching handle immediately, even tho not on IO thread, so
467 // that other messages aren't read after this.
468 if (channel_) {
469 serialized_platform_handle_ =
470 channel_->ReleaseHandle(&serialized_read_buffer_);
471
472 channel_ = nullptr;
473 serialized_ = true;
474 }
475 }
476
477 } // namespace edk
478 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698