Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(64)

Side by Side Diff: mojo/edk/system/raw_channel.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: convert remaining MP tests and simplify RawChannel destruction Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/raw_channel.h" 5 #include "mojo/edk/system/raw_channel.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/logging.h" 13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h" 14 #include "base/message_loop/message_loop.h"
15 #include "mojo/edk/embedder/embedder_internal.h"
15 #include "mojo/edk/system/message_in_transit.h" 16 #include "mojo/edk/system/message_in_transit.h"
16 #include "mojo/edk/system/transport_data.h" 17 #include "mojo/edk/system/transport_data.h"
17 18
18 namespace mojo { 19 namespace mojo {
19 namespace system { 20 namespace system {
20 21
21 const size_t kReadSize = 4096; 22 const size_t kReadSize = 4096;
22 23
23 // RawChannel::ReadBuffer ------------------------------------------------------ 24 // RawChannel::ReadBuffer ------------------------------------------------------
24 25
25 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { 26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
26 } 27 }
27 28
28 RawChannel::ReadBuffer::~ReadBuffer() { 29 RawChannel::ReadBuffer::~ReadBuffer() {
29 } 30 }
30 31
31 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { 32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
32 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); 33 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
33 *addr = &buffer_[0] + num_valid_bytes_; 34 *addr = &buffer_[0] + num_valid_bytes_;
34 *size = kReadSize; 35 *size = kReadSize;
35 } 36 }
36 37
37 // RawChannel::WriteBuffer ----------------------------------------------------- 38 // RawChannel::WriteBuffer -----------------------------------------------------
38 39
39 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) 40 RawChannel::WriteBuffer::WriteBuffer()
40 : serialized_platform_handle_size_(serialized_platform_handle_size), 41 : serialized_platform_handle_size_(0),
41 platform_handles_offset_(0), 42 platform_handles_offset_(0),
42 data_offset_(0) { 43 data_offset_(0) {
43 } 44 }
44 45
45 RawChannel::WriteBuffer::~WriteBuffer() { 46 RawChannel::WriteBuffer::~WriteBuffer() {
46 message_queue_.Clear(); 47 message_queue_.Clear();
47 } 48 }
48 49
49 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { 50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
50 if (message_queue_.IsEmpty()) 51 if (message_queue_.IsEmpty())
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 size_t transport_data_buffer_size = 109 size_t transport_data_buffer_size =
109 message->transport_data() ? message->transport_data()->buffer_size() : 0; 110 message->transport_data() ? message->transport_data()->buffer_size() : 0;
110 111
111 if (!transport_data_buffer_size) { 112 if (!transport_data_buffer_size) {
112 // Only write from the main buffer. 113 // Only write from the main buffer.
113 DCHECK_LT(data_offset_, message->main_buffer_size()); 114 DCHECK_LT(data_offset_, message->main_buffer_size());
114 DCHECK_LE(bytes_to_write, message->main_buffer_size()); 115 DCHECK_LE(bytes_to_write, message->main_buffer_size());
115 Buffer buffer = { 116 Buffer buffer = {
116 static_cast<const char*>(message->main_buffer()) + data_offset_, 117 static_cast<const char*>(message->main_buffer()) + data_offset_,
117 bytes_to_write}; 118 bytes_to_write};
119
118 buffers->push_back(buffer); 120 buffers->push_back(buffer);
119 return; 121 return;
120 } 122 }
121 123
122 if (data_offset_ >= message->main_buffer_size()) { 124 if (data_offset_ >= message->main_buffer_size()) {
123 // Only write from the transport data buffer. 125 // Only write from the transport data buffer.
124 DCHECK_LT(data_offset_ - message->main_buffer_size(), 126 DCHECK_LT(data_offset_ - message->main_buffer_size(),
125 transport_data_buffer_size); 127 transport_data_buffer_size);
126 DCHECK_LE(bytes_to_write, transport_data_buffer_size); 128 DCHECK_LE(bytes_to_write, transport_data_buffer_size);
127 Buffer buffer = { 129 Buffer buffer = {
128 static_cast<const char*>(message->transport_data()->buffer()) + 130 static_cast<const char*>(message->transport_data()->buffer()) +
129 (data_offset_ - message->main_buffer_size()), 131 (data_offset_ - message->main_buffer_size()),
130 bytes_to_write}; 132 bytes_to_write};
133
131 buffers->push_back(buffer); 134 buffers->push_back(buffer);
132 return; 135 return;
133 } 136 }
134 137
135 // TODO(vtl): We could actually send out buffers from multiple messages, with 138 // TODO(vtl): We could actually send out buffers from multiple messages, with
136 // the "stopping" condition being reaching a message with platform handles 139 // the "stopping" condition being reaching a message with platform handles
137 // attached. 140 // attached.
138 141
139 // Write from both buffers. 142 // Write from both buffers.
140 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + 143 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
141 transport_data_buffer_size); 144 transport_data_buffer_size);
142 Buffer buffer1 = { 145 Buffer buffer1 = {
143 static_cast<const char*>(message->main_buffer()) + data_offset_, 146 static_cast<const char*>(message->main_buffer()) + data_offset_,
144 message->main_buffer_size() - data_offset_}; 147 message->main_buffer_size() - data_offset_};
145 buffers->push_back(buffer1); 148 buffers->push_back(buffer1);
146 Buffer buffer2 = { 149 Buffer buffer2 = {
147 static_cast<const char*>(message->transport_data()->buffer()), 150 static_cast<const char*>(message->transport_data()->buffer()),
148 transport_data_buffer_size}; 151 transport_data_buffer_size};
149 buffers->push_back(buffer2); 152 buffers->push_back(buffer2);
150 } 153 }
151 154
152 // RawChannel ------------------------------------------------------------------ 155 // RawChannel ------------------------------------------------------------------
153 156
154 RawChannel::RawChannel() 157 RawChannel::RawChannel()
155 : message_loop_for_io_(nullptr), 158 : message_loop_for_io_(nullptr),
159 set_on_shutdown_(nullptr),
156 delegate_(nullptr), 160 delegate_(nullptr),
157 set_on_shutdown_(nullptr), 161 write_ready_(false),
158 write_stopped_(false), 162 write_stopped_(false),
163 error_occurred_(false),
159 weak_ptr_factory_(this) { 164 weak_ptr_factory_(this) {
165 read_buffer_.reset(new ReadBuffer);
166 write_buffer_.reset(new WriteBuffer());
160 } 167 }
161 168
162 RawChannel::~RawChannel() { 169 RawChannel::~RawChannel() {
163 DCHECK(!read_buffer_); 170 DCHECK(!read_buffer_);
164 DCHECK(!write_buffer_); 171 DCHECK(!write_buffer_);
165 172
166 // No need to take the |write_lock_| here -- if there are still weak pointers 173 // Only want to decrement counter if Init was called.
167 // outstanding, then we're hosed anyway (since we wouldn't be able to 174 if (message_loop_for_io_) {
168 // invalidate them cleanly, since we might not be on the I/O thread). 175 // No need to take the |write_lock_| here -- if there are still weak
169 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 176 // pointers outstanding, then we're hosed anyway (since we wouldn't be able
177 // to invalidate them cleanly, since we might not be on the I/O thread).
178 // DCHECK(!weak_ptr_factory_.HasWeakPtrs());
179 embedder::internal::ChannelShutdown();
180 }
170 } 181 }
171 182
172 void RawChannel::Init(Delegate* delegate) { 183 void RawChannel::Init(Delegate* delegate) {
184 embedder::internal::ChannelStarted();
173 DCHECK(delegate); 185 DCHECK(delegate);
174 186
187 base::AutoLock read_locker(read_lock_);
188 // solves race where initialiing on io thread while main thread is serializing
189 // this channel and releases handle.
190 base::AutoLock locker(write_lock_);
191
175 DCHECK(!delegate_); 192 DCHECK(!delegate_);
176 delegate_ = delegate; 193 delegate_ = delegate;
177 194
178 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); 195 //CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
179 DCHECK(!message_loop_for_io_); 196 DCHECK(!message_loop_for_io_);
180 message_loop_for_io_ = 197 message_loop_for_io_ =
181 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); 198 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
182 199
183 // No need to take the lock. No one should be using us yet. 200 OnInit();
184 DCHECK(!read_buffer_);
185 read_buffer_.reset(new ReadBuffer);
186 DCHECK(!write_buffer_);
187 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
188 201
189 OnInit(); 202 // Although this means that we can call back sync into the caller, that's
203 // easier than posting a task to do this, because there might also be pending
204 // read calls and we can't modify the buffer.
205 if (read_buffer_->num_valid_bytes()) {
206 // We had serialized read buffer data through SetInitialReadBufferData call.
207 // Make sure we read messages out of it now, otherwise the delegate won't
208 // get notified if no other data gets written to the pipe.
209 bool did_dispatch_message = false;
210 bool stop_dispatching = false;
211 DispatchMessages(&did_dispatch_message, &stop_dispatching);
212 }
190 213
191 IOResult io_result = ScheduleRead(); 214 IOResult io_result = ScheduleRead();
192 if (io_result != IO_PENDING) { 215 if (io_result != IO_PENDING) {
193 // This will notify the delegate about the read failure. Although we're on 216 // This will notify the delegate about the read failure. Although we're on
194 // the I/O thread, don't call it in the nested context. 217 // the I/O thread, don't call it in the nested context.
195 message_loop_for_io_->PostTask( 218 message_loop_for_io_->PostTask(
196 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, 219 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted,
197 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); 220 weak_ptr_factory_.GetWeakPtr(), io_result, 0));
198 } 221 }
199 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying 222 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
200 // the delegate), not an initialization failure. 223 // the delegate), not an initialization failure.
224
225 write_ready_ = true;
226 write_buffer_->serialized_platform_handle_size_ =
227 GetSerializedPlatformHandleSize();
228 if (!write_buffer_->message_queue_.IsEmpty())
229 SendQueuedMessagesNoLock();
201 } 230 }
202 231
203 void RawChannel::Shutdown() { 232 void RawChannel::Shutdown() {
204 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 233 weak_ptr_factory_.InvalidateWeakPtrs();
205 234
235 // Normally, we want to flush any pending writes before shutting down. This
236 // doesn't apply when 1) we don't have a handle (for obvious reasons) or
237 // 2) when the other side already quit and asked us to close the handle to
238 // ensure that we read everything out of the pipe first.
239 if (!HandleForDebuggingNoLock().is_valid() || error_occurred_) {
240 {
241 base::AutoLock read_locker(read_lock_);
242 base::AutoLock locker(write_lock_);
243 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
244 }
245 delete this;
246 return;
247 }
248
249 base::AutoLock read_locker(read_lock_);
206 base::AutoLock locker(write_lock_); 250 base::AutoLock locker(write_lock_);
251 DCHECK(read_buffer_->num_valid_bytes() == 0) <<
252 "RawChannel::Shutdown called but there is pending data to be read";
207 253
208 LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty()) 254 // happens on shutdown if didn't call init when doing createduplicate
209 << "Shutting down RawChannel with write buffer nonempty"; 255 if (message_loop_for_io()) {
256 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
257 }
210 258
211 // Reset the delegate so that it won't receive further calls. 259 // Reset the delegate so that it won't receive further calls.
212 delegate_ = nullptr; 260 delegate_ = nullptr;
213 if (set_on_shutdown_) { 261 if (set_on_shutdown_) {
214 *set_on_shutdown_ = true; 262 *set_on_shutdown_ = true;
215 set_on_shutdown_ = nullptr; 263 set_on_shutdown_ = nullptr;
216 } 264 }
217 write_stopped_ = true;
218 weak_ptr_factory_.InvalidateWeakPtrs();
219 265
220 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); 266 // TODO(jam): probably remove this since it doesn't make sense now that we
267 // wait and flush pending messages.
268 // write_stopped_ = true;
269
270
271 bool empty = write_buffer_->message_queue_.IsEmpty();
272
273 // We may have no messages to write. However just because our end of the pipe
274 // wrote everything doesn't mean that the other end read it. We don't want to
275 // call FlushFileBuffers since a) that only works for server end of the pipe,
276 // and b) it pauses this thread (which can block a process on another, or
277 // worse hang if both pipes are in the same process).
278 scoped_ptr<MessageInTransit> quit_message(new MessageInTransit(
279 MessageInTransit::Type::RAW_CHANNEL_QUIT, 0, nullptr));
280 EnqueueMessageNoLock(quit_message.Pass());
281
282 if (empty)
283 SendQueuedMessagesNoLock();
284 }
285
286 embedder::ScopedPlatformHandle RawChannel::ReleaseHandle(
287 std::vector<char>* read_buffer) {
288 //LOG(ERROR) << "RawChannel::ReleaseHandle( " << this;
289
290 embedder::ScopedPlatformHandle rv;
291 {
292 base::AutoLock read_locker(read_lock_);
293 base::AutoLock locker(write_lock_);
294 rv = ReleaseHandleNoLock(read_buffer);
295
296 // TODO(jam); if we use these, use nolock versions of these methods that are
297 // copied.
298 if (write_buffer_.get() && !write_buffer_->message_queue_.IsEmpty()) {
299 NOTREACHED() << "TODO(JAM)";
300 }
301
302 delegate_ = nullptr;
303
304 // The Unretained is safe because above cancelled IO so we shouldn't get any
305 // channel errors.
306 // |message_loop_for_io_| might not be set yet
307 embedder::internal::g_io_thread_task_runner->PostTask(
308 FROM_HERE,
309 base::Bind(&RawChannel::Shutdown, base::Unretained(this)));
310 }
311
312 return rv;
221 } 313 }
222 314
223 // Reminder: This must be thread-safe. 315 // Reminder: This must be thread-safe.
224 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { 316 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
225 DCHECK(message); 317 DCHECK(message);
226
227 base::AutoLock locker(write_lock_); 318 base::AutoLock locker(write_lock_);
228 if (write_stopped_) 319 if (write_stopped_)
229 return false; 320 return false;
230 321
231 if (!write_buffer_->message_queue_.IsEmpty()) { 322 bool queue_was_empty = write_buffer_->message_queue_.IsEmpty();
232 EnqueueMessageNoLock(message.Pass()); 323 EnqueueMessageNoLock(message.Pass());
233 return true; 324 if (queue_was_empty && write_ready_)
234 } 325 SendQueuedMessagesNoLock();
235 326
236 EnqueueMessageNoLock(message.Pass()); 327 return true;
328 }
329
330 void RawChannel::SendQueuedMessagesNoLock() {
237 DCHECK_EQ(write_buffer_->data_offset_, 0u); 331 DCHECK_EQ(write_buffer_->data_offset_, 0u);
238 332
239 size_t platform_handles_written = 0; 333 size_t platform_handles_written = 0;
240 size_t bytes_written = 0; 334 size_t bytes_written = 0;
241 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); 335 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
242 if (io_result == IO_PENDING) 336 if (io_result == IO_PENDING)
243 return true; 337 return;
244 338
245 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, 339 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written,
246 bytes_written); 340 bytes_written);
247 if (!result) { 341 if (!result) {
248 // Even if we're on the I/O thread, don't call |OnError()| in the nested 342 // Even if we're on the I/O thread, don't call |OnError()| in the nested
249 // context. 343 // context.
250 message_loop_for_io_->PostTask( 344 message_loop_for_io_->PostTask(
251 FROM_HERE, 345 FROM_HERE,
252 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), 346 base::Bind(&RawChannel::LockAndCallOnError,
347 weak_ptr_factory_.GetWeakPtr(),
253 Delegate::ERROR_WRITE)); 348 Delegate::ERROR_WRITE));
254 } 349 }
255
256 return result;
257 } 350 }
258 351
259 // Reminder: This must be thread-safe. 352 // Reminder: This must be thread-safe.
260 bool RawChannel::IsWriteBufferEmpty() { 353 bool RawChannel::IsWriteBufferEmpty() {
261 base::AutoLock locker(write_lock_); 354 base::AutoLock locker(write_lock_);
262 return write_buffer_->message_queue_.IsEmpty(); 355 return write_buffer_->message_queue_.IsEmpty();
263 } 356 }
264 357
358 bool RawChannel::IsReadBufferEmpty() {
359 base::AutoLock locker(read_lock_);
360 return read_buffer_->num_valid_bytes_ != 0;
361 }
362
363 void RawChannel::SetInitialReadBufferData(char* data, size_t size) {
364 base::AutoLock locker(read_lock_);
365 // TODO(jam): copy power of 2 algorithm below? or share.
366 read_buffer_->buffer_.resize(size+kReadSize);
367 memcpy(&read_buffer_->buffer_[0], data, size);
368 read_buffer_->num_valid_bytes_ = size;
369 }
370
265 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { 371 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
266 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 372 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
267 373
374 base::AutoLock locker(read_lock_);
375
268 // Keep reading data in a loop, and dispatch messages if enough data is 376 // Keep reading data in a loop, and dispatch messages if enough data is
269 // received. Exit the loop if any of the following happens: 377 // received. Exit the loop if any of the following happens:
270 // - one or more messages were dispatched; 378 // - one or more messages were dispatched;
271 // - the last read failed, was a partial read or would block; 379 // - the last read failed, was a partial read or would block;
272 // - |Shutdown()| was called. 380 // - |Shutdown()| was called.
273 do { 381 do {
274 switch (io_result) { 382 switch (io_result) {
275 case IO_SUCCEEDED: 383 case IO_SUCCEEDED:
276 break; 384 break;
277 case IO_FAILED_SHUTDOWN: 385 case IO_FAILED_SHUTDOWN:
278 case IO_FAILED_BROKEN: 386 case IO_FAILED_BROKEN:
279 case IO_FAILED_UNKNOWN: 387 case IO_FAILED_UNKNOWN:
280 CallOnError(ReadIOResultToError(io_result)); 388 CallOnError(ReadIOResultToError(io_result));
281 return; // |this| may have been destroyed in |CallOnError()|. 389 return; // |this| may have been destroyed in |CallOnError()|.
282 case IO_PENDING: 390 case IO_PENDING:
283 NOTREACHED(); 391 NOTREACHED();
284 return; 392 return;
285 } 393 }
286 394
287 read_buffer_->num_valid_bytes_ += bytes_read; 395 read_buffer_->num_valid_bytes_ += bytes_read;
288 396
289 // Dispatch all the messages that we can. 397 // Dispatch all the messages that we can.
290 bool did_dispatch_message = false; 398 bool did_dispatch_message = false;
291 // Tracks the offset of the first undispatched message in |read_buffer_|. 399 bool stop_dispatching = false;
292 // Currently, we copy data to ensure that this is zero at the beginning. 400 DispatchMessages(&did_dispatch_message, &stop_dispatching);
293 size_t read_buffer_start = 0; 401 if (stop_dispatching)
294 size_t remaining_bytes = read_buffer_->num_valid_bytes_; 402 return;
295 size_t message_size;
296 // Note that we rely on short-circuit evaluation here:
297 // - |read_buffer_start| may be an invalid index into
298 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
299 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
300 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
301 // next read).
302 // TODO(vtl): Validate that |message_size| is sane.
303 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
304 &read_buffer_->buffer_[read_buffer_start],
305 remaining_bytes, &message_size) &&
306 remaining_bytes >= message_size) {
307 MessageInTransit::View message_view(
308 message_size, &read_buffer_->buffer_[read_buffer_start]);
309 DCHECK_EQ(message_view.total_size(), message_size);
310
311 const char* error_message = nullptr;
312 if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
313 &error_message)) {
314 DCHECK(error_message);
315 LOG(ERROR) << "Received invalid message: " << error_message;
316 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
317 return; // |this| may have been destroyed in |CallOnError()|.
318 }
319
320 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL) {
321 if (!OnReadMessageForRawChannel(message_view)) {
322 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
323 return; // |this| may have been destroyed in |CallOnError()|.
324 }
325 } else {
326 embedder::ScopedPlatformHandleVectorPtr platform_handles;
327 if (message_view.transport_data_buffer()) {
328 size_t num_platform_handles;
329 const void* platform_handle_table;
330 TransportData::GetPlatformHandleTable(
331 message_view.transport_data_buffer(), &num_platform_handles,
332 &platform_handle_table);
333
334 if (num_platform_handles > 0) {
335 platform_handles =
336 GetReadPlatformHandles(num_platform_handles,
337 platform_handle_table).Pass();
338 if (!platform_handles) {
339 LOG(ERROR) << "Invalid number of platform handles received";
340 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
341 return; // |this| may have been destroyed in |CallOnError()|.
342 }
343 }
344 }
345
346 // TODO(vtl): In the case that we aren't expecting any platform handles,
347 // for the POSIX implementation, we should confirm that none are stored.
348
349 // Dispatch the message.
350 // Detect the case when |Shutdown()| is called; subsequent destruction
351 // is also permitted then.
352 bool shutdown_called = false;
353 DCHECK(!set_on_shutdown_);
354 set_on_shutdown_ = &shutdown_called;
355 DCHECK(delegate_);
356 delegate_->OnReadMessage(message_view, platform_handles.Pass());
357 if (shutdown_called)
358 return;
359 set_on_shutdown_ = nullptr;
360 }
361
362 did_dispatch_message = true;
363
364 // Update our state.
365 read_buffer_start += message_size;
366 remaining_bytes -= message_size;
367 }
368
369 if (read_buffer_start > 0) {
370 // Move data back to start.
371 read_buffer_->num_valid_bytes_ = remaining_bytes;
372 if (read_buffer_->num_valid_bytes_ > 0) {
373 memmove(&read_buffer_->buffer_[0],
374 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
375 }
376 read_buffer_start = 0;
377 }
378 403
379 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < 404 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
380 kReadSize) { 405 kReadSize) {
381 // Use power-of-2 buffer sizes. 406 // Use power-of-2 buffer sizes.
382 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 407 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
383 // maximum message size to whatever extent necessary). 408 // maximum message size to whatever extent necessary).
384 // TODO(vtl): We may often be able to peek at the header and get the real 409 // TODO(vtl): We may often be able to peek at the header and get the real
385 // required extra space (which may be much bigger than |kReadSize|). 410 // required extra space (which may be much bigger than |kReadSize|).
386 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); 411 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
387 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) 412 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
(...skipping 30 matching lines...) Expand all
418 if (write_stopped_) { 443 if (write_stopped_) {
419 NOTREACHED(); 444 NOTREACHED();
420 return; 445 return;
421 } 446 }
422 447
423 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, 448 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written,
424 bytes_written); 449 bytes_written);
425 } 450 }
426 451
427 if (did_fail) { 452 if (did_fail) {
453 base::AutoLock locker(read_lock_);
yzshen1 2015/09/23 22:47:09 nit: use LoadAndCallOnError instead?
428 CallOnError(Delegate::ERROR_WRITE); 454 CallOnError(Delegate::ERROR_WRITE);
429 return; // |this| may have been destroyed in |CallOnError()|. 455 return; // |this| may have been destroyed in |CallOnError()|.
430 } 456 }
431 } 457 }
432 458
433 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { 459 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
434 write_lock_.AssertAcquired(); 460 write_lock_.AssertAcquired();
461 DCHECK(HandleForDebuggingNoLock().is_valid());
435 write_buffer_->message_queue_.AddMessage(message.Pass()); 462 write_buffer_->message_queue_.AddMessage(message.Pass());
436 } 463 }
437 464
438 bool RawChannel::OnReadMessageForRawChannel( 465 bool RawChannel::OnReadMessageForRawChannel(
439 const MessageInTransit::View& message_view) { 466 const MessageInTransit::View& message_view) {
467 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) {
468 message_loop_for_io_->PostTask(
469 FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError,
470 weak_ptr_factory_.GetWeakPtr(),
471 Delegate::ERROR_READ_SHUTDOWN));
472 return true;
473 }
474
440 // No non-implementation specific |RawChannel| control messages. 475 // No non-implementation specific |RawChannel| control messages.
441 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() 476 LOG(ERROR) << "Invalid control message (type " << message_view.type()
442 << ")"; 477 << ")";
443 return false; 478 return false;
444 } 479 }
445 480
446 // static
447 RawChannel::Delegate::Error RawChannel::ReadIOResultToError( 481 RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
448 IOResult io_result) { 482 IOResult io_result) {
449 switch (io_result) { 483 switch (io_result) {
450 case IO_FAILED_SHUTDOWN: 484 case IO_FAILED_SHUTDOWN:
451 return Delegate::ERROR_READ_SHUTDOWN; 485 return Delegate::ERROR_READ_SHUTDOWN;
452 case IO_FAILED_BROKEN: 486 case IO_FAILED_BROKEN:
453 return Delegate::ERROR_READ_BROKEN; 487 return Delegate::ERROR_READ_BROKEN;
454 case IO_FAILED_UNKNOWN: 488 case IO_FAILED_UNKNOWN:
455 return Delegate::ERROR_READ_UNKNOWN; 489 return Delegate::ERROR_READ_UNKNOWN;
456 case IO_SUCCEEDED: 490 case IO_SUCCEEDED:
457 case IO_PENDING: 491 case IO_PENDING:
458 NOTREACHED(); 492 NOTREACHED();
459 break; 493 break;
460 } 494 }
461 return Delegate::ERROR_READ_UNKNOWN; 495 return Delegate::ERROR_READ_UNKNOWN;
462 } 496 }
463 497
464 void RawChannel::CallOnError(Delegate::Error error) { 498 void RawChannel::CallOnError(Delegate::Error error) {
465 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 499 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
466 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 500 read_lock_.AssertAcquired();
501 error_occurred_ = true;
467 if (delegate_) { 502 if (delegate_) {
468 delegate_->OnError(error); 503 delegate_->OnError(error);
469 return; // |this| may have been destroyed in |OnError()|. 504 } else {
505 // We depend on delegate to delete since it could be waiting to call
506 // ReleaseHandle.
507 base::MessageLoop::current()->PostTask(
508 FROM_HERE,
509 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr()));
470 } 510 }
471 } 511 }
472 512
513 void RawChannel::LockAndCallOnError(Delegate::Error error) {
514 base::AutoLock locker(read_lock_);
515 CallOnError(error);
516 }
517
473 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, 518 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
474 size_t platform_handles_written, 519 size_t platform_handles_written,
475 size_t bytes_written) { 520 size_t bytes_written) {
476 write_lock_.AssertAcquired(); 521 write_lock_.AssertAcquired();
477 522
478 DCHECK(!write_stopped_); 523 DCHECK(!write_stopped_);
479 DCHECK(!write_buffer_->message_queue_.IsEmpty()); 524 DCHECK(!write_buffer_->message_queue_.IsEmpty());
480 525
481 if (io_result == IO_SUCCEEDED) { 526 if (io_result == IO_SUCCEEDED) {
482 write_buffer_->platform_handles_offset_ += platform_handles_written; 527 write_buffer_->platform_handles_offset_ += platform_handles_written;
(...skipping 18 matching lines...) Expand all
501 DCHECK_NE(io_result, IO_SUCCEEDED); 546 DCHECK_NE(io_result, IO_SUCCEEDED);
502 } 547 }
503 548
504 write_stopped_ = true; 549 write_stopped_ = true;
505 write_buffer_->message_queue_.Clear(); 550 write_buffer_->message_queue_.Clear();
506 write_buffer_->platform_handles_offset_ = 0; 551 write_buffer_->platform_handles_offset_ = 0;
507 write_buffer_->data_offset_ = 0; 552 write_buffer_->data_offset_ = 0;
508 return false; 553 return false;
509 } 554 }
510 555
556 void RawChannel::DispatchMessages(bool* did_dispatch_message,
557 bool* stop_dispatching) {
558 *did_dispatch_message = false;
559 *stop_dispatching = false;
560 // Tracks the offset of the first undispatched message in |read_buffer_|.
561 // Currently, we copy data to ensure that this is zero at the beginning.
562 size_t read_buffer_start = 0;
563 size_t remaining_bytes = read_buffer_->num_valid_bytes_;
564 size_t message_size;
565 // Note that we rely on short-circuit evaluation here:
566 // - |read_buffer_start| may be an invalid index into
567 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
568 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
569 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
570 // next read).
571 // TODO(vtl): Validate that |message_size| is sane.
572 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
573 &read_buffer_->buffer_[read_buffer_start],
574 remaining_bytes, &message_size) &&
575 remaining_bytes >= message_size) {
576 MessageInTransit::View message_view(
577 message_size, &read_buffer_->buffer_[read_buffer_start]);
578 DCHECK_EQ(message_view.total_size(), message_size);
579
580 const char* error_message = nullptr;
581 if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
582 &error_message)) {
583 DCHECK(error_message);
584 LOG(ERROR) << "Received invalid message: " << error_message;
585 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
586 *stop_dispatching = true;
587 return; // |this| may have been destroyed in |CallOnError()|.
588 }
589
590 if (message_view.type() != MessageInTransit::Type::MESSAGE) {
591 if (!OnReadMessageForRawChannel(message_view)) {
592 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
593 *stop_dispatching = true;
594 return; // |this| may have been destroyed in |CallOnError()|.
595 }
596 } else {
597 embedder::ScopedPlatformHandleVectorPtr platform_handles;
598 if (message_view.transport_data_buffer()) {
599 size_t num_platform_handles;
600 const void* platform_handle_table;
601 TransportData::GetPlatformHandleTable(
602 message_view.transport_data_buffer(), &num_platform_handles,
603 &platform_handle_table);
604
605 if (num_platform_handles > 0) {
606 platform_handles =
607 GetReadPlatformHandles(num_platform_handles,
608 platform_handle_table).Pass();
609 if (!platform_handles) {
610 LOG(ERROR) << "Invalid number of platform handles received";
611 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
612 *stop_dispatching = true;
613 return; // |this| may have been destroyed in |CallOnError()|.
614 }
615 }
616 }
617
618 // TODO(vtl): In the case that we aren't expecting any platform handles,
619 // for the POSIX implementation, we should confirm that none are stored.
620
621 // Dispatch the message.
622 // Detect the case when |Shutdown()| is called; subsequent destruction
623 // is also permitted then.
624 bool shutdown_called = false;
yzshen1 2015/09/23 22:47:09 Shutdown() is no longer possible to be called from
625 DCHECK(!set_on_shutdown_);
626 set_on_shutdown_ = &shutdown_called;
627 // Note: it's valid to get here without a delegate. i.e. after Shutdown
628 // is called, if this object still has a valid handle we keep it alive
629 // until the other side closes it in response to the RAW_CHANNEL_QUIT
630 // message. In the meantime the sender could have sent us a message.
631 if (delegate_)
632 delegate_->OnReadMessage(message_view, platform_handles.Pass());
633 if (shutdown_called) {
634 *stop_dispatching = true;
635 return;
636 }
637 set_on_shutdown_ = nullptr;
638 }
639
640 *did_dispatch_message = true;
641
642 // Update our state.
643 read_buffer_start += message_size;
644 remaining_bytes -= message_size;
645 }
646
647 if (read_buffer_start > 0) {
648 // Move data back to start.
649 read_buffer_->num_valid_bytes_ = remaining_bytes;
650 if (read_buffer_->num_valid_bytes_ > 0) {
651 memmove(&read_buffer_->buffer_[0],
652 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
653 }
654 read_buffer_start = 0;
655 }
656 }
657
511 } // namespace system 658 } // namespace system
512 } // namespace mojo 659 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698