| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/raw_channel.h" | 5 #include "mojo/edk/system/raw_channel.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 | 10 |
| (...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 156 delegate_(nullptr), | 156 delegate_(nullptr), |
| 157 set_on_shutdown_(nullptr), | 157 set_on_shutdown_(nullptr), |
| 158 write_stopped_(false), | 158 write_stopped_(false), |
| 159 weak_ptr_factory_(this) { | 159 weak_ptr_factory_(this) { |
| 160 } | 160 } |
| 161 | 161 |
| 162 RawChannel::~RawChannel() { | 162 RawChannel::~RawChannel() { |
| 163 DCHECK(!read_buffer_); | 163 DCHECK(!read_buffer_); |
| 164 DCHECK(!write_buffer_); | 164 DCHECK(!write_buffer_); |
| 165 | 165 |
| 166 // No need to take the |write_lock_| here -- if there are still weak pointers | 166 // No need to take |write_mutex_| here -- if there are still weak pointers |
| 167 // outstanding, then we're hosed anyway (since we wouldn't be able to | 167 // outstanding, then we're hosed anyway (since we wouldn't be able to |
| 168 // invalidate them cleanly, since we might not be on the I/O thread). | 168 // invalidate them cleanly, since we might not be on the I/O thread). |
| 169 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | 169 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
| 170 } | 170 } |
| 171 | 171 |
| 172 void RawChannel::Init(Delegate* delegate) { | 172 void RawChannel::Init(Delegate* delegate) { |
| 173 DCHECK(delegate); | 173 DCHECK(delegate); |
| 174 | 174 |
| 175 DCHECK(!delegate_); | 175 DCHECK(!delegate_); |
| 176 delegate_ = delegate; | 176 delegate_ = delegate; |
| (...skipping 19 matching lines...) Expand all Loading... |
| 196 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, | 196 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, |
| 197 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); | 197 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); |
| 198 } | 198 } |
| 199 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying | 199 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying |
| 200 // the delegate), not an initialization failure. | 200 // the delegate), not an initialization failure. |
| 201 } | 201 } |
| 202 | 202 |
| 203 void RawChannel::Shutdown() { | 203 void RawChannel::Shutdown() { |
| 204 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 204 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 205 | 205 |
| 206 base::AutoLock locker(write_lock_); | 206 MutexLocker locker(&write_mutex_); |
| 207 | 207 |
| 208 LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty()) | 208 LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty()) |
| 209 << "Shutting down RawChannel with write buffer nonempty"; | 209 << "Shutting down RawChannel with write buffer nonempty"; |
| 210 | 210 |
| 211 // Reset the delegate so that it won't receive further calls. | 211 // Reset the delegate so that it won't receive further calls. |
| 212 delegate_ = nullptr; | 212 delegate_ = nullptr; |
| 213 if (set_on_shutdown_) { | 213 if (set_on_shutdown_) { |
| 214 *set_on_shutdown_ = true; | 214 *set_on_shutdown_ = true; |
| 215 set_on_shutdown_ = nullptr; | 215 set_on_shutdown_ = nullptr; |
| 216 } | 216 } |
| 217 write_stopped_ = true; | 217 write_stopped_ = true; |
| 218 weak_ptr_factory_.InvalidateWeakPtrs(); | 218 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 219 | 219 |
| 220 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); | 220 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); |
| 221 } | 221 } |
| 222 | 222 |
| 223 // Reminder: This must be thread-safe. | 223 // Reminder: This must be thread-safe. |
| 224 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { | 224 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
| 225 DCHECK(message); | 225 DCHECK(message); |
| 226 | 226 |
| 227 base::AutoLock locker(write_lock_); | 227 MutexLocker locker(&write_mutex_); |
| 228 if (write_stopped_) | 228 if (write_stopped_) |
| 229 return false; | 229 return false; |
| 230 | 230 |
| 231 if (!write_buffer_->message_queue_.IsEmpty()) { | 231 if (!write_buffer_->message_queue_.IsEmpty()) { |
| 232 EnqueueMessageNoLock(message.Pass()); | 232 EnqueueMessageNoLock(message.Pass()); |
| 233 return true; | 233 return true; |
| 234 } | 234 } |
| 235 | 235 |
| 236 EnqueueMessageNoLock(message.Pass()); | 236 EnqueueMessageNoLock(message.Pass()); |
| 237 DCHECK_EQ(write_buffer_->data_offset_, 0u); | 237 DCHECK_EQ(write_buffer_->data_offset_, 0u); |
| (...skipping 13 matching lines...) Expand all Loading... |
| 251 FROM_HERE, | 251 FROM_HERE, |
| 252 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), | 252 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), |
| 253 Delegate::ERROR_WRITE)); | 253 Delegate::ERROR_WRITE)); |
| 254 } | 254 } |
| 255 | 255 |
| 256 return result; | 256 return result; |
| 257 } | 257 } |
| 258 | 258 |
| 259 // Reminder: This must be thread-safe. | 259 // Reminder: This must be thread-safe. |
| 260 bool RawChannel::IsWriteBufferEmpty() { | 260 bool RawChannel::IsWriteBufferEmpty() { |
| 261 base::AutoLock locker(write_lock_); | 261 MutexLocker locker(&write_mutex_); |
| 262 return write_buffer_->message_queue_.IsEmpty(); | 262 return write_buffer_->message_queue_.IsEmpty(); |
| 263 } | 263 } |
| 264 | 264 |
| 265 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { | 265 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { |
| 266 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 266 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 267 | 267 |
| 268 // Keep reading data in a loop, and dispatch messages if enough data is | 268 // Keep reading data in a loop, and dispatch messages if enough data is |
| 269 // received. Exit the loop if any of the following happens: | 269 // received. Exit the loop if any of the following happens: |
| 270 // - one or more messages were dispatched; | 270 // - one or more messages were dispatched; |
| 271 // - the last read failed, was a partial read or would block; | 271 // - the last read failed, was a partial read or would block; |
| (...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 405 } | 405 } |
| 406 | 406 |
| 407 void RawChannel::OnWriteCompleted(IOResult io_result, | 407 void RawChannel::OnWriteCompleted(IOResult io_result, |
| 408 size_t platform_handles_written, | 408 size_t platform_handles_written, |
| 409 size_t bytes_written) { | 409 size_t bytes_written) { |
| 410 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 410 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 411 DCHECK_NE(io_result, IO_PENDING); | 411 DCHECK_NE(io_result, IO_PENDING); |
| 412 | 412 |
| 413 bool did_fail = false; | 413 bool did_fail = false; |
| 414 { | 414 { |
| 415 base::AutoLock locker(write_lock_); | 415 MutexLocker locker(&write_mutex_); |
| 416 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.IsEmpty()); | 416 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.IsEmpty()); |
| 417 | 417 |
| 418 if (write_stopped_) { | 418 if (write_stopped_) { |
| 419 NOTREACHED(); | 419 NOTREACHED(); |
| 420 return; | 420 return; |
| 421 } | 421 } |
| 422 | 422 |
| 423 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, | 423 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, |
| 424 bytes_written); | 424 bytes_written); |
| 425 } | 425 } |
| 426 | 426 |
| 427 if (did_fail) { | 427 if (did_fail) { |
| 428 CallOnError(Delegate::ERROR_WRITE); | 428 CallOnError(Delegate::ERROR_WRITE); |
| 429 return; // |this| may have been destroyed in |CallOnError()|. | 429 return; // |this| may have been destroyed in |CallOnError()|. |
| 430 } | 430 } |
| 431 } | 431 } |
| 432 | 432 |
| 433 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | 433 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
| 434 write_lock_.AssertAcquired(); | 434 write_mutex_.AssertHeld(); |
| 435 write_buffer_->message_queue_.AddMessage(message.Pass()); | 435 write_buffer_->message_queue_.AddMessage(message.Pass()); |
| 436 } | 436 } |
| 437 | 437 |
| 438 bool RawChannel::OnReadMessageForRawChannel( | 438 bool RawChannel::OnReadMessageForRawChannel( |
| 439 const MessageInTransit::View& message_view) { | 439 const MessageInTransit::View& message_view) { |
| 440 // No non-implementation specific |RawChannel| control messages. | 440 // No non-implementation specific |RawChannel| control messages. |
| 441 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() | 441 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() |
| 442 << ")"; | 442 << ")"; |
| 443 return false; | 443 return false; |
| 444 } | 444 } |
| (...skipping 11 matching lines...) Expand all Loading... |
| 456 case IO_SUCCEEDED: | 456 case IO_SUCCEEDED: |
| 457 case IO_PENDING: | 457 case IO_PENDING: |
| 458 NOTREACHED(); | 458 NOTREACHED(); |
| 459 break; | 459 break; |
| 460 } | 460 } |
| 461 return Delegate::ERROR_READ_UNKNOWN; | 461 return Delegate::ERROR_READ_UNKNOWN; |
| 462 } | 462 } |
| 463 | 463 |
| 464 void RawChannel::CallOnError(Delegate::Error error) { | 464 void RawChannel::CallOnError(Delegate::Error error) { |
| 465 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 465 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 466 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 466 // TODO(vtl): Add a "write_mutex_.AssertNotHeld()"? |
| 467 if (delegate_) { | 467 if (delegate_) { |
| 468 delegate_->OnError(error); | 468 delegate_->OnError(error); |
| 469 return; // |this| may have been destroyed in |OnError()|. | 469 return; // |this| may have been destroyed in |OnError()|. |
| 470 } | 470 } |
| 471 } | 471 } |
| 472 | 472 |
| 473 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, | 473 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, |
| 474 size_t platform_handles_written, | 474 size_t platform_handles_written, |
| 475 size_t bytes_written) { | 475 size_t bytes_written) { |
| 476 write_lock_.AssertAcquired(); | 476 write_mutex_.AssertHeld(); |
| 477 | 477 |
| 478 DCHECK(!write_stopped_); | 478 DCHECK(!write_stopped_); |
| 479 DCHECK(!write_buffer_->message_queue_.IsEmpty()); | 479 DCHECK(!write_buffer_->message_queue_.IsEmpty()); |
| 480 | 480 |
| 481 if (io_result == IO_SUCCEEDED) { | 481 if (io_result == IO_SUCCEEDED) { |
| 482 write_buffer_->platform_handles_offset_ += platform_handles_written; | 482 write_buffer_->platform_handles_offset_ += platform_handles_written; |
| 483 write_buffer_->data_offset_ += bytes_written; | 483 write_buffer_->data_offset_ += bytes_written; |
| 484 | 484 |
| 485 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage(); | 485 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage(); |
| 486 if (write_buffer_->data_offset_ >= message->total_size()) { | 486 if (write_buffer_->data_offset_ >= message->total_size()) { |
| (...skipping 16 matching lines...) Expand all Loading... |
| 503 | 503 |
| 504 write_stopped_ = true; | 504 write_stopped_ = true; |
| 505 write_buffer_->message_queue_.Clear(); | 505 write_buffer_->message_queue_.Clear(); |
| 506 write_buffer_->platform_handles_offset_ = 0; | 506 write_buffer_->platform_handles_offset_ = 0; |
| 507 write_buffer_->data_offset_ = 0; | 507 write_buffer_->data_offset_ = 0; |
| 508 return false; | 508 return false; |
| 509 } | 509 } |
| 510 | 510 |
| 511 } // namespace system | 511 } // namespace system |
| 512 } // namespace mojo | 512 } // namespace mojo |
| OLD | NEW |