Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(137)

Side by Side Diff: mojo/edk/system/raw_channel.cc

Issue 814543006: Move //mojo/{public, edk} underneath //third_party (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/raw_channel.h"
6
7 #include <string.h>
8
9 #include <algorithm>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/stl_util.h"
16 #include "mojo/edk/system/message_in_transit.h"
17 #include "mojo/edk/system/transport_data.h"
18
19 namespace mojo {
20 namespace system {
21
22 const size_t kReadSize = 4096;
23
24 // RawChannel::ReadBuffer ------------------------------------------------------
25
26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
27 }
28
29 RawChannel::ReadBuffer::~ReadBuffer() {
30 }
31
32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
33 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
34 *addr = &buffer_[0] + num_valid_bytes_;
35 *size = kReadSize;
36 }
37
38 // RawChannel::WriteBuffer -----------------------------------------------------
39
40 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size)
41 : serialized_platform_handle_size_(serialized_platform_handle_size),
42 platform_handles_offset_(0),
43 data_offset_(0) {
44 }
45
46 RawChannel::WriteBuffer::~WriteBuffer() {
47 STLDeleteElements(&message_queue_);
48 }
49
50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
51 if (message_queue_.empty())
52 return false;
53
54 const TransportData* transport_data =
55 message_queue_.front()->transport_data();
56 if (!transport_data)
57 return false;
58
59 const embedder::PlatformHandleVector* all_platform_handles =
60 transport_data->platform_handles();
61 if (!all_platform_handles) {
62 DCHECK_EQ(platform_handles_offset_, 0u);
63 return false;
64 }
65 if (platform_handles_offset_ >= all_platform_handles->size()) {
66 DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
67 return false;
68 }
69
70 return true;
71 }
72
73 void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
74 size_t* num_platform_handles,
75 embedder::PlatformHandle** platform_handles,
76 void** serialization_data) {
77 DCHECK(HavePlatformHandlesToSend());
78
79 MessageInTransit* message = message_queue_.front();
80 TransportData* transport_data = message->transport_data();
81 embedder::PlatformHandleVector* all_platform_handles =
82 transport_data->platform_handles();
83 *num_platform_handles =
84 all_platform_handles->size() - platform_handles_offset_;
85 *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
86
87 if (serialized_platform_handle_size_ > 0) {
88 size_t serialization_data_offset =
89 transport_data->platform_handle_table_offset();
90 DCHECK_GT(serialization_data_offset, 0u);
91 serialization_data_offset +=
92 platform_handles_offset_ * serialized_platform_handle_size_;
93 *serialization_data = static_cast<char*>(transport_data->buffer()) +
94 serialization_data_offset;
95 } else {
96 *serialization_data = nullptr;
97 }
98 }
99
100 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
101 buffers->clear();
102
103 if (message_queue_.empty())
104 return;
105
106 MessageInTransit* message = message_queue_.front();
107 DCHECK_LT(data_offset_, message->total_size());
108 size_t bytes_to_write = message->total_size() - data_offset_;
109
110 size_t transport_data_buffer_size =
111 message->transport_data() ? message->transport_data()->buffer_size() : 0;
112
113 if (!transport_data_buffer_size) {
114 // Only write from the main buffer.
115 DCHECK_LT(data_offset_, message->main_buffer_size());
116 DCHECK_LE(bytes_to_write, message->main_buffer_size());
117 Buffer buffer = {
118 static_cast<const char*>(message->main_buffer()) + data_offset_,
119 bytes_to_write};
120 buffers->push_back(buffer);
121 return;
122 }
123
124 if (data_offset_ >= message->main_buffer_size()) {
125 // Only write from the transport data buffer.
126 DCHECK_LT(data_offset_ - message->main_buffer_size(),
127 transport_data_buffer_size);
128 DCHECK_LE(bytes_to_write, transport_data_buffer_size);
129 Buffer buffer = {
130 static_cast<const char*>(message->transport_data()->buffer()) +
131 (data_offset_ - message->main_buffer_size()),
132 bytes_to_write};
133 buffers->push_back(buffer);
134 return;
135 }
136
137 // TODO(vtl): We could actually send out buffers from multiple messages, with
138 // the "stopping" condition being reaching a message with platform handles
139 // attached.
140
141 // Write from both buffers.
142 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
143 transport_data_buffer_size);
144 Buffer buffer1 = {
145 static_cast<const char*>(message->main_buffer()) + data_offset_,
146 message->main_buffer_size() - data_offset_};
147 buffers->push_back(buffer1);
148 Buffer buffer2 = {
149 static_cast<const char*>(message->transport_data()->buffer()),
150 transport_data_buffer_size};
151 buffers->push_back(buffer2);
152 }
153
154 // RawChannel ------------------------------------------------------------------
155
156 RawChannel::RawChannel()
157 : message_loop_for_io_(nullptr),
158 delegate_(nullptr),
159 read_stopped_(false),
160 write_stopped_(false),
161 weak_ptr_factory_(this) {
162 }
163
164 RawChannel::~RawChannel() {
165 DCHECK(!read_buffer_);
166 DCHECK(!write_buffer_);
167
168 // No need to take the |write_lock_| here -- if there are still weak pointers
169 // outstanding, then we're hosed anyway (since we wouldn't be able to
170 // invalidate them cleanly, since we might not be on the I/O thread).
171 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
172 }
173
174 void RawChannel::Init(Delegate* delegate) {
175 DCHECK(delegate);
176
177 DCHECK(!delegate_);
178 delegate_ = delegate;
179
180 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
181 DCHECK(!message_loop_for_io_);
182 message_loop_for_io_ =
183 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
184
185 // No need to take the lock. No one should be using us yet.
186 DCHECK(!read_buffer_);
187 read_buffer_.reset(new ReadBuffer);
188 DCHECK(!write_buffer_);
189 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
190
191 OnInit();
192
193 IOResult io_result = ScheduleRead();
194 if (io_result != IO_PENDING) {
195 // This will notify the delegate about the read failure. Although we're on
196 // the I/O thread, don't call it in the nested context.
197 message_loop_for_io_->PostTask(
198 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted,
199 weak_ptr_factory_.GetWeakPtr(), io_result, 0));
200 }
201 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
202 // the delegate), not an initialization failure.
203 }
204
205 void RawChannel::Shutdown() {
206 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
207
208 base::AutoLock locker(write_lock_);
209
210 LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
211 << "Shutting down RawChannel with write buffer nonempty";
212
213 // Reset the delegate so that it won't receive further calls.
214 delegate_ = nullptr;
215 read_stopped_ = true;
216 write_stopped_ = true;
217 weak_ptr_factory_.InvalidateWeakPtrs();
218
219 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
220 }
221
222 // Reminder: This must be thread-safe.
223 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
224 DCHECK(message);
225
226 base::AutoLock locker(write_lock_);
227 if (write_stopped_)
228 return false;
229
230 if (!write_buffer_->message_queue_.empty()) {
231 EnqueueMessageNoLock(message.Pass());
232 return true;
233 }
234
235 EnqueueMessageNoLock(message.Pass());
236 DCHECK_EQ(write_buffer_->data_offset_, 0u);
237
238 size_t platform_handles_written = 0;
239 size_t bytes_written = 0;
240 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
241 if (io_result == IO_PENDING)
242 return true;
243
244 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written,
245 bytes_written);
246 if (!result) {
247 // Even if we're on the I/O thread, don't call |OnError()| in the nested
248 // context.
249 message_loop_for_io_->PostTask(
250 FROM_HERE,
251 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(),
252 Delegate::ERROR_WRITE));
253 }
254
255 return result;
256 }
257
258 // Reminder: This must be thread-safe.
259 bool RawChannel::IsWriteBufferEmpty() {
260 base::AutoLock locker(write_lock_);
261 return write_buffer_->message_queue_.empty();
262 }
263
264 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
265 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
266
267 if (read_stopped_) {
268 NOTREACHED();
269 return;
270 }
271
272 // Keep reading data in a loop, and dispatch messages if enough data is
273 // received. Exit the loop if any of the following happens:
274 // - one or more messages were dispatched;
275 // - the last read failed, was a partial read or would block;
276 // - |Shutdown()| was called.
277 do {
278 switch (io_result) {
279 case IO_SUCCEEDED:
280 break;
281 case IO_FAILED_SHUTDOWN:
282 case IO_FAILED_BROKEN:
283 case IO_FAILED_UNKNOWN:
284 read_stopped_ = true;
285 CallOnError(ReadIOResultToError(io_result));
286 return;
287 case IO_PENDING:
288 NOTREACHED();
289 return;
290 }
291
292 read_buffer_->num_valid_bytes_ += bytes_read;
293
294 // Dispatch all the messages that we can.
295 bool did_dispatch_message = false;
296 // Tracks the offset of the first undispatched message in |read_buffer_|.
297 // Currently, we copy data to ensure that this is zero at the beginning.
298 size_t read_buffer_start = 0;
299 size_t remaining_bytes = read_buffer_->num_valid_bytes_;
300 size_t message_size;
301 // Note that we rely on short-circuit evaluation here:
302 // - |read_buffer_start| may be an invalid index into
303 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
304 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
305 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
306 // next read).
307 // TODO(vtl): Validate that |message_size| is sane.
308 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
309 &read_buffer_->buffer_[read_buffer_start],
310 remaining_bytes, &message_size) &&
311 remaining_bytes >= message_size) {
312 MessageInTransit::View message_view(
313 message_size, &read_buffer_->buffer_[read_buffer_start]);
314 DCHECK_EQ(message_view.total_size(), message_size);
315
316 const char* error_message = nullptr;
317 if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
318 &error_message)) {
319 DCHECK(error_message);
320 LOG(ERROR) << "Received invalid message: " << error_message;
321 read_stopped_ = true;
322 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
323 return;
324 }
325
326 if (message_view.type() == MessageInTransit::kTypeRawChannel) {
327 if (!OnReadMessageForRawChannel(message_view)) {
328 read_stopped_ = true;
329 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
330 return;
331 }
332 } else {
333 embedder::ScopedPlatformHandleVectorPtr platform_handles;
334 if (message_view.transport_data_buffer()) {
335 size_t num_platform_handles;
336 const void* platform_handle_table;
337 TransportData::GetPlatformHandleTable(
338 message_view.transport_data_buffer(), &num_platform_handles,
339 &platform_handle_table);
340
341 if (num_platform_handles > 0) {
342 platform_handles =
343 GetReadPlatformHandles(num_platform_handles,
344 platform_handle_table).Pass();
345 if (!platform_handles) {
346 LOG(ERROR) << "Invalid number of platform handles received";
347 read_stopped_ = true;
348 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
349 return;
350 }
351 }
352 }
353
354 // TODO(vtl): In the case that we aren't expecting any platform handles,
355 // for the POSIX implementation, we should confirm that none are stored.
356
357 // Dispatch the message.
358 DCHECK(delegate_);
359 delegate_->OnReadMessage(message_view, platform_handles.Pass());
360 if (read_stopped_) {
361 // |Shutdown()| was called in |OnReadMessage()|.
362 // TODO(vtl): Add test for this case.
363 return;
364 }
365 }
366
367 did_dispatch_message = true;
368
369 // Update our state.
370 read_buffer_start += message_size;
371 remaining_bytes -= message_size;
372 }
373
374 if (read_buffer_start > 0) {
375 // Move data back to start.
376 read_buffer_->num_valid_bytes_ = remaining_bytes;
377 if (read_buffer_->num_valid_bytes_ > 0) {
378 memmove(&read_buffer_->buffer_[0],
379 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
380 }
381 read_buffer_start = 0;
382 }
383
384 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
385 kReadSize) {
386 // Use power-of-2 buffer sizes.
387 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
388 // maximum message size to whatever extent necessary).
389 // TODO(vtl): We may often be able to peek at the header and get the real
390 // required extra space (which may be much bigger than |kReadSize|).
391 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
392 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
393 new_size *= 2;
394
395 // TODO(vtl): It's suboptimal to zero out the fresh memory.
396 read_buffer_->buffer_.resize(new_size, 0);
397 }
398
399 // (1) If we dispatched any messages, stop reading for now (and let the
400 // message loop do its thing for another round).
401 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
402 // a single message. Risks: slower, more complex if we want to avoid lots of
403 // copying. ii. Keep reading until there's no more data and dispatch all the
404 // messages we can. Risks: starvation of other users of the message loop.)
405 // (2) If we didn't max out |kReadSize|, stop reading for now.
406 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
407 bytes_read = 0;
408 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
409 } while (io_result != IO_PENDING);
410 }
411
412 void RawChannel::OnWriteCompleted(IOResult io_result,
413 size_t platform_handles_written,
414 size_t bytes_written) {
415 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
416 DCHECK_NE(io_result, IO_PENDING);
417
418 bool did_fail = false;
419 {
420 base::AutoLock locker(write_lock_);
421 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
422
423 if (write_stopped_) {
424 NOTREACHED();
425 return;
426 }
427
428 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written,
429 bytes_written);
430 }
431
432 if (did_fail)
433 CallOnError(Delegate::ERROR_WRITE);
434 }
435
436 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
437 write_lock_.AssertAcquired();
438 write_buffer_->message_queue_.push_back(message.release());
439 }
440
441 bool RawChannel::OnReadMessageForRawChannel(
442 const MessageInTransit::View& message_view) {
443 // No non-implementation specific |RawChannel| control messages.
444 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype()
445 << ")";
446 return false;
447 }
448
449 // static
450 RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
451 IOResult io_result) {
452 switch (io_result) {
453 case IO_FAILED_SHUTDOWN:
454 return Delegate::ERROR_READ_SHUTDOWN;
455 case IO_FAILED_BROKEN:
456 return Delegate::ERROR_READ_BROKEN;
457 case IO_FAILED_UNKNOWN:
458 return Delegate::ERROR_READ_UNKNOWN;
459 case IO_SUCCEEDED:
460 case IO_PENDING:
461 NOTREACHED();
462 break;
463 }
464 return Delegate::ERROR_READ_UNKNOWN;
465 }
466
467 void RawChannel::CallOnError(Delegate::Error error) {
468 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
469 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
470 if (delegate_)
471 delegate_->OnError(error);
472 }
473
474 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
475 size_t platform_handles_written,
476 size_t bytes_written) {
477 write_lock_.AssertAcquired();
478
479 DCHECK(!write_stopped_);
480 DCHECK(!write_buffer_->message_queue_.empty());
481
482 if (io_result == IO_SUCCEEDED) {
483 write_buffer_->platform_handles_offset_ += platform_handles_written;
484 write_buffer_->data_offset_ += bytes_written;
485
486 MessageInTransit* message = write_buffer_->message_queue_.front();
487 if (write_buffer_->data_offset_ >= message->total_size()) {
488 // Complete write.
489 CHECK_EQ(write_buffer_->data_offset_, message->total_size());
490 write_buffer_->message_queue_.pop_front();
491 delete message;
492 write_buffer_->platform_handles_offset_ = 0;
493 write_buffer_->data_offset_ = 0;
494
495 if (write_buffer_->message_queue_.empty())
496 return true;
497 }
498
499 // Schedule the next write.
500 io_result = ScheduleWriteNoLock();
501 if (io_result == IO_PENDING)
502 return true;
503 DCHECK_NE(io_result, IO_SUCCEEDED);
504 }
505
506 write_stopped_ = true;
507 STLDeleteElements(&write_buffer_->message_queue_);
508 write_buffer_->platform_handles_offset_ = 0;
509 write_buffer_->data_offset_ = 0;
510 return false;
511 }
512
513 } // namespace system
514 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698