OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/logging.h" | |
9 #include "base/message_loop/message_loop.h" | |
10 #include "mojo/edk/embedder/embedder_internal.h" | |
11 #include "mojo/edk/system/configuration.h" | |
12 #include "mojo/edk/system/message_in_transit.h" | |
13 #include "mojo/edk/system/options_validation.h" | |
14 #include "mojo/edk/system/transport_data.h" | |
15 | |
16 namespace mojo { | |
17 namespace edk { | |
18 | |
19 // TODO(jam): do more tests on using channel on same thread if it supports it ( | |
20 // i.e. with USE_CHROME_EDK and Windows). Also see ipc_channel_mojo.cc | |
21 bool g_use_channel_on_io_thread_only = true; | |
22 | |
23 const size_t kInvalidMessagePipeHandleIndex = static_cast<size_t>(-1); | |
24 | |
25 struct MOJO_ALIGNAS(8) SerializedMessagePipeHandleDispatcher { | |
26 size_t platform_handle_index; // (Or |kInvalidMessagePipeHandleIndex|.) | |
27 size_t read_buffer_size; // any bytes after this are serialized messages | |
28 }; | |
29 | |
30 // MessagePipeDispatcher ------------------------------------------------------- | |
31 | |
32 const MojoCreateMessagePipeOptions | |
33 MessagePipeDispatcher::kDefaultCreateOptions = { | |
34 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)), | |
35 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE}; | |
36 | |
37 MojoResult MessagePipeDispatcher::ValidateCreateOptions( | |
38 const MojoCreateMessagePipeOptions* in_options, | |
39 MojoCreateMessagePipeOptions* out_options) { | |
40 const MojoCreateMessagePipeOptionsFlags kKnownFlags = | |
41 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE; | |
42 | |
43 *out_options = kDefaultCreateOptions; | |
44 if (!in_options) | |
45 return MOJO_RESULT_OK; | |
46 | |
47 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options); | |
48 if (!reader.is_valid()) | |
49 return MOJO_RESULT_INVALID_ARGUMENT; | |
50 | |
51 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader)) | |
52 return MOJO_RESULT_OK; | |
53 if ((reader.options().flags & ~kKnownFlags)) | |
54 return MOJO_RESULT_UNIMPLEMENTED; | |
55 out_options->flags = reader.options().flags; | |
56 | |
57 // Checks for fields beyond |flags|: | |
58 | |
59 // (Nothing here yet.) | |
60 | |
61 return MOJO_RESULT_OK; | |
62 } | |
63 | |
64 void MessagePipeDispatcher::Init(ScopedPlatformHandle message_pipe) { | |
65 InitWithReadBuffer(message_pipe.Pass(), nullptr, 0); | |
66 } | |
67 | |
68 void MessagePipeDispatcher::InitWithReadBuffer( | |
69 ScopedPlatformHandle message_pipe, | |
70 char* data, | |
71 size_t size) { | |
72 if (message_pipe.get().is_valid()) { | |
73 channel_ = RawChannel::Create(message_pipe.Pass()); | |
74 | |
75 // TODO(jam): It's probably cleaner to pass this in Init call. | |
76 if (size) | |
77 channel_->SetInitialReadBufferData(data, size); | |
78 if (g_use_channel_on_io_thread_only) { | |
79 internal::g_io_thread_task_runner->PostTask( | |
80 FROM_HERE, base::Bind(&MessagePipeDispatcher::InitOnIO, this)); | |
81 } else { | |
82 InitOnIO(); | |
83 } | |
84 // TODO(jam): optimize for when running on IO thread? | |
85 } | |
86 } | |
87 | |
88 void MessagePipeDispatcher::InitOnIO() { | |
89 base::AutoLock locker(lock()); | |
90 calling_init_ = true; | |
91 if (channel_) | |
92 channel_->Init(this); | |
93 calling_init_ = false; | |
94 } | |
95 | |
96 void MessagePipeDispatcher::CloseOnIO() { | |
97 base::AutoLock locker(lock()); | |
98 | |
99 if (channel_) { | |
100 channel_->Shutdown(); | |
101 channel_ = nullptr; | |
102 } | |
103 } | |
104 | |
105 Dispatcher::Type MessagePipeDispatcher::GetType() const { | |
106 return Type::MESSAGE_PIPE; | |
107 } | |
108 | |
109 // TODO(jam): this is copied from RawChannelWin till I figure out what's the | |
110 // best way we want to share this. Need to also consider posix which does | |
111 // require access to the RawChannel. | |
112 // Since this is used for serialization of messages read/written to a MP that | |
113 // aren't consumed by Mojo primitives yet, there could be an unbounded number of | |
114 // them when a MP is being sent. As a result, even for POSIX we will probably | |
115 // want to send the handles to the shell process and exchange them for tokens | |
116 // (since we can be sure that the shell will respond to our IPCs, compared to | |
117 // the other end where we're sending the MP to, which may not be reading...). | |
118 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( | |
119 size_t num_platform_handles, | |
120 const void* platform_handle_table) { | |
121 // TODO(jam): this code will have to be updated once it's used in a sandbox | |
122 // and the receiving process doesn't have duplicate permission for the | |
123 // receiver. Once there's a broker and we have a connection to it (possibly | |
124 // through ConnectionManager), then we can make a sync IPC to it here to get a | |
125 // token for this handle, and it will duplicate the handle to is process. Then | |
126 // we pass the token to the receiver, which will then make a sync call to the | |
127 // broker to get a duplicated handle. This will also allow us to avoid leaks | |
128 // of the handle if the receiver dies, since the broker can notice that. | |
129 DCHECK_GT(num_platform_handles, 0u); | |
130 ScopedPlatformHandleVectorPtr rv(new PlatformHandleVector()); | |
131 | |
132 #if defined(OS_WIN) | |
133 const char* serialization_data = | |
134 static_cast<const char*>(platform_handle_table); | |
135 for (size_t i = 0; i < num_platform_handles; i++) { | |
136 DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data); | |
137 serialization_data += sizeof(DWORD); | |
138 HANDLE source_handle = *reinterpret_cast<const HANDLE*>(serialization_data); | |
139 serialization_data += sizeof(HANDLE); | |
140 base::Process sender = | |
141 base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE); | |
142 DCHECK(sender.IsValid()); | |
143 HANDLE target_handle = NULL; | |
144 BOOL dup_result = | |
145 DuplicateHandle(sender.Handle(), source_handle, | |
146 base::GetCurrentProcessHandle(), &target_handle, 0, | |
147 FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE); | |
148 DCHECK(dup_result); | |
149 rv->push_back(PlatformHandle(target_handle)); | |
150 } | |
151 #else | |
152 NOTREACHED() << "TODO(jam): implement"; | |
153 #endif | |
154 return rv.Pass(); | |
155 } | |
156 | |
157 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize( | |
158 const void* source, | |
159 size_t size, | |
160 PlatformHandleVector* platform_handles) { | |
161 const SerializedMessagePipeHandleDispatcher* serialization = | |
162 static_cast<const SerializedMessagePipeHandleDispatcher*>(source); | |
163 size_t platform_handle_index = serialization->platform_handle_index; | |
164 | |
165 | |
166 // Starts off invalid, which is what we want. | |
167 PlatformHandle platform_handle; | |
168 | |
169 if (platform_handle_index != kInvalidMessagePipeHandleIndex) { | |
170 if (!platform_handles || | |
171 platform_handle_index >= platform_handles->size()) { | |
172 LOG(ERROR) | |
173 << "Invalid serialized platform handle dispatcher (missing handles)"; | |
174 return nullptr; | |
175 } | |
176 | |
177 // We take ownership of the handle, so we have to invalidate the one in | |
178 // |platform_handles|. | |
179 std::swap(platform_handle, (*platform_handles)[platform_handle_index]); | |
180 } | |
181 | |
182 // TODO(jam): temporary until we send message_queue_ via shared memory. | |
183 size -= sizeof(SerializedMessagePipeHandleDispatcher); | |
184 const char* messages = static_cast<const char*>(source); | |
185 messages += sizeof(SerializedMessagePipeHandleDispatcher); | |
186 | |
187 char* initial_read_data = nullptr; | |
188 size_t initial_read_size = 0; | |
189 | |
190 if (serialization->read_buffer_size) { | |
191 initial_read_data = const_cast<char*>(messages); | |
192 initial_read_size = serialization->read_buffer_size; | |
193 | |
194 messages += initial_read_size; | |
195 size -= initial_read_size; | |
196 } | |
197 | |
198 scoped_refptr<MessagePipeDispatcher> rv( | |
199 Create(MessagePipeDispatcher::kDefaultCreateOptions)); | |
200 rv->InitWithReadBuffer( | |
201 ScopedPlatformHandle(platform_handle), | |
202 initial_read_data, initial_read_size); | |
203 | |
204 while (size) { | |
205 size_t message_size; | |
206 CHECK(MessageInTransit::GetNextMessageSize( | |
207 messages, size, &message_size)); | |
208 MessageInTransit::View message_view(message_size, messages); | |
209 size -= message_size; | |
210 messages += message_size; | |
211 | |
212 // TODO(jam): Copied below from RawChannelWin. See commment above | |
213 // GetReadPlatformHandles. | |
214 ScopedPlatformHandleVectorPtr platform_handles; | |
brucedawson
2015/10/05 16:50:26
This shadows the platform_handles function paramet
| |
215 if (message_view.transport_data_buffer()) { | |
216 size_t num_platform_handles; | |
217 const void* platform_handle_table; | |
218 TransportData::GetPlatformHandleTable( | |
219 message_view.transport_data_buffer(), &num_platform_handles, | |
220 &platform_handle_table); | |
221 | |
222 if (num_platform_handles > 0) { | |
223 platform_handles = | |
224 GetReadPlatformHandles(num_platform_handles, | |
225 platform_handle_table).Pass(); | |
226 if (!platform_handles) { | |
227 LOG(ERROR) << "Invalid number of platform handles received"; | |
228 return nullptr; | |
229 } | |
230 } | |
231 } | |
232 | |
233 // TODO(jam): Copied below from RawChannelWin. See commment above | |
234 // GetReadPlatformHandles. | |
235 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | |
236 if (message_view.transport_data_buffer_size() > 0) { | |
237 DCHECK(message_view.transport_data_buffer()); | |
238 message->SetDispatchers(TransportData::DeserializeDispatchers( | |
239 message_view.transport_data_buffer(), | |
240 message_view.transport_data_buffer_size(), platform_handles.Pass())); | |
241 } | |
242 | |
243 rv->message_queue_.AddMessage(message.Pass()); | |
244 } | |
245 | |
246 return rv; | |
247 } | |
248 | |
249 MessagePipeDispatcher::MessagePipeDispatcher() | |
250 : channel_(nullptr), | |
251 serialized_(false), | |
252 calling_init_(false), | |
253 error_(false) { | |
254 } | |
255 | |
256 MessagePipeDispatcher::~MessagePipeDispatcher() { | |
257 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. | |
258 DCHECK(!channel_); | |
259 } | |
260 | |
261 void MessagePipeDispatcher::CancelAllAwakablesNoLock() { | |
262 lock().AssertAcquired(); | |
263 awakable_list_.CancelAll(); | |
264 } | |
265 | |
266 void MessagePipeDispatcher::CloseImplNoLock() { | |
267 lock().AssertAcquired(); | |
268 if (g_use_channel_on_io_thread_only) { | |
269 internal::g_io_thread_task_runner->PostTask( | |
270 FROM_HERE, base::Bind(&MessagePipeDispatcher::CloseOnIO, this)); | |
271 } else { | |
272 CloseOnIO(); | |
273 } | |
274 } | |
275 | |
276 void MessagePipeDispatcher::SerializeInternal() { | |
277 // We need to stop watching handle immediately, even tho not on IO thread, so | |
278 // that other messages aren't read after this. | |
279 { | |
280 if (channel_) { | |
281 serialized_platform_handle_ = | |
282 channel_->ReleaseHandle(&serialized_read_buffer_).release(); | |
283 channel_ = nullptr; | |
284 } else { | |
285 // It's valid that the other side wrote some data and closed its end. | |
286 } | |
287 } | |
288 | |
289 DCHECK(serialized_message_queue_.empty()); | |
290 // see comment in method below, this is only temporary till we implement a | |
291 // solution with shared buffer | |
292 while (!message_queue_.IsEmpty()) { | |
293 scoped_ptr<MessageInTransit> message = message_queue_.GetMessage(); | |
294 size_t cur_size = serialized_message_queue_.size(); | |
295 | |
296 | |
297 // When MojoWriteMessage is called, the MessageInTransit doesn't have | |
298 // dispatchers set and CreateEquivaent... is called since the dispatchers | |
299 // can be referenced by others. here dispatchers aren't referenced by | |
300 // others, but rawchannel can still call to them. so since we dont call | |
301 // createequiv, manually call TransportStarted and TransportEnd. | |
302 DispatcherVector dispatchers; | |
303 if (message->has_dispatchers()) | |
304 dispatchers = *message->dispatchers(); | |
305 for (size_t i = 0; i < dispatchers.size(); ++i) | |
306 dispatchers[i]->TransportStarted(); | |
307 | |
308 // TODO(jam): this handling for dispatchers only works on windows where we | |
309 // send transportdata as bytes instead of as parameters to sendmsg. | |
310 message->SerializeAndCloseDispatchers(); | |
311 // cont'd below | |
312 | |
313 | |
314 size_t main_buffer_size = message->main_buffer_size(); | |
315 size_t transport_data_buffer_size = message->transport_data() ? | |
316 message->transport_data()->buffer_size() : 0; | |
317 size_t total_size = message->total_size(); | |
318 | |
319 serialized_message_queue_.resize(cur_size + total_size); | |
320 memcpy(&serialized_message_queue_[cur_size], message->main_buffer(), | |
321 main_buffer_size); | |
322 | |
323 // cont'd | |
324 if (transport_data_buffer_size != 0) { | |
325 #if defined(OS_WIN) | |
326 // TODO(jam): copied from RawChannelWin::WriteNoLock( | |
327 if (RawChannel::GetSerializedPlatformHandleSize()) { | |
328 char* serialization_data = | |
329 static_cast<char*>(message->transport_data()->buffer()) + | |
330 message->transport_data()->platform_handle_table_offset(); | |
331 PlatformHandleVector* all_platform_handles = | |
332 message->transport_data()->platform_handles(); | |
333 if (all_platform_handles) { | |
334 DWORD current_process_id = base::GetCurrentProcId(); | |
335 for (size_t i = 0; i < all_platform_handles->size(); i++) { | |
336 *reinterpret_cast<DWORD*>(serialization_data) = current_process_id; | |
337 serialization_data += sizeof(DWORD); | |
338 *reinterpret_cast<HANDLE*>(serialization_data) = | |
339 all_platform_handles->at(i).handle; | |
340 serialization_data += sizeof(HANDLE); | |
341 all_platform_handles->at(i) = PlatformHandle(); | |
342 } | |
343 } | |
344 } | |
345 | |
346 memcpy(&serialized_message_queue_[ | |
347 cur_size + total_size - transport_data_buffer_size], | |
348 message->transport_data()->buffer(), transport_data_buffer_size); | |
349 #else | |
350 NOTREACHED() << "TODO(jam) implement"; | |
351 #endif | |
352 } | |
353 | |
354 for (size_t i = 0; i < dispatchers.size(); ++i) | |
355 dispatchers[i]->TransportEnded(); | |
356 } | |
357 | |
358 serialized_ = true; | |
359 } | |
360 | |
361 scoped_refptr<Dispatcher> | |
362 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | |
363 lock().AssertAcquired(); | |
364 | |
365 SerializeInternal(); | |
366 | |
367 // TODO(vtl): Currently, there are no options, so we just use | |
368 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options | |
369 // too. | |
370 scoped_refptr<MessagePipeDispatcher> rv = Create(kDefaultCreateOptions); | |
371 rv->serialized_platform_handle_ = serialized_platform_handle_; | |
372 serialized_platform_handle_ = PlatformHandle(); | |
373 serialized_message_queue_.swap(rv->serialized_message_queue_); | |
374 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | |
375 rv->serialized_ = true; | |
376 return scoped_refptr<Dispatcher>(rv.get()); | |
377 } | |
378 | |
379 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock( | |
380 const void* bytes, | |
381 uint32_t num_bytes, | |
382 std::vector<DispatcherTransport>* transports, | |
383 MojoWriteMessageFlags flags) { | |
384 | |
385 DCHECK(!transports || | |
386 (transports->size() > 0 && | |
387 transports->size() <= GetConfiguration().max_message_num_handles)); | |
388 | |
389 lock().AssertAcquired(); | |
390 | |
391 if (!channel_) { | |
392 DCHECK(error_); | |
393 return MOJO_RESULT_FAILED_PRECONDITION; | |
394 } | |
395 | |
396 if (num_bytes > GetConfiguration().max_message_num_bytes) | |
397 return MOJO_RESULT_RESOURCE_EXHAUSTED; | |
398 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
399 MessageInTransit::Type::MESSAGE, num_bytes, bytes)); | |
400 if (transports) { | |
401 MojoResult result = AttachTransportsNoLock(message.get(), transports); | |
402 if (result != MOJO_RESULT_OK) | |
403 return result; | |
404 } | |
405 | |
406 message->SerializeAndCloseDispatchers(); | |
407 channel_->WriteMessage(message.Pass()); | |
408 | |
409 return MOJO_RESULT_OK; | |
410 } | |
411 | |
412 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock( | |
413 void* bytes, | |
414 uint32_t* num_bytes, | |
415 DispatcherVector* dispatchers, | |
416 uint32_t* num_dispatchers, | |
417 MojoReadMessageFlags flags) { | |
418 lock().AssertAcquired(); | |
419 DCHECK(!dispatchers || dispatchers->empty()); | |
420 | |
421 const uint32_t max_bytes = !num_bytes ? 0 : *num_bytes; | |
422 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; | |
423 | |
424 if (message_queue_.IsEmpty()) { | |
425 return error_ ? MOJO_RESULT_FAILED_PRECONDITION | |
426 : MOJO_RESULT_SHOULD_WAIT; | |
427 } | |
428 | |
429 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop | |
430 // and release the lock immediately. | |
431 bool enough_space = true; | |
432 MessageInTransit* message = message_queue_.PeekMessage(); | |
433 if (num_bytes) | |
434 *num_bytes = message->num_bytes(); | |
435 if (message->num_bytes() <= max_bytes) | |
436 memcpy(bytes, message->bytes(), message->num_bytes()); | |
437 else | |
438 enough_space = false; | |
439 | |
440 if (DispatcherVector* queued_dispatchers = message->dispatchers()) { | |
441 if (num_dispatchers) | |
442 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); | |
443 if (enough_space) { | |
444 if (queued_dispatchers->empty()) { | |
445 // Nothing to do. | |
446 } else if (queued_dispatchers->size() <= max_num_dispatchers) { | |
447 DCHECK(dispatchers); | |
448 dispatchers->swap(*queued_dispatchers); | |
449 } else { | |
450 enough_space = false; | |
451 } | |
452 } | |
453 } else { | |
454 if (num_dispatchers) | |
455 *num_dispatchers = 0; | |
456 } | |
457 | |
458 message = nullptr; | |
459 | |
460 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { | |
461 message_queue_.DiscardMessage(); | |
462 | |
463 // Now it's empty, thus no longer readable. | |
464 if (message_queue_.IsEmpty()) { | |
465 // It's currently not possible to wait for non-readability, but we should | |
466 // do the state change anyway. | |
467 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
468 } | |
469 } | |
470 | |
471 if (!enough_space) | |
472 return MOJO_RESULT_RESOURCE_EXHAUSTED; | |
473 | |
474 return MOJO_RESULT_OK; | |
475 } | |
476 | |
477 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock() | |
478 const { | |
479 lock().AssertAcquired(); | |
480 | |
481 HandleSignalsState rv; | |
482 if (!message_queue_.IsEmpty()) { | |
483 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | |
484 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | |
485 } | |
486 if (!error_) { | |
487 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
488 rv.satisfiable_signals |= | |
489 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE; | |
490 } else { | |
491 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
492 } | |
493 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
494 return rv; | |
495 } | |
496 | |
497 MojoResult MessagePipeDispatcher::AddAwakableImplNoLock( | |
498 Awakable* awakable, | |
499 MojoHandleSignals signals, | |
500 uint32_t context, | |
501 HandleSignalsState* signals_state) { | |
502 lock().AssertAcquired(); | |
503 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | |
504 if (state.satisfies(signals)) { | |
505 if (signals_state) | |
506 *signals_state = state; | |
507 return MOJO_RESULT_ALREADY_EXISTS; | |
508 } | |
509 if (!state.can_satisfy(signals)) { | |
510 if (signals_state) | |
511 *signals_state = state; | |
512 return MOJO_RESULT_FAILED_PRECONDITION; | |
513 } | |
514 | |
515 awakable_list_.Add(awakable, signals, context); | |
516 return MOJO_RESULT_OK; | |
517 } | |
518 | |
519 void MessagePipeDispatcher::RemoveAwakableImplNoLock( | |
520 Awakable* awakable, | |
521 HandleSignalsState* signals_state) { | |
522 lock().AssertAcquired(); | |
523 | |
524 awakable_list_.Remove(awakable); | |
525 if (signals_state) | |
526 *signals_state = GetHandleSignalsStateImplNoLock(); | |
527 } | |
528 | |
529 void MessagePipeDispatcher::StartSerializeImplNoLock( | |
530 size_t* max_size, | |
531 size_t* max_platform_handles) { | |
532 if (!serialized_) | |
533 SerializeInternal(); | |
534 | |
535 *max_platform_handles = serialized_platform_handle_.is_valid() ? 1 : 0; | |
536 | |
537 DCHECK_EQ(serialized_message_queue_.size() % | |
538 MessageInTransit::kMessageAlignment, 0U); | |
539 *max_size = sizeof(SerializedMessagePipeHandleDispatcher) + | |
540 serialized_message_queue_.size() + | |
541 serialized_read_buffer_.size(); | |
542 | |
543 DCHECK_LE(*max_size, TransportData::kMaxSerializedDispatcherSize); | |
544 } | |
545 | |
546 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock( | |
547 void* destination, | |
548 size_t* actual_size, | |
549 PlatformHandleVector* platform_handles) { | |
550 CloseImplNoLock(); | |
551 SerializedMessagePipeHandleDispatcher* serialization = | |
552 static_cast<SerializedMessagePipeHandleDispatcher*>(destination); | |
553 if (serialized_platform_handle_.is_valid()) { | |
554 serialization->platform_handle_index = platform_handles->size(); | |
555 platform_handles->push_back(serialized_platform_handle_); | |
556 } else { | |
557 serialization->platform_handle_index = kInvalidMessagePipeHandleIndex; | |
558 } | |
559 serialization->read_buffer_size = serialized_read_buffer_.size(); | |
560 | |
561 char* destination_char = static_cast<char*>(destination); | |
562 destination_char += sizeof(SerializedMessagePipeHandleDispatcher); | |
563 | |
564 if (!serialized_read_buffer_.empty()) { | |
565 memcpy(destination_char, &serialized_read_buffer_[0], | |
566 serialized_read_buffer_.size()); | |
567 destination_char += serialized_read_buffer_.size(); | |
568 } | |
569 | |
570 | |
571 if (!serialized_message_queue_.empty()) { | |
572 memcpy(destination_char, | |
573 &serialized_message_queue_[0], | |
574 serialized_message_queue_.size()); | |
575 } | |
576 | |
577 *actual_size = | |
578 sizeof(SerializedMessagePipeHandleDispatcher) + | |
579 serialized_message_queue_.size() + | |
580 serialized_read_buffer_.size(); | |
581 | |
582 return true; | |
583 } | |
584 | |
585 void MessagePipeDispatcher::TransportStarted() { | |
586 started_transport_.Acquire(); | |
587 } | |
588 | |
589 void MessagePipeDispatcher::TransportEnded() { | |
590 started_transport_.Release(); | |
591 | |
592 base::AutoLock locker(lock()); | |
593 | |
594 // If transporting of MPD failed, we might have got more data and didn't | |
595 // awake for. | |
596 // TODO(jam): should we care about only alerting if it was empty before | |
597 // TransportStarted? | |
598 if (!message_queue_.IsEmpty()) | |
599 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
600 } | |
601 | |
602 void MessagePipeDispatcher::OnReadMessage( | |
603 const MessageInTransit::View& message_view, | |
604 ScopedPlatformHandleVectorPtr platform_handles) { | |
605 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view)); | |
606 if (message_view.transport_data_buffer_size() > 0) { | |
607 DCHECK(message_view.transport_data_buffer()); | |
608 message->SetDispatchers(TransportData::DeserializeDispatchers( | |
609 message_view.transport_data_buffer(), | |
610 message_view.transport_data_buffer_size(), platform_handles.Pass())); | |
611 } | |
612 | |
613 if (started_transport_.Try()) { | |
614 // we're not in the middle of being sent | |
615 | |
616 // Can get synchronously called back in Init if there was initial data. | |
617 scoped_ptr<base::AutoLock> locker; | |
618 if (!calling_init_) { | |
619 locker.reset(new base::AutoLock(lock())); | |
620 } | |
621 | |
622 bool was_empty = message_queue_.IsEmpty(); | |
623 message_queue_.AddMessage(message.Pass()); | |
624 if (was_empty) | |
625 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
626 | |
627 started_transport_.Release(); | |
628 } else { | |
629 | |
630 // if RawChannel is calling OnRead, that means it has its read_lock_ | |
631 // acquired. that means StartSerialize can't be accessing message queue as | |
632 // it waits on releasehandle first which acquires readlock_! | |
633 message_queue_.AddMessage(message.Pass()); | |
634 } | |
635 } | |
636 | |
637 void MessagePipeDispatcher::OnError(Error error) { | |
638 switch (error) { | |
639 case ERROR_READ_SHUTDOWN: | |
640 // The other side was cleanly closed, so this isn't actually an error. | |
641 DVLOG(1) << "MessagePipeDispatcher read error (shutdown)"; | |
642 break; | |
643 case ERROR_READ_BROKEN: | |
644 LOG(ERROR) << "MessagePipeDispatcher read error (connection broken)"; | |
645 break; | |
646 case ERROR_READ_BAD_MESSAGE: | |
647 // Receiving a bad message means either a bug, data corruption, or | |
648 // malicious attack (probably due to some other bug). | |
649 LOG(ERROR) << "MessagePipeDispatcher read error (received bad message)"; | |
650 break; | |
651 case ERROR_READ_UNKNOWN: | |
652 LOG(ERROR) << "MessagePipeDispatcher read error (unknown)"; | |
653 break; | |
654 case ERROR_WRITE: | |
655 // Write errors are slightly notable: they probably shouldn't happen under | |
656 // normal operation (but maybe the other side crashed). | |
657 LOG(WARNING) << "MessagePipeDispatcher write error"; | |
658 break; | |
659 } | |
660 | |
661 error_ = true; | |
662 if (started_transport_.Try()) { | |
663 base::AutoLock locker(lock()); | |
664 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
665 | |
666 base::MessageLoop::current()->PostTask( | |
667 FROM_HERE, | |
668 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_))); | |
669 channel_ = nullptr; | |
670 started_transport_.Release(); | |
671 } else { | |
672 // We must be waiting to call ReleaseHandle. It will call Shutdown. | |
673 } | |
674 } | |
675 | |
676 MojoResult MessagePipeDispatcher::AttachTransportsNoLock( | |
677 MessageInTransit* message, | |
678 std::vector<DispatcherTransport>* transports) { | |
679 DCHECK(!message->has_dispatchers()); | |
680 | |
681 // You're not allowed to send either handle to a message pipe over the message | |
682 // pipe, so check for this. (The case of trying to write a handle to itself is | |
683 // taken care of by |Core|. That case kind of makes sense, but leads to | |
684 // complications if, e.g., both sides try to do the same thing with their | |
685 // respective handles simultaneously. The other case, of trying to write the | |
686 // peer handle to a handle, doesn't make sense -- since no handle will be | |
687 // available to read the message from.) | |
688 for (size_t i = 0; i < transports->size(); i++) { | |
689 if (!(*transports)[i].is_valid()) | |
690 continue; | |
691 if ((*transports)[i].GetType() == Dispatcher::Type::MESSAGE_PIPE) { | |
692 MessagePipeDispatcher* mp = | |
693 static_cast<MessagePipeDispatcher*>(((*transports)[i]).dispatcher()); | |
694 if (channel_ && mp->channel_ && channel_->IsOtherEndOf(mp->channel_)) { | |
695 // The other case should have been disallowed by |Core|. (Note: |port| | |
696 // is the peer port of the handle given to |WriteMessage()|.) | |
697 return MOJO_RESULT_INVALID_ARGUMENT; | |
698 } | |
699 } | |
700 } | |
701 | |
702 // Clone the dispatchers and attach them to the message. (This must be done as | |
703 // a separate loop, since we want to leave the dispatchers alone on failure.) | |
704 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector()); | |
705 dispatchers->reserve(transports->size()); | |
706 for (size_t i = 0; i < transports->size(); i++) { | |
707 if ((*transports)[i].is_valid()) { | |
708 dispatchers->push_back( | |
709 (*transports)[i].CreateEquivalentDispatcherAndClose()); | |
710 } else { | |
711 LOG(WARNING) << "Enqueueing null dispatcher"; | |
712 dispatchers->push_back(nullptr); | |
713 } | |
714 } | |
715 message->SetDispatchers(dispatchers.Pass()); | |
716 return MOJO_RESULT_OK; | |
717 } | |
718 | |
719 } // namespace edk | |
720 } // namespace mojo | |
OLD | NEW |