OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/edk/system/raw_channel.h" | |
6 | |
7 #include <stddef.h> | |
8 #include <stdint.h> | |
9 #include <string.h> | |
10 #include <algorithm> | |
11 #include <utility> | |
12 | |
13 #include "base/bind.h" | |
14 #include "base/location.h" | |
15 #include "base/logging.h" | |
16 #include "base/message_loop/message_loop.h" | |
17 #include "mojo/edk/embedder/embedder_internal.h" | |
18 #include "mojo/edk/system/configuration.h" | |
19 #include "mojo/edk/system/message_in_transit.h" | |
20 #include "mojo/edk/system/transport_data.h" | |
21 | |
22 namespace mojo { | |
23 namespace edk { | |
24 | |
25 const size_t kReadSize = 4096; | |
26 | |
27 // RawChannel::ReadBuffer ------------------------------------------------------ | |
28 | |
29 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { | |
30 } | |
31 | |
32 RawChannel::ReadBuffer::~ReadBuffer() { | |
33 } | |
34 | |
35 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { | |
36 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); | |
37 *addr = &buffer_[0] + num_valid_bytes_; | |
38 *size = kReadSize; | |
39 } | |
40 | |
41 // RawChannel::WriteBuffer ----------------------------------------------------- | |
42 | |
43 RawChannel::WriteBuffer::WriteBuffer() | |
44 : serialized_platform_handle_size_(0), | |
45 platform_handles_offset_(0), | |
46 data_offset_(0) { | |
47 } | |
48 | |
49 RawChannel::WriteBuffer::~WriteBuffer() { | |
50 message_queue_.Clear(); | |
51 } | |
52 | |
53 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { | |
54 if (message_queue_.IsEmpty()) | |
55 return false; | |
56 | |
57 const TransportData* transport_data = | |
58 message_queue_.PeekMessage()->transport_data(); | |
59 if (!transport_data) | |
60 return false; | |
61 | |
62 const PlatformHandleVector* all_platform_handles = | |
63 transport_data->platform_handles(); | |
64 if (!all_platform_handles) { | |
65 DCHECK_EQ(platform_handles_offset_, 0u); | |
66 return false; | |
67 } | |
68 if (platform_handles_offset_ >= all_platform_handles->size()) { | |
69 DCHECK_EQ(platform_handles_offset_, all_platform_handles->size()); | |
70 return false; | |
71 } | |
72 | |
73 return true; | |
74 } | |
75 | |
76 void RawChannel::WriteBuffer::GetPlatformHandlesToSend( | |
77 size_t* num_platform_handles, | |
78 PlatformHandle** platform_handles, | |
79 void** serialization_data) { | |
80 DCHECK(HavePlatformHandlesToSend()); | |
81 | |
82 MessageInTransit* message = message_queue_.PeekMessage(); | |
83 TransportData* transport_data = message->transport_data(); | |
84 PlatformHandleVector* all_platform_handles = | |
85 transport_data->platform_handles(); | |
86 *num_platform_handles = | |
87 all_platform_handles->size() - platform_handles_offset_; | |
88 *platform_handles = &(*all_platform_handles)[platform_handles_offset_]; | |
89 | |
90 if (serialized_platform_handle_size_ > 0) { | |
91 size_t serialization_data_offset = | |
92 transport_data->platform_handle_table_offset(); | |
93 serialization_data_offset += | |
94 platform_handles_offset_ * serialized_platform_handle_size_; | |
95 *serialization_data = static_cast<char*>(transport_data->buffer()) + | |
96 serialization_data_offset; | |
97 } else { | |
98 *serialization_data = nullptr; | |
99 } | |
100 } | |
101 | |
102 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) { | |
103 buffers->clear(); | |
104 | |
105 if (message_queue_.IsEmpty()) | |
106 return; | |
107 | |
108 const MessageInTransit* message = message_queue_.PeekMessage(); | |
109 if (message->type() == MessageInTransit::Type::RAW_MESSAGE) { | |
110 // These are already-serialized messages so we don't want to write another | |
111 // header as they include that. | |
112 if (data_offset_ == 0) { | |
113 size_t header_size = message->total_size() - message->num_bytes(); | |
114 data_offset_ = header_size; | |
115 } | |
116 } | |
117 | |
118 DCHECK_LT(data_offset_, message->total_size()); | |
119 size_t bytes_to_write = message->total_size() - data_offset_; | |
120 | |
121 size_t transport_data_buffer_size = | |
122 message->transport_data() ? message->transport_data()->buffer_size() : 0; | |
123 | |
124 if (!transport_data_buffer_size) { | |
125 // Only write from the main buffer. | |
126 DCHECK_LT(data_offset_, message->main_buffer_size()); | |
127 DCHECK_LE(bytes_to_write, message->main_buffer_size()); | |
128 Buffer buffer = { | |
129 static_cast<const char*>(message->main_buffer()) + data_offset_, | |
130 bytes_to_write}; | |
131 | |
132 buffers->push_back(buffer); | |
133 return; | |
134 } | |
135 | |
136 if (data_offset_ >= message->main_buffer_size()) { | |
137 // Only write from the transport data buffer. | |
138 DCHECK_LT(data_offset_ - message->main_buffer_size(), | |
139 transport_data_buffer_size); | |
140 DCHECK_LE(bytes_to_write, transport_data_buffer_size); | |
141 Buffer buffer = { | |
142 static_cast<const char*>(message->transport_data()->buffer()) + | |
143 (data_offset_ - message->main_buffer_size()), | |
144 bytes_to_write}; | |
145 | |
146 buffers->push_back(buffer); | |
147 return; | |
148 } | |
149 | |
150 // TODO(vtl): We could actually send out buffers from multiple messages, with | |
151 // the "stopping" condition being reaching a message with platform handles | |
152 // attached. | |
153 | |
154 // Write from both buffers. | |
155 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + | |
156 transport_data_buffer_size); | |
157 Buffer buffer1 = { | |
158 static_cast<const char*>(message->main_buffer()) + data_offset_, | |
159 message->main_buffer_size() - data_offset_}; | |
160 buffers->push_back(buffer1); | |
161 Buffer buffer2 = { | |
162 static_cast<const char*>(message->transport_data()->buffer()), | |
163 transport_data_buffer_size}; | |
164 buffers->push_back(buffer2); | |
165 } | |
166 | |
167 // RawChannel ------------------------------------------------------------------ | |
168 | |
169 RawChannel::RawChannel() | |
170 : delegate_(nullptr), | |
171 error_occurred_(false), | |
172 calling_delegate_(false), | |
173 write_ready_(false), | |
174 write_stopped_(false), | |
175 pending_write_error_(false), | |
176 initialized_(false), | |
177 weak_ptr_factory_(this) { | |
178 read_buffer_.reset(new ReadBuffer); | |
179 write_buffer_.reset(new WriteBuffer()); | |
180 } | |
181 | |
182 RawChannel::~RawChannel() { | |
183 DCHECK(!read_buffer_); | |
184 DCHECK(!write_buffer_); | |
185 } | |
186 | |
187 void RawChannel::Init(Delegate* delegate) { | |
188 DCHECK(delegate); | |
189 | |
190 base::AutoLock read_locker(read_lock_); | |
191 // Solves race where initialiing on io thread while main thread is serializing | |
192 // this channel and releases handle. | |
193 base::AutoLock locker(write_lock_); | |
194 | |
195 DCHECK(!delegate_); | |
196 delegate_ = delegate; | |
197 | |
198 if (read_buffer_->num_valid_bytes_ || | |
199 !write_buffer_->message_queue_.IsEmpty()) { | |
200 LazyInitialize(); | |
201 } | |
202 } | |
203 | |
204 void RawChannel::EnsureLazyInitialized() { | |
205 { | |
206 base::AutoLock locker(write_lock_); | |
207 if (initialized_) | |
208 return; | |
209 } | |
210 | |
211 internal::g_io_thread_task_runner->PostTask( | |
212 FROM_HERE, | |
213 base::Bind(&RawChannel::LockAndCallLazyInitialize, | |
214 weak_ptr_factory_.GetWeakPtr())); | |
215 } | |
216 | |
217 void RawChannel::LockAndCallLazyInitialize() { | |
218 base::AutoLock read_locker(read_lock_); | |
219 base::AutoLock locker(write_lock_); | |
220 LazyInitialize(); | |
221 } | |
222 | |
223 void RawChannel::LazyInitialize() { | |
224 read_lock_.AssertAcquired(); | |
225 write_lock_.AssertAcquired(); | |
226 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
227 if (initialized_) | |
228 return; | |
229 initialized_ = true; | |
230 base::MessageLoop::current()->AddDestructionObserver(this); | |
231 | |
232 OnInit(); | |
233 | |
234 if (read_buffer_->num_valid_bytes_) { | |
235 // We had serialized read buffer data through SetSerializedData call. | |
236 // Make sure we read messages out of it now, otherwise the delegate won't | |
237 // get notified if no other data gets written to the pipe. | |
238 // Although this means that we can call back synchronously into the caller, | |
239 // that's easier than posting a task to do this. That is because if we post | |
240 // a task, a pending read could have started and we wouldn't be able to move | |
241 // the read buffer since it can be in use by the OS in an async operation. | |
242 bool did_dispatch_message = false; | |
243 bool stop_dispatching = false; | |
244 DispatchMessages(&did_dispatch_message, &stop_dispatching); | |
245 } | |
246 | |
247 IOResult io_result = ScheduleRead(); | |
248 if (io_result != IO_PENDING) { | |
249 // This will notify the delegate about the read failure. Although we're on | |
250 // the I/O thread, don't call it in the nested context. | |
251 internal::g_io_thread_task_runner->PostTask( | |
252 FROM_HERE, base::Bind(&RawChannel::CallOnReadCompleted, | |
253 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); | |
254 } | |
255 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying | |
256 // the delegate), not an initialization failure. | |
257 | |
258 write_ready_ = true; | |
259 write_buffer_->serialized_platform_handle_size_ = | |
260 GetSerializedPlatformHandleSize(); | |
261 if (!write_buffer_->message_queue_.IsEmpty()) | |
262 SendQueuedMessagesNoLock(); | |
263 } | |
264 | |
265 void RawChannel::Shutdown() { | |
266 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
267 | |
268 weak_ptr_factory_.InvalidateWeakPtrs(); | |
269 // Reset the delegate so that it won't receive further calls. | |
270 delegate_ = nullptr; | |
271 if (calling_delegate_) { | |
272 internal::g_io_thread_task_runner->PostTask( | |
273 FROM_HERE, | |
274 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | |
275 return; | |
276 } | |
277 | |
278 bool empty = false; | |
279 { | |
280 base::AutoLock locker(write_lock_); | |
281 empty = write_buffer_->message_queue_.IsEmpty(); | |
282 } | |
283 | |
284 // Normally, we want to flush any pending writes before shutting down. This | |
285 // doesn't apply when 1) we don't have a handle (for obvious reasons), | |
286 // 2) we have a read or write error before (doesn't matter which), or 3) when | |
287 // there are no pending messages to be written. | |
288 if (!IsHandleValid() || error_occurred_ || empty) { | |
289 { | |
290 base::AutoLock read_locker(read_lock_); | |
291 base::AutoLock locker(write_lock_); | |
292 OnShutdownNoLock(std::move(read_buffer_), std::move(write_buffer_)); | |
293 if (initialized_) | |
294 base::MessageLoop::current()->RemoveDestructionObserver(this); | |
295 } | |
296 | |
297 delete this; | |
298 return; | |
299 } | |
300 | |
301 base::AutoLock read_locker(read_lock_); | |
302 base::AutoLock locker(write_lock_); | |
303 DCHECK(read_buffer_->IsEmpty()) << | |
304 "RawChannel::Shutdown called but there is pending data to be read"; | |
305 | |
306 write_stopped_ = true; | |
307 } | |
308 | |
309 ScopedPlatformHandle RawChannel::ReleaseHandle( | |
310 std::vector<char>* serialized_read_buffer, | |
311 std::vector<char>* serialized_write_buffer, | |
312 std::vector<int>* serialized_read_fds, | |
313 std::vector<int>* serialized_write_fds, | |
314 bool* write_error) { | |
315 ScopedPlatformHandle rv; | |
316 *write_error = false; | |
317 { | |
318 base::AutoLock read_locker(read_lock_); | |
319 base::AutoLock locker(write_lock_); | |
320 rv = ReleaseHandleNoLock(serialized_read_buffer, | |
321 serialized_write_buffer, | |
322 serialized_read_fds, | |
323 serialized_write_fds, | |
324 write_error); | |
325 delegate_ = nullptr; | |
326 internal::g_io_thread_task_runner->PostTask( | |
327 FROM_HERE, | |
328 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | |
329 } | |
330 | |
331 return rv; | |
332 } | |
333 | |
334 // Reminder: This must be thread-safe. | |
335 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { | |
336 DCHECK(message); | |
337 EnsureLazyInitialized(); | |
338 base::AutoLock locker(write_lock_); | |
339 if (write_stopped_) | |
340 return false; | |
341 | |
342 bool queue_was_empty = write_buffer_->message_queue_.IsEmpty(); | |
343 EnqueueMessageNoLock(std::move(message)); | |
344 if (queue_was_empty && write_ready_) | |
345 return SendQueuedMessagesNoLock(); | |
346 | |
347 return true; | |
348 } | |
349 | |
350 bool RawChannel::SendQueuedMessagesNoLock() { | |
351 DCHECK_EQ(write_buffer_->data_offset_, 0u); | |
352 | |
353 size_t platform_handles_written = 0; | |
354 size_t bytes_written = 0; | |
355 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | |
356 if (io_result == IO_PENDING) | |
357 return true; | |
358 | |
359 bool result = OnWriteCompletedInternalNoLock( | |
360 io_result, platform_handles_written, bytes_written); | |
361 if (!result) { | |
362 // Even if we're on the I/O thread, don't call |OnError()| in the nested | |
363 // context. | |
364 pending_write_error_ = true; | |
365 internal::g_io_thread_task_runner->PostTask( | |
366 FROM_HERE, | |
367 base::Bind(&RawChannel::LockAndCallOnError, | |
368 weak_ptr_factory_.GetWeakPtr(), | |
369 Delegate::ERROR_WRITE)); | |
370 } | |
371 | |
372 return result; | |
373 } | |
374 | |
375 void RawChannel::SetSerializedData( | |
376 char* serialized_read_buffer, size_t serialized_read_buffer_size, | |
377 char* serialized_write_buffer, size_t serialized_write_buffer_size, | |
378 std::vector<int>* serialized_read_fds, | |
379 std::vector<int>* serialized_write_fds) { | |
380 base::AutoLock locker(read_lock_); | |
381 | |
382 #if defined(OS_POSIX) | |
383 SetSerializedFDs(serialized_read_fds, serialized_write_fds); | |
384 #endif | |
385 | |
386 if (serialized_read_buffer_size) { | |
387 // TODO(jam): copy power of 2 algorithm below? or share. | |
388 read_buffer_->buffer_.resize(serialized_read_buffer_size + kReadSize); | |
389 memcpy(&read_buffer_->buffer_[0], serialized_read_buffer, | |
390 serialized_read_buffer_size); | |
391 read_buffer_->num_valid_bytes_ = serialized_read_buffer_size; | |
392 } | |
393 | |
394 if (serialized_write_buffer_size) { | |
395 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | |
396 | |
397 uint32_t offset = 0; | |
398 while (offset < serialized_write_buffer_size) { | |
399 uint32_t message_num_bytes = | |
400 std::min(static_cast<uint32_t>(max_message_num_bytes), | |
401 static_cast<uint32_t>(serialized_write_buffer_size) - | |
402 offset); | |
403 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
404 MessageInTransit::Type::RAW_MESSAGE, message_num_bytes, | |
405 static_cast<const char*>(serialized_write_buffer) + offset)); | |
406 write_buffer_->message_queue_.AddMessage(std::move(message)); | |
407 offset += message_num_bytes; | |
408 } | |
409 } | |
410 } | |
411 | |
412 void RawChannel::OnReadCompletedNoLock(IOResult io_result, size_t bytes_read) { | |
413 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
414 read_lock_.AssertAcquired(); | |
415 // Keep reading data in a loop, and dispatch messages if enough data is | |
416 // received. Exit the loop if any of the following happens: | |
417 // - one or more messages were dispatched; | |
418 // - the last read failed, was a partial read or would block; | |
419 // - |Shutdown()| was called. | |
420 do { | |
421 switch (io_result) { | |
422 case IO_SUCCEEDED: | |
423 break; | |
424 case IO_FAILED_SHUTDOWN: | |
425 case IO_FAILED_BROKEN: | |
426 case IO_FAILED_UNKNOWN: | |
427 CallOnError(ReadIOResultToError(io_result)); | |
428 return; // |this| may have been destroyed in |CallOnError()|. | |
429 case IO_PENDING: | |
430 NOTREACHED(); | |
431 return; | |
432 } | |
433 | |
434 read_buffer_->num_valid_bytes_ += bytes_read; | |
435 | |
436 // Dispatch all the messages that we can. | |
437 bool did_dispatch_message = false; | |
438 bool stop_dispatching = false; | |
439 DispatchMessages(&did_dispatch_message, &stop_dispatching); | |
440 if (stop_dispatching) | |
441 return; | |
442 | |
443 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < | |
444 kReadSize) { | |
445 // Use power-of-2 buffer sizes. | |
446 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | |
447 // maximum message size to whatever extent necessary). | |
448 // TODO(vtl): We may often be able to peek at the header and get the real | |
449 // required extra space (which may be much bigger than |kReadSize|). | |
450 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); | |
451 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) | |
452 new_size *= 2; | |
453 | |
454 // TODO(vtl): It's suboptimal to zero out the fresh memory. | |
455 read_buffer_->buffer_.resize(new_size, 0); | |
456 } | |
457 | |
458 // (1) If we dispatched any messages, stop reading for now (and let the | |
459 // message loop do its thing for another round). | |
460 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only | |
461 // a single message. Risks: slower, more complex if we want to avoid lots of | |
462 // copying. ii. Keep reading until there's no more data and dispatch all the | |
463 // messages we can. Risks: starvation of other users of the message loop.) | |
464 // (2) If we didn't max out |kReadSize|, stop reading for now. | |
465 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; | |
466 bytes_read = 0; | |
467 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); | |
468 } while (io_result != IO_PENDING); | |
469 } | |
470 | |
471 void RawChannel::OnWriteCompletedNoLock(IOResult io_result, | |
472 size_t platform_handles_written, | |
473 size_t bytes_written) { | |
474 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
475 write_lock_.AssertAcquired(); | |
476 DCHECK_NE(io_result, IO_PENDING); | |
477 | |
478 bool did_fail = !OnWriteCompletedInternalNoLock( | |
479 io_result, platform_handles_written, bytes_written); | |
480 if (did_fail) { | |
481 // Don't want to call the delegate with the current callstack for two | |
482 // reasons: | |
483 // 1) We already have write_lock_ acquired, and calling the delegate means | |
484 // also acquiring the read lock. We need to acquire read and then write to | |
485 // avoid deadlocks. | |
486 // 2) We shouldn't call the delegate with write_lock acquired, since the | |
487 // delegate could be calling WriteMessage and that can cause deadlocks. | |
488 pending_write_error_ = true; | |
489 internal::g_io_thread_task_runner->PostTask( | |
490 FROM_HERE, | |
491 base::Bind(&RawChannel::LockAndCallOnError, | |
492 weak_ptr_factory_.GetWeakPtr(), | |
493 Delegate::ERROR_WRITE)); | |
494 } | |
495 } | |
496 | |
497 void RawChannel::SerializeReadBuffer(size_t additional_bytes_read, | |
498 std::vector<char>* buffer) { | |
499 read_lock_.AssertAcquired(); | |
500 read_buffer_->num_valid_bytes_ += additional_bytes_read; | |
501 read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_); | |
502 read_buffer_->buffer_.swap(*buffer); | |
503 read_buffer_->num_valid_bytes_ = 0; | |
504 } | |
505 | |
506 void RawChannel::SerializeWriteBuffer( | |
507 size_t additional_bytes_written, | |
508 size_t additional_platform_handles_written, | |
509 std::vector<char>* buffer, | |
510 std::vector<int>* fds) { | |
511 write_lock_.AssertAcquired(); | |
512 if (write_buffer_->IsEmpty()) { | |
513 DCHECK_EQ(0u, additional_bytes_written); | |
514 DCHECK_EQ(0u, additional_platform_handles_written); | |
515 return; | |
516 } | |
517 | |
518 UpdateWriteBuffer( | |
519 additional_platform_handles_written, additional_bytes_written); | |
520 while (!write_buffer_->message_queue_.IsEmpty()) { | |
521 SerializePlatformHandles(fds); | |
522 std::vector<WriteBuffer::Buffer> buffers; | |
523 write_buffer_no_lock()->GetBuffers(&buffers); | |
524 for (size_t i = 0; i < buffers.size(); ++i) { | |
525 buffer->insert(buffer->end(), buffers[i].addr, | |
526 buffers[i].addr + buffers[i].size); | |
527 } | |
528 write_buffer_->message_queue_.DiscardMessage(); | |
529 } | |
530 } | |
531 | |
532 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | |
533 write_lock_.AssertAcquired(); | |
534 write_buffer_->message_queue_.AddMessage(std::move(message)); | |
535 } | |
536 | |
537 bool RawChannel::OnReadMessageForRawChannel( | |
538 const MessageInTransit::View& message_view) { | |
539 LOG(ERROR) << "Invalid control message (type " << message_view.type() | |
540 << ")"; | |
541 return false; | |
542 } | |
543 | |
544 RawChannel::Delegate::Error RawChannel::ReadIOResultToError( | |
545 IOResult io_result) { | |
546 switch (io_result) { | |
547 case IO_FAILED_SHUTDOWN: | |
548 return Delegate::ERROR_READ_SHUTDOWN; | |
549 case IO_FAILED_BROKEN: | |
550 return Delegate::ERROR_READ_BROKEN; | |
551 case IO_FAILED_UNKNOWN: | |
552 return Delegate::ERROR_READ_UNKNOWN; | |
553 case IO_SUCCEEDED: | |
554 case IO_PENDING: | |
555 NOTREACHED(); | |
556 break; | |
557 } | |
558 return Delegate::ERROR_READ_UNKNOWN; | |
559 } | |
560 | |
561 void RawChannel::CallOnError(Delegate::Error error) { | |
562 DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); | |
563 read_lock_.AssertAcquired(); | |
564 error_occurred_ = true; | |
565 if (delegate_) { | |
566 DCHECK(!calling_delegate_); | |
567 calling_delegate_ = true; | |
568 delegate_->OnError(error); | |
569 calling_delegate_ = false; | |
570 } else { | |
571 // We depend on delegate to delete since it could be waiting to call | |
572 // ReleaseHandle. | |
573 internal::g_io_thread_task_runner->PostTask( | |
574 FROM_HERE, | |
575 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | |
576 } | |
577 } | |
578 | |
579 void RawChannel::LockAndCallOnError(Delegate::Error error) { | |
580 base::AutoLock locker(read_lock_); | |
581 CallOnError(error); | |
582 } | |
583 | |
584 bool RawChannel::OnWriteCompletedInternalNoLock(IOResult io_result, | |
585 size_t platform_handles_written, | |
586 size_t bytes_written) { | |
587 write_lock_.AssertAcquired(); | |
588 | |
589 DCHECK(!write_buffer_->message_queue_.IsEmpty()); | |
590 | |
591 if (io_result == IO_SUCCEEDED) { | |
592 UpdateWriteBuffer(platform_handles_written, bytes_written); | |
593 if (write_buffer_->message_queue_.IsEmpty()) { | |
594 if (!delegate_) { | |
595 // Shutdown must have been called and we were waiting to flush all | |
596 // pending writes. Now we're done. | |
597 internal::g_io_thread_task_runner->PostTask( | |
598 FROM_HERE, | |
599 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | |
600 } | |
601 return true; | |
602 } | |
603 | |
604 // Schedule the next write. | |
605 io_result = ScheduleWriteNoLock(); | |
606 if (io_result == IO_PENDING) | |
607 return true; | |
608 DCHECK_NE(io_result, IO_SUCCEEDED); | |
609 } | |
610 | |
611 write_stopped_ = true; | |
612 write_buffer_->message_queue_.Clear(); | |
613 write_buffer_->platform_handles_offset_ = 0; | |
614 write_buffer_->data_offset_ = 0; | |
615 return false; | |
616 } | |
617 | |
618 void RawChannel::DispatchMessages(bool* did_dispatch_message, | |
619 bool* stop_dispatching) { | |
620 *did_dispatch_message = false; | |
621 *stop_dispatching = false; | |
622 // Tracks the offset of the first undispatched message in |read_buffer_|. | |
623 // Currently, we copy data to ensure that this is zero at the beginning. | |
624 size_t read_buffer_start = 0; | |
625 size_t remaining_bytes = read_buffer_->num_valid_bytes_; | |
626 size_t message_size; | |
627 // Note that we rely on short-circuit evaluation here: | |
628 // - |read_buffer_start| may be an invalid index into | |
629 // |read_buffer_->buffer_| if |remaining_bytes| is zero. | |
630 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | |
631 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | |
632 // next read). | |
633 // TODO(vtl): Validate that |message_size| is sane. | |
634 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( | |
635 &read_buffer_->buffer_[read_buffer_start], | |
636 remaining_bytes, &message_size) && | |
637 remaining_bytes >= message_size) { | |
638 MessageInTransit::View message_view( | |
639 message_size, &read_buffer_->buffer_[read_buffer_start]); | |
640 DCHECK_EQ(message_view.total_size(), message_size); | |
641 | |
642 const char* error_message = nullptr; | |
643 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), | |
644 &error_message)) { | |
645 DCHECK(error_message); | |
646 LOG(ERROR) << "Received invalid message: " << error_message; | |
647 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
648 *stop_dispatching = true; | |
649 return; // |this| may have been destroyed in |CallOnError()|. | |
650 } | |
651 | |
652 if (message_view.type() != MessageInTransit::Type::MESSAGE && | |
653 message_view.type() != MessageInTransit::Type::QUIT_MESSAGE) { | |
654 if (!OnReadMessageForRawChannel(message_view)) { | |
655 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
656 *stop_dispatching = true; | |
657 return; // |this| may have been destroyed in |CallOnError()|. | |
658 } | |
659 } else { | |
660 ScopedPlatformHandleVectorPtr platform_handles; | |
661 if (message_view.transport_data_buffer()) { | |
662 size_t num_platform_handles; | |
663 const void* platform_handle_table; | |
664 TransportData::GetPlatformHandleTable( | |
665 message_view.transport_data_buffer(), &num_platform_handles, | |
666 &platform_handle_table); | |
667 | |
668 if (num_platform_handles > 0) { | |
669 platform_handles = GetReadPlatformHandles(num_platform_handles, | |
670 platform_handle_table); | |
671 if (!platform_handles) { | |
672 LOG(ERROR) << "Invalid number of platform handles received"; | |
673 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
674 *stop_dispatching = true; | |
675 return; // |this| may have been destroyed in |CallOnError()|. | |
676 } | |
677 } | |
678 } | |
679 | |
680 // TODO(vtl): In the case that we aren't expecting any platform handles, | |
681 // for the POSIX implementation, we should confirm that none are stored. | |
682 if (delegate_) { | |
683 DCHECK(!calling_delegate_); | |
684 calling_delegate_ = true; | |
685 delegate_->OnReadMessage(message_view, std::move(platform_handles)); | |
686 calling_delegate_ = false; | |
687 } | |
688 } | |
689 | |
690 *did_dispatch_message = true; | |
691 | |
692 // Update our state. | |
693 read_buffer_start += message_size; | |
694 remaining_bytes -= message_size; | |
695 } | |
696 | |
697 if (read_buffer_start > 0) { | |
698 // Move data back to start. | |
699 read_buffer_->num_valid_bytes_ = remaining_bytes; | |
700 if (read_buffer_->num_valid_bytes_ > 0) { | |
701 memmove(&read_buffer_->buffer_[0], | |
702 &read_buffer_->buffer_[read_buffer_start], remaining_bytes); | |
703 } | |
704 read_buffer_start = 0; | |
705 } | |
706 } | |
707 | |
708 void RawChannel::UpdateWriteBuffer(size_t platform_handles_written, | |
709 size_t bytes_written) { | |
710 write_buffer_->platform_handles_offset_ += platform_handles_written; | |
711 write_buffer_->data_offset_ += bytes_written; | |
712 | |
713 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage(); | |
714 if (write_buffer_->data_offset_ >= message->total_size()) { | |
715 // Complete write. | |
716 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); | |
717 write_buffer_->message_queue_.DiscardMessage(); | |
718 write_buffer_->platform_handles_offset_ = 0; | |
719 write_buffer_->data_offset_ = 0; | |
720 } | |
721 } | |
722 | |
723 void RawChannel::CallOnReadCompleted(IOResult io_result, size_t bytes_read) { | |
724 base::AutoLock locker(read_lock_); | |
725 OnReadCompletedNoLock(io_result, bytes_read); | |
726 } | |
727 | |
728 void RawChannel::WillDestroyCurrentMessageLoop() { | |
729 { | |
730 base::AutoLock locker(read_lock_); | |
731 OnReadCompletedNoLock(IO_FAILED_SHUTDOWN, 0); | |
732 } | |
733 // The PostTask inside Shutdown() will never be called, so manually call it | |
734 // here to avoid leaks in LSAN builds. | |
735 Shutdown(); | |
736 } | |
737 | |
738 } // namespace edk | |
739 } // namespace mojo | |
OLD | NEW |