Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Side by Side Diff: third_party/mojo/src/mojo/edk/system/raw_channel.cc

Issue 1676913002: [mojo] Delete third_party/mojo (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: let's try that again Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "third_party/mojo/src/mojo/edk/system/raw_channel.h"
6
7 #include <string.h>
8 #include <algorithm>
9 #include <utility>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "third_party/mojo/src/mojo/edk/system/message_in_transit.h"
16 #include "third_party/mojo/src/mojo/edk/system/transport_data.h"
17
18 namespace mojo {
19 namespace system {
20
21 const size_t kReadSize = 4096;
22
23 // RawChannel::ReadBuffer ------------------------------------------------------
24
25 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
26 }
27
28 RawChannel::ReadBuffer::~ReadBuffer() {
29 }
30
31 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
32 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
33 *addr = &buffer_[0] + num_valid_bytes_;
34 *size = kReadSize;
35 }
36
37 // RawChannel::WriteBuffer -----------------------------------------------------
38
39 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size)
40 : serialized_platform_handle_size_(serialized_platform_handle_size),
41 platform_handles_offset_(0),
42 data_offset_(0) {
43 }
44
45 RawChannel::WriteBuffer::~WriteBuffer() {
46 message_queue_.Clear();
47 }
48
49 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
50 if (message_queue_.IsEmpty())
51 return false;
52
53 const TransportData* transport_data =
54 message_queue_.PeekMessage()->transport_data();
55 if (!transport_data)
56 return false;
57
58 const embedder::PlatformHandleVector* all_platform_handles =
59 transport_data->platform_handles();
60 if (!all_platform_handles) {
61 DCHECK_EQ(platform_handles_offset_, 0u);
62 return false;
63 }
64 if (platform_handles_offset_ >= all_platform_handles->size()) {
65 DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
66 return false;
67 }
68
69 return true;
70 }
71
72 void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
73 size_t* num_platform_handles,
74 embedder::PlatformHandle** platform_handles,
75 void** serialization_data) {
76 DCHECK(HavePlatformHandlesToSend());
77
78 MessageInTransit* message = message_queue_.PeekMessage();
79 TransportData* transport_data = message->transport_data();
80 embedder::PlatformHandleVector* all_platform_handles =
81 transport_data->platform_handles();
82 *num_platform_handles =
83 all_platform_handles->size() - platform_handles_offset_;
84 *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
85
86 if (serialized_platform_handle_size_ > 0) {
87 size_t serialization_data_offset =
88 transport_data->platform_handle_table_offset();
89 serialization_data_offset +=
90 platform_handles_offset_ * serialized_platform_handle_size_;
91 *serialization_data = static_cast<char*>(transport_data->buffer()) +
92 serialization_data_offset;
93 } else {
94 *serialization_data = nullptr;
95 }
96 }
97
98 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
99 buffers->clear();
100
101 if (message_queue_.IsEmpty())
102 return;
103
104 const MessageInTransit* message = message_queue_.PeekMessage();
105 DCHECK_LT(data_offset_, message->total_size());
106 size_t bytes_to_write = message->total_size() - data_offset_;
107
108 size_t transport_data_buffer_size =
109 message->transport_data() ? message->transport_data()->buffer_size() : 0;
110
111 if (!transport_data_buffer_size) {
112 // Only write from the main buffer.
113 DCHECK_LT(data_offset_, message->main_buffer_size());
114 DCHECK_LE(bytes_to_write, message->main_buffer_size());
115 Buffer buffer = {
116 static_cast<const char*>(message->main_buffer()) + data_offset_,
117 bytes_to_write};
118 buffers->push_back(buffer);
119 return;
120 }
121
122 if (data_offset_ >= message->main_buffer_size()) {
123 // Only write from the transport data buffer.
124 DCHECK_LT(data_offset_ - message->main_buffer_size(),
125 transport_data_buffer_size);
126 DCHECK_LE(bytes_to_write, transport_data_buffer_size);
127 Buffer buffer = {
128 static_cast<const char*>(message->transport_data()->buffer()) +
129 (data_offset_ - message->main_buffer_size()),
130 bytes_to_write};
131 buffers->push_back(buffer);
132 return;
133 }
134
135 // TODO(vtl): We could actually send out buffers from multiple messages, with
136 // the "stopping" condition being reaching a message with platform handles
137 // attached.
138
139 // Write from both buffers.
140 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
141 transport_data_buffer_size);
142 Buffer buffer1 = {
143 static_cast<const char*>(message->main_buffer()) + data_offset_,
144 message->main_buffer_size() - data_offset_};
145 buffers->push_back(buffer1);
146 Buffer buffer2 = {
147 static_cast<const char*>(message->transport_data()->buffer()),
148 transport_data_buffer_size};
149 buffers->push_back(buffer2);
150 }
151
152 // RawChannel ------------------------------------------------------------------
153
154 RawChannel::RawChannel()
155 : message_loop_for_io_(nullptr),
156 delegate_(nullptr),
157 set_on_shutdown_(nullptr),
158 write_stopped_(false),
159 weak_ptr_factory_(this) {
160 }
161
162 RawChannel::~RawChannel() {
163 DCHECK(!read_buffer_);
164 DCHECK(!write_buffer_);
165
166 // No need to take the |write_lock_| here -- if there are still weak pointers
167 // outstanding, then we're hosed anyway (since we wouldn't be able to
168 // invalidate them cleanly, since we might not be on the I/O thread).
169 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
170 }
171
172 void RawChannel::Init(Delegate* delegate) {
173 DCHECK(delegate);
174
175 DCHECK(!delegate_);
176 delegate_ = delegate;
177
178 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
179 DCHECK(!message_loop_for_io_);
180 message_loop_for_io_ =
181 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
182
183 // No need to take the lock. No one should be using us yet.
184 DCHECK(!read_buffer_);
185 read_buffer_.reset(new ReadBuffer);
186 DCHECK(!write_buffer_);
187 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
188
189 OnInit();
190
191 IOResult io_result = ScheduleRead();
192 if (io_result != IO_PENDING) {
193 // This will notify the delegate about the read failure. Although we're on
194 // the I/O thread, don't call it in the nested context.
195 message_loop_for_io_->PostTask(
196 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted,
197 weak_ptr_factory_.GetWeakPtr(), io_result, 0));
198 }
199 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
200 // the delegate), not an initialization failure.
201 }
202
203 void RawChannel::Shutdown() {
204 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
205
206 base::AutoLock locker(write_lock_);
207
208 LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty())
209 << "Shutting down RawChannel with write buffer nonempty";
210
211 // Reset the delegate so that it won't receive further calls.
212 delegate_ = nullptr;
213 if (set_on_shutdown_) {
214 *set_on_shutdown_ = true;
215 set_on_shutdown_ = nullptr;
216 }
217 write_stopped_ = true;
218 weak_ptr_factory_.InvalidateWeakPtrs();
219
220 OnShutdownNoLock(std::move(read_buffer_), std::move(write_buffer_));
221 }
222
223 // Reminder: This must be thread-safe.
224 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
225 DCHECK(message);
226
227 base::AutoLock locker(write_lock_);
228 if (write_stopped_)
229 return false;
230
231 if (!write_buffer_->message_queue_.IsEmpty()) {
232 EnqueueMessageNoLock(std::move(message));
233 return true;
234 }
235
236 EnqueueMessageNoLock(std::move(message));
237 DCHECK_EQ(write_buffer_->data_offset_, 0u);
238
239 size_t platform_handles_written = 0;
240 size_t bytes_written = 0;
241 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
242 if (io_result == IO_PENDING)
243 return true;
244
245 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written,
246 bytes_written);
247 if (!result) {
248 // Even if we're on the I/O thread, don't call |OnError()| in the nested
249 // context.
250 message_loop_for_io_->PostTask(
251 FROM_HERE,
252 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(),
253 Delegate::ERROR_WRITE));
254 }
255
256 return result;
257 }
258
259 // Reminder: This must be thread-safe.
260 bool RawChannel::IsWriteBufferEmpty() {
261 base::AutoLock locker(write_lock_);
262 return write_buffer_->message_queue_.IsEmpty();
263 }
264
265 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
266 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
267
268 // Keep reading data in a loop, and dispatch messages if enough data is
269 // received. Exit the loop if any of the following happens:
270 // - one or more messages were dispatched;
271 // - the last read failed, was a partial read or would block;
272 // - |Shutdown()| was called.
273 do {
274 switch (io_result) {
275 case IO_SUCCEEDED:
276 break;
277 case IO_FAILED_SHUTDOWN:
278 case IO_FAILED_BROKEN:
279 case IO_FAILED_UNKNOWN:
280 CallOnError(ReadIOResultToError(io_result));
281 return; // |this| may have been destroyed in |CallOnError()|.
282 case IO_PENDING:
283 NOTREACHED();
284 return;
285 }
286
287 read_buffer_->num_valid_bytes_ += bytes_read;
288
289 // Dispatch all the messages that we can.
290 bool did_dispatch_message = false;
291 // Tracks the offset of the first undispatched message in |read_buffer_|.
292 // Currently, we copy data to ensure that this is zero at the beginning.
293 size_t read_buffer_start = 0;
294 size_t remaining_bytes = read_buffer_->num_valid_bytes_;
295 size_t message_size;
296 // Note that we rely on short-circuit evaluation here:
297 // - |read_buffer_start| may be an invalid index into
298 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
299 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
300 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
301 // next read).
302 // TODO(vtl): Validate that |message_size| is sane.
303 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
304 &read_buffer_->buffer_[read_buffer_start],
305 remaining_bytes, &message_size) &&
306 remaining_bytes >= message_size) {
307 MessageInTransit::View message_view(
308 message_size, &read_buffer_->buffer_[read_buffer_start]);
309 DCHECK_EQ(message_view.total_size(), message_size);
310
311 const char* error_message = nullptr;
312 if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
313 &error_message)) {
314 DCHECK(error_message);
315 LOG(ERROR) << "Received invalid message: " << error_message;
316 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
317 return; // |this| may have been destroyed in |CallOnError()|.
318 }
319
320 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL) {
321 if (!OnReadMessageForRawChannel(message_view)) {
322 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
323 return; // |this| may have been destroyed in |CallOnError()|.
324 }
325 } else {
326 embedder::ScopedPlatformHandleVectorPtr platform_handles;
327 if (message_view.transport_data_buffer()) {
328 size_t num_platform_handles;
329 const void* platform_handle_table;
330 TransportData::GetPlatformHandleTable(
331 message_view.transport_data_buffer(), &num_platform_handles,
332 &platform_handle_table);
333
334 if (num_platform_handles > 0) {
335 platform_handles = GetReadPlatformHandles(num_platform_handles,
336 platform_handle_table);
337 if (!platform_handles) {
338 LOG(ERROR) << "Invalid number of platform handles received";
339 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
340 return; // |this| may have been destroyed in |CallOnError()|.
341 }
342 }
343 }
344
345 // TODO(vtl): In the case that we aren't expecting any platform handles,
346 // for the POSIX implementation, we should confirm that none are stored.
347
348 // Dispatch the message.
349 // Detect the case when |Shutdown()| is called; subsequent destruction
350 // is also permitted then.
351 bool shutdown_called = false;
352 DCHECK(!set_on_shutdown_);
353 set_on_shutdown_ = &shutdown_called;
354 DCHECK(delegate_);
355 delegate_->OnReadMessage(message_view, std::move(platform_handles));
356 if (shutdown_called)
357 return;
358 set_on_shutdown_ = nullptr;
359 }
360
361 did_dispatch_message = true;
362
363 // Update our state.
364 read_buffer_start += message_size;
365 remaining_bytes -= message_size;
366 }
367
368 if (read_buffer_start > 0) {
369 // Move data back to start.
370 read_buffer_->num_valid_bytes_ = remaining_bytes;
371 if (read_buffer_->num_valid_bytes_ > 0) {
372 memmove(&read_buffer_->buffer_[0],
373 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
374 }
375 read_buffer_start = 0;
376 }
377
378 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
379 kReadSize) {
380 // Use power-of-2 buffer sizes.
381 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
382 // maximum message size to whatever extent necessary).
383 // TODO(vtl): We may often be able to peek at the header and get the real
384 // required extra space (which may be much bigger than |kReadSize|).
385 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
386 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
387 new_size *= 2;
388
389 // TODO(vtl): It's suboptimal to zero out the fresh memory.
390 read_buffer_->buffer_.resize(new_size, 0);
391 }
392
393 // (1) If we dispatched any messages, stop reading for now (and let the
394 // message loop do its thing for another round).
395 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
396 // a single message. Risks: slower, more complex if we want to avoid lots of
397 // copying. ii. Keep reading until there's no more data and dispatch all the
398 // messages we can. Risks: starvation of other users of the message loop.)
399 // (2) If we didn't max out |kReadSize|, stop reading for now.
400 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
401 bytes_read = 0;
402 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
403 } while (io_result != IO_PENDING);
404 }
405
406 void RawChannel::OnWriteCompleted(IOResult io_result,
407 size_t platform_handles_written,
408 size_t bytes_written) {
409 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
410 DCHECK_NE(io_result, IO_PENDING);
411
412 bool did_fail = false;
413 {
414 base::AutoLock locker(write_lock_);
415 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.IsEmpty());
416
417 if (write_stopped_) {
418 NOTREACHED();
419 return;
420 }
421
422 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written,
423 bytes_written);
424 }
425
426 if (did_fail) {
427 CallOnError(Delegate::ERROR_WRITE);
428 return; // |this| may have been destroyed in |CallOnError()|.
429 }
430 }
431
432 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
433 write_lock_.AssertAcquired();
434 write_buffer_->message_queue_.AddMessage(std::move(message));
435 }
436
437 bool RawChannel::OnReadMessageForRawChannel(
438 const MessageInTransit::View& message_view) {
439 // No non-implementation specific |RawChannel| control messages.
440 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype()
441 << ")";
442 return false;
443 }
444
445 // static
446 RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
447 IOResult io_result) {
448 switch (io_result) {
449 case IO_FAILED_SHUTDOWN:
450 return Delegate::ERROR_READ_SHUTDOWN;
451 case IO_FAILED_BROKEN:
452 return Delegate::ERROR_READ_BROKEN;
453 case IO_FAILED_UNKNOWN:
454 return Delegate::ERROR_READ_UNKNOWN;
455 case IO_SUCCEEDED:
456 case IO_PENDING:
457 NOTREACHED();
458 break;
459 }
460 return Delegate::ERROR_READ_UNKNOWN;
461 }
462
463 void RawChannel::CallOnError(Delegate::Error error) {
464 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
465 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
466 if (delegate_) {
467 delegate_->OnError(error);
468 return; // |this| may have been destroyed in |OnError()|.
469 }
470 }
471
472 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
473 size_t platform_handles_written,
474 size_t bytes_written) {
475 write_lock_.AssertAcquired();
476
477 DCHECK(!write_stopped_);
478 DCHECK(!write_buffer_->message_queue_.IsEmpty());
479
480 if (io_result == IO_SUCCEEDED) {
481 write_buffer_->platform_handles_offset_ += platform_handles_written;
482 write_buffer_->data_offset_ += bytes_written;
483
484 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage();
485 if (write_buffer_->data_offset_ >= message->total_size()) {
486 // Complete write.
487 CHECK_EQ(write_buffer_->data_offset_, message->total_size());
488 write_buffer_->message_queue_.DiscardMessage();
489 write_buffer_->platform_handles_offset_ = 0;
490 write_buffer_->data_offset_ = 0;
491
492 if (write_buffer_->message_queue_.IsEmpty())
493 return true;
494 }
495
496 // Schedule the next write.
497 io_result = ScheduleWriteNoLock();
498 if (io_result == IO_PENDING)
499 return true;
500 DCHECK_NE(io_result, IO_SUCCEEDED);
501 }
502
503 write_stopped_ = true;
504 write_buffer_->message_queue_.Clear();
505 write_buffer_->platform_handles_offset_ = 0;
506 write_buffer_->data_offset_ = 0;
507 return false;
508 }
509
510 } // namespace system
511 } // namespace mojo
OLDNEW
« no previous file with comments | « third_party/mojo/src/mojo/edk/system/raw_channel.h ('k') | third_party/mojo/src/mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698