OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/raw_channel.h" | 5 #include "mojo/edk/system/raw_channel.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 | 10 |
(...skipping 138 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
149 static_cast<const char*>(message->transport_data()->buffer()), | 149 static_cast<const char*>(message->transport_data()->buffer()), |
150 transport_data_buffer_size}; | 150 transport_data_buffer_size}; |
151 buffers->push_back(buffer2); | 151 buffers->push_back(buffer2); |
152 } | 152 } |
153 | 153 |
154 // RawChannel ------------------------------------------------------------------ | 154 // RawChannel ------------------------------------------------------------------ |
155 | 155 |
156 RawChannel::RawChannel() | 156 RawChannel::RawChannel() |
157 : message_loop_for_io_(nullptr), | 157 : message_loop_for_io_(nullptr), |
158 delegate_(nullptr), | 158 delegate_(nullptr), |
159 read_stopped_(false), | 159 set_on_shutdown_(nullptr), |
160 write_stopped_(false), | 160 write_stopped_(false), |
161 weak_ptr_factory_(this) { | 161 weak_ptr_factory_(this) { |
162 } | 162 } |
163 | 163 |
164 RawChannel::~RawChannel() { | 164 RawChannel::~RawChannel() { |
165 DCHECK(!read_buffer_); | 165 DCHECK(!read_buffer_); |
166 DCHECK(!write_buffer_); | 166 DCHECK(!write_buffer_); |
167 | 167 |
168 // No need to take the |write_lock_| here -- if there are still weak pointers | 168 // No need to take the |write_lock_| here -- if there are still weak pointers |
169 // outstanding, then we're hosed anyway (since we wouldn't be able to | 169 // outstanding, then we're hosed anyway (since we wouldn't be able to |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
205 void RawChannel::Shutdown() { | 205 void RawChannel::Shutdown() { |
206 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 206 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
207 | 207 |
208 base::AutoLock locker(write_lock_); | 208 base::AutoLock locker(write_lock_); |
209 | 209 |
210 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) | 210 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) |
211 << "Shutting down RawChannel with write buffer nonempty"; | 211 << "Shutting down RawChannel with write buffer nonempty"; |
212 | 212 |
213 // Reset the delegate so that it won't receive further calls. | 213 // Reset the delegate so that it won't receive further calls. |
214 delegate_ = nullptr; | 214 delegate_ = nullptr; |
215 read_stopped_ = true; | 215 if (set_on_shutdown_) { |
216 *set_on_shutdown_ = true; | |
217 set_on_shutdown_ = nullptr; | |
218 } | |
216 write_stopped_ = true; | 219 write_stopped_ = true; |
217 weak_ptr_factory_.InvalidateWeakPtrs(); | 220 weak_ptr_factory_.InvalidateWeakPtrs(); |
218 | 221 |
219 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); | 222 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); |
220 } | 223 } |
221 | 224 |
222 // Reminder: This must be thread-safe. | 225 // Reminder: This must be thread-safe. |
223 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { | 226 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
224 DCHECK(message); | 227 DCHECK(message); |
225 | 228 |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
257 | 260 |
258 // Reminder: This must be thread-safe. | 261 // Reminder: This must be thread-safe. |
259 bool RawChannel::IsWriteBufferEmpty() { | 262 bool RawChannel::IsWriteBufferEmpty() { |
260 base::AutoLock locker(write_lock_); | 263 base::AutoLock locker(write_lock_); |
261 return write_buffer_->message_queue_.empty(); | 264 return write_buffer_->message_queue_.empty(); |
262 } | 265 } |
263 | 266 |
264 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { | 267 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { |
265 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 268 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
266 | 269 |
267 if (read_stopped_) { | |
yzshen1
2015/01/21 23:03:42
Without this NOTREACHED(), maybe we should add mor
viettrungluu
2015/01/21 23:34:31
Done.
| |
268 NOTREACHED(); | |
269 return; | |
270 } | |
271 | |
272 // Keep reading data in a loop, and dispatch messages if enough data is | 270 // Keep reading data in a loop, and dispatch messages if enough data is |
273 // received. Exit the loop if any of the following happens: | 271 // received. Exit the loop if any of the following happens: |
274 // - one or more messages were dispatched; | 272 // - one or more messages were dispatched; |
275 // - the last read failed, was a partial read or would block; | 273 // - the last read failed, was a partial read or would block; |
276 // - |Shutdown()| was called. | 274 // - |Shutdown()| was called. |
277 do { | 275 do { |
278 switch (io_result) { | 276 switch (io_result) { |
279 case IO_SUCCEEDED: | 277 case IO_SUCCEEDED: |
280 break; | 278 break; |
281 case IO_FAILED_SHUTDOWN: | 279 case IO_FAILED_SHUTDOWN: |
282 case IO_FAILED_BROKEN: | 280 case IO_FAILED_BROKEN: |
283 case IO_FAILED_UNKNOWN: | 281 case IO_FAILED_UNKNOWN: |
284 read_stopped_ = true; | |
285 CallOnError(ReadIOResultToError(io_result)); | 282 CallOnError(ReadIOResultToError(io_result)); |
286 return; | 283 return; // |this| may have been destroyed in |CallOnError()|. |
287 case IO_PENDING: | 284 case IO_PENDING: |
288 NOTREACHED(); | 285 NOTREACHED(); |
289 return; | 286 return; |
290 } | 287 } |
291 | 288 |
292 read_buffer_->num_valid_bytes_ += bytes_read; | 289 read_buffer_->num_valid_bytes_ += bytes_read; |
293 | 290 |
294 // Dispatch all the messages that we can. | 291 // Dispatch all the messages that we can. |
295 bool did_dispatch_message = false; | 292 bool did_dispatch_message = false; |
296 // Tracks the offset of the first undispatched message in |read_buffer_|. | 293 // Tracks the offset of the first undispatched message in |read_buffer_|. |
(...skipping 14 matching lines...) Expand all Loading... | |
311 remaining_bytes >= message_size) { | 308 remaining_bytes >= message_size) { |
312 MessageInTransit::View message_view( | 309 MessageInTransit::View message_view( |
313 message_size, &read_buffer_->buffer_[read_buffer_start]); | 310 message_size, &read_buffer_->buffer_[read_buffer_start]); |
314 DCHECK_EQ(message_view.total_size(), message_size); | 311 DCHECK_EQ(message_view.total_size(), message_size); |
315 | 312 |
316 const char* error_message = nullptr; | 313 const char* error_message = nullptr; |
317 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), | 314 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), |
318 &error_message)) { | 315 &error_message)) { |
319 DCHECK(error_message); | 316 DCHECK(error_message); |
320 LOG(ERROR) << "Received invalid message: " << error_message; | 317 LOG(ERROR) << "Received invalid message: " << error_message; |
321 read_stopped_ = true; | |
322 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | 318 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
323 return; | 319 return; // |this| may have been destroyed in |CallOnError()|. |
324 } | 320 } |
325 | 321 |
326 if (message_view.type() == MessageInTransit::kTypeRawChannel) { | 322 if (message_view.type() == MessageInTransit::kTypeRawChannel) { |
327 if (!OnReadMessageForRawChannel(message_view)) { | 323 if (!OnReadMessageForRawChannel(message_view)) { |
328 read_stopped_ = true; | |
329 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | 324 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
330 return; | 325 return; // |this| may have been destroyed in |CallOnError()|. |
331 } | 326 } |
332 } else { | 327 } else { |
333 embedder::ScopedPlatformHandleVectorPtr platform_handles; | 328 embedder::ScopedPlatformHandleVectorPtr platform_handles; |
334 if (message_view.transport_data_buffer()) { | 329 if (message_view.transport_data_buffer()) { |
335 size_t num_platform_handles; | 330 size_t num_platform_handles; |
336 const void* platform_handle_table; | 331 const void* platform_handle_table; |
337 TransportData::GetPlatformHandleTable( | 332 TransportData::GetPlatformHandleTable( |
338 message_view.transport_data_buffer(), &num_platform_handles, | 333 message_view.transport_data_buffer(), &num_platform_handles, |
339 &platform_handle_table); | 334 &platform_handle_table); |
340 | 335 |
341 if (num_platform_handles > 0) { | 336 if (num_platform_handles > 0) { |
342 platform_handles = | 337 platform_handles = |
343 GetReadPlatformHandles(num_platform_handles, | 338 GetReadPlatformHandles(num_platform_handles, |
344 platform_handle_table).Pass(); | 339 platform_handle_table).Pass(); |
345 if (!platform_handles) { | 340 if (!platform_handles) { |
346 LOG(ERROR) << "Invalid number of platform handles received"; | 341 LOG(ERROR) << "Invalid number of platform handles received"; |
347 read_stopped_ = true; | |
348 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | 342 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
349 return; | 343 return; // |this| may have been destroyed in |CallOnError()|. |
350 } | 344 } |
351 } | 345 } |
352 } | 346 } |
353 | 347 |
354 // TODO(vtl): In the case that we aren't expecting any platform handles, | 348 // TODO(vtl): In the case that we aren't expecting any platform handles, |
355 // for the POSIX implementation, we should confirm that none are stored. | 349 // for the POSIX implementation, we should confirm that none are stored. |
356 | 350 |
357 // Dispatch the message. | 351 // Dispatch the message. |
352 // Detect the case when |Shutdown()| is called; subsequent destruction | |
353 // is also permitted then. | |
354 bool shutdown_called = false; | |
355 DCHECK(!set_on_shutdown_); | |
356 set_on_shutdown_ = &shutdown_called; | |
358 DCHECK(delegate_); | 357 DCHECK(delegate_); |
359 delegate_->OnReadMessage(message_view, platform_handles.Pass()); | 358 delegate_->OnReadMessage(message_view, platform_handles.Pass()); |
360 if (read_stopped_) { | 359 if (shutdown_called) |
361 // |Shutdown()| was called in |OnReadMessage()|. | |
362 // TODO(vtl): Add test for this case. | |
363 return; | 360 return; |
364 } | 361 set_on_shutdown_ = nullptr; |
365 } | 362 } |
366 | 363 |
367 did_dispatch_message = true; | 364 did_dispatch_message = true; |
368 | 365 |
369 // Update our state. | 366 // Update our state. |
370 read_buffer_start += message_size; | 367 read_buffer_start += message_size; |
371 remaining_bytes -= message_size; | 368 remaining_bytes -= message_size; |
372 } | 369 } |
373 | 370 |
374 if (read_buffer_start > 0) { | 371 if (read_buffer_start > 0) { |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
422 | 419 |
423 if (write_stopped_) { | 420 if (write_stopped_) { |
424 NOTREACHED(); | 421 NOTREACHED(); |
425 return; | 422 return; |
426 } | 423 } |
427 | 424 |
428 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, | 425 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, |
429 bytes_written); | 426 bytes_written); |
430 } | 427 } |
431 | 428 |
432 if (did_fail) | 429 if (did_fail) { |
433 CallOnError(Delegate::ERROR_WRITE); | 430 CallOnError(Delegate::ERROR_WRITE); |
431 return; // |this| may have been destroyed in |CallOnError()|. | |
432 } | |
434 } | 433 } |
435 | 434 |
436 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | 435 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
437 write_lock_.AssertAcquired(); | 436 write_lock_.AssertAcquired(); |
438 write_buffer_->message_queue_.push_back(message.release()); | 437 write_buffer_->message_queue_.push_back(message.release()); |
439 } | 438 } |
440 | 439 |
441 bool RawChannel::OnReadMessageForRawChannel( | 440 bool RawChannel::OnReadMessageForRawChannel( |
442 const MessageInTransit::View& message_view) { | 441 const MessageInTransit::View& message_view) { |
443 // No non-implementation specific |RawChannel| control messages. | 442 // No non-implementation specific |RawChannel| control messages. |
(...skipping 16 matching lines...) Expand all Loading... | |
460 case IO_PENDING: | 459 case IO_PENDING: |
461 NOTREACHED(); | 460 NOTREACHED(); |
462 break; | 461 break; |
463 } | 462 } |
464 return Delegate::ERROR_READ_UNKNOWN; | 463 return Delegate::ERROR_READ_UNKNOWN; |
465 } | 464 } |
466 | 465 |
467 void RawChannel::CallOnError(Delegate::Error error) { | 466 void RawChannel::CallOnError(Delegate::Error error) { |
468 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 467 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
469 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 468 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
470 if (delegate_) | 469 if (delegate_) { |
471 delegate_->OnError(error); | 470 delegate_->OnError(error); |
471 return; // |this| may have been destroyed in |OnError()|. | |
472 } | |
472 } | 473 } |
473 | 474 |
474 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, | 475 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, |
475 size_t platform_handles_written, | 476 size_t platform_handles_written, |
476 size_t bytes_written) { | 477 size_t bytes_written) { |
477 write_lock_.AssertAcquired(); | 478 write_lock_.AssertAcquired(); |
478 | 479 |
479 DCHECK(!write_stopped_); | 480 DCHECK(!write_stopped_); |
480 DCHECK(!write_buffer_->message_queue_.empty()); | 481 DCHECK(!write_buffer_->message_queue_.empty()); |
481 | 482 |
(...skipping 23 matching lines...) Expand all Loading... | |
505 | 506 |
506 write_stopped_ = true; | 507 write_stopped_ = true; |
507 STLDeleteElements(&write_buffer_->message_queue_); | 508 STLDeleteElements(&write_buffer_->message_queue_); |
508 write_buffer_->platform_handles_offset_ = 0; | 509 write_buffer_->platform_handles_offset_ = 0; |
509 write_buffer_->data_offset_ = 0; | 510 write_buffer_->data_offset_ = 0; |
510 return false; | 511 return false; |
511 } | 512 } |
512 | 513 |
513 } // namespace system | 514 } // namespace system |
514 } // namespace mojo | 515 } // namespace mojo |
OLD | NEW |