Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(449)

Side by Side Diff: mojo/edk/system/raw_channel.cc

Issue 1649633002: Remove files that are no longer used in the Port EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/raw_channel.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9 #include <string.h>
10 #include <algorithm>
11 #include <utility>
12
13 #include "base/bind.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h"
17 #include "mojo/edk/embedder/embedder_internal.h"
18 #include "mojo/edk/system/configuration.h"
19 #include "mojo/edk/system/message_in_transit.h"
20 #include "mojo/edk/system/transport_data.h"
21
22 namespace mojo {
23 namespace edk {
24
// Number of bytes requested per read. ReadBuffer::GetBuffer() always hands
// out a region of exactly this size, and the buffer is grown (see
// OnReadCompletedNoLock()) so at least this much free space is available.
const size_t kReadSize = 4096;
26
27 // RawChannel::ReadBuffer ------------------------------------------------------
28
// Starts with |kReadSize| bytes of capacity and no valid (unread) data.
RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
}
31
// Nothing to release explicitly; |buffer_| owns its storage.
RawChannel::ReadBuffer::~ReadBuffer() {
}
34
35 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
36 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
37 *addr = &buffer_[0] + num_valid_bytes_;
38 *size = kReadSize;
39 }
40
41 // RawChannel::WriteBuffer -----------------------------------------------------
42
// All offsets start at zero; |serialized_platform_handle_size_| is filled in
// during RawChannel::LazyInitialize().
RawChannel::WriteBuffer::WriteBuffer()
    : serialized_platform_handle_size_(0),
      platform_handles_offset_(0),
      data_offset_(0) {
}
48
// Drops any messages still queued for writing.
RawChannel::WriteBuffer::~WriteBuffer() {
  message_queue_.Clear();
}
52
53 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
54 if (message_queue_.IsEmpty())
55 return false;
56
57 const TransportData* transport_data =
58 message_queue_.PeekMessage()->transport_data();
59 if (!transport_data)
60 return false;
61
62 const PlatformHandleVector* all_platform_handles =
63 transport_data->platform_handles();
64 if (!all_platform_handles) {
65 DCHECK_EQ(platform_handles_offset_, 0u);
66 return false;
67 }
68 if (platform_handles_offset_ >= all_platform_handles->size()) {
69 DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
70 return false;
71 }
72
73 return true;
74 }
75
76 void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
77 size_t* num_platform_handles,
78 PlatformHandle** platform_handles,
79 void** serialization_data) {
80 DCHECK(HavePlatformHandlesToSend());
81
82 MessageInTransit* message = message_queue_.PeekMessage();
83 TransportData* transport_data = message->transport_data();
84 PlatformHandleVector* all_platform_handles =
85 transport_data->platform_handles();
86 *num_platform_handles =
87 all_platform_handles->size() - platform_handles_offset_;
88 *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
89
90 if (serialized_platform_handle_size_ > 0) {
91 size_t serialization_data_offset =
92 transport_data->platform_handle_table_offset();
93 serialization_data_offset +=
94 platform_handles_offset_ * serialized_platform_handle_size_;
95 *serialization_data = static_cast<char*>(transport_data->buffer()) +
96 serialization_data_offset;
97 } else {
98 *serialization_data = nullptr;
99 }
100 }
101
// Fills |*buffers| with the (at most two) remaining byte ranges of the
// frontmost queued message: the tail of its main buffer and/or its transport
// data buffer, starting at |data_offset_|. Empty output if nothing is queued.
void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) {
  buffers->clear();

  if (message_queue_.IsEmpty())
    return;

  const MessageInTransit* message = message_queue_.PeekMessage();
  if (message->type() == MessageInTransit::Type::RAW_MESSAGE) {
    // These are already-serialized messages so we don't want to write another
    // header as they include that. Skipping the header is done by advancing
    // |data_offset_| once, the first time this message is seen.
    if (data_offset_ == 0) {
      size_t header_size = message->total_size() - message->num_bytes();
      data_offset_ = header_size;
    }
  }

  DCHECK_LT(data_offset_, message->total_size());
  size_t bytes_to_write = message->total_size() - data_offset_;

  size_t transport_data_buffer_size =
      message->transport_data() ? message->transport_data()->buffer_size() : 0;

  if (!transport_data_buffer_size) {
    // Only write from the main buffer.
    DCHECK_LT(data_offset_, message->main_buffer_size());
    DCHECK_LE(bytes_to_write, message->main_buffer_size());
    Buffer buffer = {
        static_cast<const char*>(message->main_buffer()) + data_offset_,
        bytes_to_write};

    buffers->push_back(buffer);
    return;
  }

  if (data_offset_ >= message->main_buffer_size()) {
    // Only write from the transport data buffer: the main buffer was fully
    // written on a previous (partial) write. The offset into the transport
    // data buffer is |data_offset_| minus the whole main buffer.
    DCHECK_LT(data_offset_ - message->main_buffer_size(),
              transport_data_buffer_size);
    DCHECK_LE(bytes_to_write, transport_data_buffer_size);
    Buffer buffer = {
        static_cast<const char*>(message->transport_data()->buffer()) +
            (data_offset_ - message->main_buffer_size()),
        bytes_to_write};

    buffers->push_back(buffer);
    return;
  }

  // TODO(vtl): We could actually send out buffers from multiple messages, with
  // the "stopping" condition being reaching a message with platform handles
  // attached.

  // Write from both buffers.
  DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
                                transport_data_buffer_size);
  Buffer buffer1 = {
      static_cast<const char*>(message->main_buffer()) + data_offset_,
      message->main_buffer_size() - data_offset_};
  buffers->push_back(buffer1);
  Buffer buffer2 = {
      static_cast<const char*>(message->transport_data()->buffer()),
      transport_data_buffer_size};
  buffers->push_back(buffer2);
}
166
167 // RawChannel ------------------------------------------------------------------
168
// All state flags start false; the channel only becomes operational after
// Init() and lazy initialization on the I/O thread (see LazyInitialize()).
RawChannel::RawChannel()
    : delegate_(nullptr),
      error_occurred_(false),
      calling_delegate_(false),
      write_ready_(false),
      write_stopped_(false),
      pending_write_error_(false),
      initialized_(false),
      weak_ptr_factory_(this) {
  // The buffers are allocated up front and handed off (moved) to
  // OnShutdownNoLock()/ReleaseHandle during teardown.
  read_buffer_.reset(new ReadBuffer);
  write_buffer_.reset(new WriteBuffer());
}
181
// By destruction time both buffers must already have been moved away (this
// happens in Shutdown(), which passes them to OnShutdownNoLock() and then
// deletes |this|).
RawChannel::~RawChannel() {
  DCHECK(!read_buffer_);
  DCHECK(!write_buffer_);
}
186
// Attaches |delegate| (exactly once; see the DCHECK). If serialized data was
// previously restored into the buffers via SetSerializedData(), full
// initialization happens immediately so that data is dispatched/flushed.
void RawChannel::Init(Delegate* delegate) {
  DCHECK(delegate);

  base::AutoLock read_locker(read_lock_);
  // Solves race where initializing on io thread while main thread is
  // serializing this channel and releases handle. (Lock order: read lock
  // before write lock, as everywhere else in this file.)
  base::AutoLock locker(write_lock_);

  DCHECK(!delegate_);
  delegate_ = delegate;

  // Pending restored read data or queued outgoing messages would otherwise
  // sit unprocessed until the next I/O event; initialize now.
  if (read_buffer_->num_valid_bytes_ ||
      !write_buffer_->message_queue_.IsEmpty()) {
    LazyInitialize();
  }
}
203
// Ensures LazyInitialize() runs on the I/O thread. Fast path: if already
// initialized (checked under |write_lock_|), do nothing. Otherwise post a
// task that takes both locks and initializes. Racing callers are harmless
// because LazyInitialize() re-checks |initialized_|.
void RawChannel::EnsureLazyInitialized() {
  {
    base::AutoLock locker(write_lock_);
    if (initialized_)
      return;
  }

  internal::g_io_thread_task_runner->PostTask(
      FROM_HERE,
      base::Bind(&RawChannel::LockAndCallLazyInitialize,
                 weak_ptr_factory_.GetWeakPtr()));
}
216
// Task trampoline: acquires both locks (read before write, matching the
// order used everywhere else) and runs the real initialization.
void RawChannel::LockAndCallLazyInitialize() {
  base::AutoLock read_locker(read_lock_);
  base::AutoLock locker(write_lock_);
  LazyInitialize();
}
222
// One-time setup on the I/O thread (idempotent via |initialized_|): registers
// as a message-loop destruction observer, runs platform-specific OnInit(),
// dispatches any restored read data, schedules the first read, and flushes
// any queued writes. Both locks must be held.
void RawChannel::LazyInitialize() {
  read_lock_.AssertAcquired();
  write_lock_.AssertAcquired();
  DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
  if (initialized_)
    return;
  initialized_ = true;
  // Observe loop destruction so in-flight state is torn down (see
  // WillDestroyCurrentMessageLoop()).
  base::MessageLoop::current()->AddDestructionObserver(this);

  OnInit();

  if (read_buffer_->num_valid_bytes_) {
    // We had serialized read buffer data through SetSerializedData call.
    // Make sure we read messages out of it now, otherwise the delegate won't
    // get notified if no other data gets written to the pipe.
    // Although this means that we can call back synchronously into the caller,
    // that's easier than posting a task to do this. That is because if we post
    // a task, a pending read could have started and we wouldn't be able to move
    // the read buffer since it can be in use by the OS in an async operation.
    bool did_dispatch_message = false;
    bool stop_dispatching = false;
    DispatchMessages(&did_dispatch_message, &stop_dispatching);
  }

  IOResult io_result = ScheduleRead();
  if (io_result != IO_PENDING) {
    // This will notify the delegate about the read failure. Although we're on
    // the I/O thread, don't call it in the nested context.
    internal::g_io_thread_task_runner->PostTask(
        FROM_HERE, base::Bind(&RawChannel::CallOnReadCompleted,
                              weak_ptr_factory_.GetWeakPtr(), io_result, 0));
  }
  // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
  // the delegate), not an initialization failure.

  // Writes queued before initialization (WriteMessage() checks
  // |write_ready_|) are sent out now.
  write_ready_ = true;
  write_buffer_->serialized_platform_handle_size_ =
      GetSerializedPlatformHandleSize();
  if (!write_buffer_->message_queue_.IsEmpty())
    SendQueuedMessagesNoLock();
}
264
// Tears the channel down on the I/O thread. Either destroys |this|
// immediately (handing the buffers to OnShutdownNoLock()), or — when there
// are still pending writes and no error — only marks writing as stopped and
// lets the write-completion path re-post Shutdown() later (see
// OnWriteCompletedInternalNoLock()). If called re-entrantly from a delegate
// callback, defers itself via PostTask.
void RawChannel::Shutdown() {
  DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());

  // Cancel previously posted callbacks; the GetWeakPtr() calls below hand out
  // fresh, valid pointers.
  weak_ptr_factory_.InvalidateWeakPtrs();
  // Reset the delegate so that it won't receive further calls.
  delegate_ = nullptr;
  if (calling_delegate_) {
    // We are inside a delegate callback (see CallOnError()/
    // DispatchMessages()); deleting |this| now would be unsafe.
    internal::g_io_thread_task_runner->PostTask(
        FROM_HERE,
        base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
    return;
  }

  bool empty = false;
  {
    base::AutoLock locker(write_lock_);
    empty = write_buffer_->message_queue_.IsEmpty();
  }

  // Normally, we want to flush any pending writes before shutting down. This
  // doesn't apply when 1) we don't have a handle (for obvious reasons),
  // 2) we have a read or write error before (doesn't matter which), or 3) when
  // there are no pending messages to be written.
  if (!IsHandleValid() || error_occurred_ || empty) {
    {
      base::AutoLock read_locker(read_lock_);
      base::AutoLock locker(write_lock_);
      OnShutdownNoLock(std::move(read_buffer_), std::move(write_buffer_));
      if (initialized_)
        base::MessageLoop::current()->RemoveDestructionObserver(this);
    }

    delete this;
    return;
  }

  base::AutoLock read_locker(read_lock_);
  base::AutoLock locker(write_lock_);
  DCHECK(read_buffer_->IsEmpty()) <<
      "RawChannel::Shutdown called but there is pending data to be read";

  write_stopped_ = true;
}
308
// Detaches the underlying platform handle so the channel can be serialized
// elsewhere: delegates to the platform-specific ReleaseHandleNoLock() to fill
// the serialized read/write buffers (and, on POSIX, FD lists), clears the
// delegate, and schedules Shutdown() to destroy |this| on the I/O thread.
// |*write_error| reports whether a write error had occurred.
ScopedPlatformHandle RawChannel::ReleaseHandle(
    std::vector<char>* serialized_read_buffer,
    std::vector<char>* serialized_write_buffer,
    std::vector<int>* serialized_read_fds,
    std::vector<int>* serialized_write_fds,
    bool* write_error) {
  ScopedPlatformHandle rv;
  *write_error = false;
  {
    base::AutoLock read_locker(read_lock_);
    base::AutoLock locker(write_lock_);
    rv = ReleaseHandleNoLock(serialized_read_buffer,
                             serialized_write_buffer,
                             serialized_read_fds,
                             serialized_write_fds,
                             write_error);
    // No further delegate callbacks once the handle is gone.
    delegate_ = nullptr;
    internal::g_io_thread_task_runner->PostTask(
        FROM_HERE,
        base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
  }

  return rv;
}
333
334 // Reminder: This must be thread-safe.
335 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
336 DCHECK(message);
337 EnsureLazyInitialized();
338 base::AutoLock locker(write_lock_);
339 if (write_stopped_)
340 return false;
341
342 bool queue_was_empty = write_buffer_->message_queue_.IsEmpty();
343 EnqueueMessageNoLock(std::move(message));
344 if (queue_was_empty && write_ready_)
345 return SendQueuedMessagesNoLock();
346
347 return true;
348 }
349
// Starts writing the queued messages (|write_lock_| held, no write already in
// progress — hence |data_offset_| must be 0). Returns true on success or when
// the write is pending; on failure posts LockAndCallOnError() rather than
// calling the delegate in this (possibly nested) context, and returns false.
bool RawChannel::SendQueuedMessagesNoLock() {
  DCHECK_EQ(write_buffer_->data_offset_, 0u);

  size_t platform_handles_written = 0;
  size_t bytes_written = 0;
  IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
  if (io_result == IO_PENDING)
    return true;

  // Synchronous completion (success or failure): update buffer state and
  // possibly schedule the next write.
  bool result = OnWriteCompletedInternalNoLock(
      io_result, platform_handles_written, bytes_written);
  if (!result) {
    // Even if we're on the I/O thread, don't call |OnError()| in the nested
    // context.
    pending_write_error_ = true;
    internal::g_io_thread_task_runner->PostTask(
        FROM_HERE,
        base::Bind(&RawChannel::LockAndCallOnError,
                   weak_ptr_factory_.GetWeakPtr(),
                   Delegate::ERROR_WRITE));
  }

  return result;
}
374
// Restores buffer contents captured by a previous ReleaseHandle()/
// Serialize*Buffer() on another channel instance: copies the serialized read
// bytes into |read_buffer_| and re-queues the serialized write bytes as
// RAW_MESSAGE chunks (each at most the configured max message size).
void RawChannel::SetSerializedData(
    char* serialized_read_buffer, size_t serialized_read_buffer_size,
    char* serialized_write_buffer, size_t serialized_write_buffer_size,
    std::vector<int>* serialized_read_fds,
    std::vector<int>* serialized_write_fds) {
  base::AutoLock locker(read_lock_);

#if defined(OS_POSIX)
  SetSerializedFDs(serialized_read_fds, serialized_write_fds);
#endif

  if (serialized_read_buffer_size) {
    // TODO(jam): copy power of 2 algorithm below? or share.
    // Keep |kReadSize| of headroom so the next GetBuffer() call has space.
    read_buffer_->buffer_.resize(serialized_read_buffer_size + kReadSize);
    memcpy(&read_buffer_->buffer_[0], serialized_read_buffer,
           serialized_read_buffer_size);
    read_buffer_->num_valid_bytes_ = serialized_read_buffer_size;
  }

  if (serialized_write_buffer_size) {
    size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;

    // Split the raw byte stream into RAW_MESSAGE-typed messages; GetBuffers()
    // knows to skip the header these wrappers add.
    uint32_t offset = 0;
    while (offset < serialized_write_buffer_size) {
      uint32_t message_num_bytes =
          std::min(static_cast<uint32_t>(max_message_num_bytes),
                   static_cast<uint32_t>(serialized_write_buffer_size) -
                       offset);
      scoped_ptr<MessageInTransit> message(new MessageInTransit(
          MessageInTransit::Type::RAW_MESSAGE, message_num_bytes,
          static_cast<const char*>(serialized_write_buffer) + offset));
      write_buffer_->message_queue_.AddMessage(std::move(message));
      offset += message_num_bytes;
    }
  }
}
411
// Handles completion of a read (|read_lock_| held, I/O thread): accounts for
// |bytes_read|, dispatches complete messages, grows the buffer when needed,
// and keeps reading until the read would block or a stopping condition hits.
// On read failure, reports the error — after which |this| may be gone.
void RawChannel::OnReadCompletedNoLock(IOResult io_result, size_t bytes_read) {
  DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
  read_lock_.AssertAcquired();
  // Keep reading data in a loop, and dispatch messages if enough data is
  // received. Exit the loop if any of the following happens:
  //   - one or more messages were dispatched;
  //   - the last read failed, was a partial read or would block;
  //   - |Shutdown()| was called.
  do {
    switch (io_result) {
      case IO_SUCCEEDED:
        break;
      case IO_FAILED_SHUTDOWN:
      case IO_FAILED_BROKEN:
      case IO_FAILED_UNKNOWN:
        CallOnError(ReadIOResultToError(io_result));
        return;  // |this| may have been destroyed in |CallOnError()|.
      case IO_PENDING:
        NOTREACHED();
        return;
    }

    read_buffer_->num_valid_bytes_ += bytes_read;

    // Dispatch all the messages that we can.
    bool did_dispatch_message = false;
    bool stop_dispatching = false;
    DispatchMessages(&did_dispatch_message, &stop_dispatching);
    if (stop_dispatching)
      return;

    // Ensure at least |kReadSize| bytes of headroom for the next read (the
    // invariant GetBuffer() DCHECKs).
    if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
        kReadSize) {
      // Use power-of-2 buffer sizes.
      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
      // maximum message size to whatever extent necessary).
      // TODO(vtl): We may often be able to peek at the header and get the real
      // required extra space (which may be much bigger than |kReadSize|).
      size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
      while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
        new_size *= 2;

      // TODO(vtl): It's suboptimal to zero out the fresh memory.
      read_buffer_->buffer_.resize(new_size, 0);
    }

    // (1) If we dispatched any messages, stop reading for now (and let the
    // message loop do its thing for another round).
    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
    // a single message. Risks: slower, more complex if we want to avoid lots of
    // copying. ii. Keep reading until there's no more data and dispatch all the
    // messages we can. Risks: starvation of other users of the message loop.)
    // (2) If we didn't max out |kReadSize|, stop reading for now.
    bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
    bytes_read = 0;
    io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
  } while (io_result != IO_PENDING);
}
470
// Handles completion of an asynchronous write (|write_lock_| held, I/O
// thread). On failure, schedules the delegate error notification instead of
// making it inline — see the comment below for why.
void RawChannel::OnWriteCompletedNoLock(IOResult io_result,
                                        size_t platform_handles_written,
                                        size_t bytes_written) {
  DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
  write_lock_.AssertAcquired();
  DCHECK_NE(io_result, IO_PENDING);

  bool did_fail = !OnWriteCompletedInternalNoLock(
      io_result, platform_handles_written, bytes_written);
  if (did_fail) {
    // Don't want to call the delegate with the current callstack for two
    // reasons:
    // 1) We already have write_lock_ acquired, and calling the delegate means
    // also acquiring the read lock. We need to acquire read and then write to
    // avoid deadlocks.
    // 2) We shouldn't call the delegate with write_lock acquired, since the
    // delegate could be calling WriteMessage and that can cause deadlocks.
    pending_write_error_ = true;
    internal::g_io_thread_task_runner->PostTask(
        FROM_HERE,
        base::Bind(&RawChannel::LockAndCallOnError,
                   weak_ptr_factory_.GetWeakPtr(),
                   Delegate::ERROR_WRITE));
  }
}
496
// Moves the read buffer's contents into |*buffer| (via swap, so no copy),
// first accounting for |additional_bytes_read| that landed after the last
// completion. Leaves |read_buffer_| empty. |read_lock_| must be held.
void RawChannel::SerializeReadBuffer(size_t additional_bytes_read,
                                     std::vector<char>* buffer) {
  read_lock_.AssertAcquired();
  read_buffer_->num_valid_bytes_ += additional_bytes_read;
  // Shrink to exactly the valid bytes before swapping out, so |*buffer|
  // contains no unused headroom.
  read_buffer_->buffer_.resize(read_buffer_->num_valid_bytes_);
  read_buffer_->buffer_.swap(*buffer);
  read_buffer_->num_valid_bytes_ = 0;
}
505
// Flattens all still-queued outgoing messages into |*buffer| (and their
// platform handles into |*fds|), after first accounting for any bytes/handles
// written since the last completion. Drains the message queue.
// |write_lock_| must be held.
void RawChannel::SerializeWriteBuffer(
    size_t additional_bytes_written,
    size_t additional_platform_handles_written,
    std::vector<char>* buffer,
    std::vector<int>* fds) {
  write_lock_.AssertAcquired();
  if (write_buffer_->IsEmpty()) {
    // Nothing queued; there can't have been any in-flight progress either.
    DCHECK_EQ(0u, additional_bytes_written);
    DCHECK_EQ(0u, additional_platform_handles_written);
    return;
  }

  // Advance past what was already transmitted so only the remainder gets
  // serialized.
  UpdateWriteBuffer(
      additional_platform_handles_written, additional_bytes_written);
  while (!write_buffer_->message_queue_.IsEmpty()) {
    SerializePlatformHandles(fds);
    std::vector<WriteBuffer::Buffer> buffers;
    write_buffer_no_lock()->GetBuffers(&buffers);
    for (size_t i = 0; i < buffers.size(); ++i) {
      buffer->insert(buffer->end(), buffers[i].addr,
                     buffers[i].addr + buffers[i].size);
    }
    write_buffer_->message_queue_.DiscardMessage();
  }
}
531
// Appends |message| to the outgoing queue. Virtual hook point; |write_lock_|
// must be held.
void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
  write_lock_.AssertAcquired();
  write_buffer_->message_queue_.AddMessage(std::move(message));
}
536
// Default handler for channel-internal (non-MESSAGE/QUIT_MESSAGE) messages:
// the base class knows none, so any such message is invalid. Returning false
// makes DispatchMessages() report ERROR_READ_BAD_MESSAGE.
bool RawChannel::OnReadMessageForRawChannel(
    const MessageInTransit::View& message_view) {
  LOG(ERROR) << "Invalid control message (type " << message_view.type()
             << ")";
  return false;
}
543
544 RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
545 IOResult io_result) {
546 switch (io_result) {
547 case IO_FAILED_SHUTDOWN:
548 return Delegate::ERROR_READ_SHUTDOWN;
549 case IO_FAILED_BROKEN:
550 return Delegate::ERROR_READ_BROKEN;
551 case IO_FAILED_UNKNOWN:
552 return Delegate::ERROR_READ_UNKNOWN;
553 case IO_SUCCEEDED:
554 case IO_PENDING:
555 NOTREACHED();
556 break;
557 }
558 return Delegate::ERROR_READ_UNKNOWN;
559 }
560
// Records the error and notifies the delegate (|read_lock_| held, I/O
// thread). |calling_delegate_| guards against re-entrant self-deletion: a
// Shutdown() triggered from inside OnError() defers itself. With no delegate
// attached, self-destruction via Shutdown() is scheduled instead.
void RawChannel::CallOnError(Delegate::Error error) {
  DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
  read_lock_.AssertAcquired();
  error_occurred_ = true;
  if (delegate_) {
    DCHECK(!calling_delegate_);
    calling_delegate_ = true;
    delegate_->OnError(error);
    calling_delegate_ = false;
  } else {
    // We depend on delegate to delete since it could be waiting to call
    // ReleaseHandle.
    internal::g_io_thread_task_runner->PostTask(
        FROM_HERE,
        base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
  }
}
578
// Task trampoline used by the write paths: takes |read_lock_| (which
// CallOnError() asserts) and reports |error|. |this| may be destroyed inside.
void RawChannel::LockAndCallOnError(Delegate::Error error) {
  base::AutoLock locker(read_lock_);
  CallOnError(error);
}
583
// Shared write-completion bookkeeping (|write_lock_| held). On success,
// advances the write buffer and, if messages remain, schedules the next
// write. Returns false on any failure, after which writing is permanently
// stopped and the queue is cleared; the caller is responsible for notifying
// the delegate.
bool RawChannel::OnWriteCompletedInternalNoLock(IOResult io_result,
                                                size_t platform_handles_written,
                                                size_t bytes_written) {
  write_lock_.AssertAcquired();

  DCHECK(!write_buffer_->message_queue_.IsEmpty());

  if (io_result == IO_SUCCEEDED) {
    UpdateWriteBuffer(platform_handles_written, bytes_written);
    if (write_buffer_->message_queue_.IsEmpty()) {
      if (!delegate_) {
        // Shutdown must have been called and we were waiting to flush all
        // pending writes. Now we're done.
        internal::g_io_thread_task_runner->PostTask(
            FROM_HERE,
            base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
      }
      return true;
    }

    // Schedule the next write.
    io_result = ScheduleWriteNoLock();
    if (io_result == IO_PENDING)
      return true;
    DCHECK_NE(io_result, IO_SUCCEEDED);
  }

  // Failure (either the completed write or scheduling the next one): stop
  // writing for good and drop everything queued.
  write_stopped_ = true;
  write_buffer_->message_queue_.Clear();
  write_buffer_->platform_handles_offset_ = 0;
  write_buffer_->data_offset_ = 0;
  return false;
}
617
618 void RawChannel::DispatchMessages(bool* did_dispatch_message,
619 bool* stop_dispatching) {
620 *did_dispatch_message = false;
621 *stop_dispatching = false;
622 // Tracks the offset of the first undispatched message in |read_buffer_|.
623 // Currently, we copy data to ensure that this is zero at the beginning.
624 size_t read_buffer_start = 0;
625 size_t remaining_bytes = read_buffer_->num_valid_bytes_;
626 size_t message_size;
627 // Note that we rely on short-circuit evaluation here:
628 // - |read_buffer_start| may be an invalid index into
629 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
630 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
631 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
632 // next read).
633 // TODO(vtl): Validate that |message_size| is sane.
634 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
635 &read_buffer_->buffer_[read_buffer_start],
636 remaining_bytes, &message_size) &&
637 remaining_bytes >= message_size) {
638 MessageInTransit::View message_view(
639 message_size, &read_buffer_->buffer_[read_buffer_start]);
640 DCHECK_EQ(message_view.total_size(), message_size);
641
642 const char* error_message = nullptr;
643 if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
644 &error_message)) {
645 DCHECK(error_message);
646 LOG(ERROR) << "Received invalid message: " << error_message;
647 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
648 *stop_dispatching = true;
649 return; // |this| may have been destroyed in |CallOnError()|.
650 }
651
652 if (message_view.type() != MessageInTransit::Type::MESSAGE &&
653 message_view.type() != MessageInTransit::Type::QUIT_MESSAGE) {
654 if (!OnReadMessageForRawChannel(message_view)) {
655 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
656 *stop_dispatching = true;
657 return; // |this| may have been destroyed in |CallOnError()|.
658 }
659 } else {
660 ScopedPlatformHandleVectorPtr platform_handles;
661 if (message_view.transport_data_buffer()) {
662 size_t num_platform_handles;
663 const void* platform_handle_table;
664 TransportData::GetPlatformHandleTable(
665 message_view.transport_data_buffer(), &num_platform_handles,
666 &platform_handle_table);
667
668 if (num_platform_handles > 0) {
669 platform_handles = GetReadPlatformHandles(num_platform_handles,
670 platform_handle_table);
671 if (!platform_handles) {
672 LOG(ERROR) << "Invalid number of platform handles received";
673 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
674 *stop_dispatching = true;
675 return; // |this| may have been destroyed in |CallOnError()|.
676 }
677 }
678 }
679
680 // TODO(vtl): In the case that we aren't expecting any platform handles,
681 // for the POSIX implementation, we should confirm that none are stored.
682 if (delegate_) {
683 DCHECK(!calling_delegate_);
684 calling_delegate_ = true;
685 delegate_->OnReadMessage(message_view, std::move(platform_handles));
686 calling_delegate_ = false;
687 }
688 }
689
690 *did_dispatch_message = true;
691
692 // Update our state.
693 read_buffer_start += message_size;
694 remaining_bytes -= message_size;
695 }
696
697 if (read_buffer_start > 0) {
698 // Move data back to start.
699 read_buffer_->num_valid_bytes_ = remaining_bytes;
700 if (read_buffer_->num_valid_bytes_ > 0) {
701 memmove(&read_buffer_->buffer_[0],
702 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
703 }
704 read_buffer_start = 0;
705 }
706 }
707
// Advances the write buffer's offsets by what was just transmitted. When the
// frontmost message is fully written, discards it and resets both offsets for
// the next message. (A single write never spans messages — see GetBuffers() —
// so the offset can at most reach |total_size()|, which the CHECK enforces.)
void RawChannel::UpdateWriteBuffer(size_t platform_handles_written,
                                   size_t bytes_written) {
  write_buffer_->platform_handles_offset_ += platform_handles_written;
  write_buffer_->data_offset_ += bytes_written;

  MessageInTransit* message = write_buffer_->message_queue_.PeekMessage();
  if (write_buffer_->data_offset_ >= message->total_size()) {
    // Complete write.
    CHECK_EQ(write_buffer_->data_offset_, message->total_size());
    write_buffer_->message_queue_.DiscardMessage();
    write_buffer_->platform_handles_offset_ = 0;
    write_buffer_->data_offset_ = 0;
  }
}
722
// Task trampoline (used by LazyInitialize() for synchronous ScheduleRead()
// failures): takes |read_lock_| and runs the read-completion handler.
void RawChannel::CallOnReadCompleted(IOResult io_result, size_t bytes_read) {
  base::AutoLock locker(read_lock_);
  OnReadCompletedNoLock(io_result, bytes_read);
}
727
// MessageLoop::DestructionObserver: the I/O message loop is going away, so
// report the read side as shut down and tear the channel down synchronously.
void RawChannel::WillDestroyCurrentMessageLoop() {
  {
    base::AutoLock locker(read_lock_);
    OnReadCompletedNoLock(IO_FAILED_SHUTDOWN, 0);
  }
  // The PostTask inside Shutdown() will never be called, so manually call it
  // here to avoid leaks in LSAN builds.
  Shutdown();
}
737
738 } // namespace edk
739 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698