OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/system/raw_channel.h" | 5 #include "mojo/system/raw_channel.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 | 10 |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
85 size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const { | 85 size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const { |
86 if (message_queue_.empty()) | 86 if (message_queue_.empty()) |
87 return 0; | 87 return 0; |
88 | 88 |
89 MessageInTransit* message = message_queue_.front(); | 89 MessageInTransit* message = message_queue_.front(); |
90 DCHECK_LT(offset_, message->total_size()); | 90 DCHECK_LT(offset_, message->total_size()); |
91 return message->total_size() - offset_; | 91 return message->total_size() - offset_; |
92 } | 92 } |
93 | 93 |
94 RawChannel::RawChannel() | 94 RawChannel::RawChannel() |
95 : delegate_(NULL), | 95 : message_loop_for_io_(NULL), |
96 message_loop_for_io_(NULL), | 96 delegate_(NULL), |
97 read_stopped_(false), | 97 read_stopped_(false), |
98 write_stopped_(false), | 98 write_stopped_(false), |
99 weak_ptr_factory_(this) { | 99 weak_ptr_factory_(this) { |
100 } | 100 } |
101 | 101 |
102 RawChannel::~RawChannel() { | 102 RawChannel::~RawChannel() { |
103 DCHECK(!read_buffer_); | 103 DCHECK(!read_buffer_); |
104 DCHECK(!write_buffer_); | 104 DCHECK(!write_buffer_); |
105 | 105 |
106 // No need to take the |write_lock_| here -- if there are still weak pointers | 106 // No need to take the |write_lock_| here -- if there are still weak pointers |
(...skipping 26 matching lines...) Expand all Loading... |
133 } | 133 } |
134 | 134 |
135 void RawChannel::Shutdown() { | 135 void RawChannel::Shutdown() { |
136 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 136 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
137 | 137 |
138 base::AutoLock locker(write_lock_); | 138 base::AutoLock locker(write_lock_); |
139 | 139 |
140 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) | 140 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) |
141 << "Shutting down RawChannel with write buffer nonempty"; | 141 << "Shutting down RawChannel with write buffer nonempty"; |
142 | 142 |
143 weak_ptr_factory_.InvalidateWeakPtrs(); | 143 // Reset the delegate so that it won't receive further calls. |
144 | 144 delegate_ = NULL; |
145 read_stopped_ = true; | 145 read_stopped_ = true; |
146 write_stopped_ = true; | 146 write_stopped_ = true; |
| 147 weak_ptr_factory_.InvalidateWeakPtrs(); |
147 | 148 |
148 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); | 149 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); |
149 } | 150 } |
150 | 151 |
151 // Reminder: This must be thread-safe. | 152 // Reminder: This must be thread-safe. |
152 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { | 153 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
153 DCHECK(message); | 154 DCHECK(message); |
154 | 155 |
155 // TODO(vtl) | 156 // TODO(vtl) |
156 if (message->has_platform_handles()) { | 157 if (message->has_platform_handles()) { |
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
247 while (remaining_bytes > 0 && | 248 while (remaining_bytes > 0 && |
248 MessageInTransit::GetNextMessageSize( | 249 MessageInTransit::GetNextMessageSize( |
249 &read_buffer_->buffer_[read_buffer_start], remaining_bytes, | 250 &read_buffer_->buffer_[read_buffer_start], remaining_bytes, |
250 &message_size) && | 251 &message_size) && |
251 remaining_bytes >= message_size) { | 252 remaining_bytes >= message_size) { |
252 MessageInTransit::View | 253 MessageInTransit::View |
253 message_view(message_size, &read_buffer_->buffer_[read_buffer_start]); | 254 message_view(message_size, &read_buffer_->buffer_[read_buffer_start]); |
254 DCHECK_EQ(message_view.total_size(), message_size); | 255 DCHECK_EQ(message_view.total_size(), message_size); |
255 | 256 |
256 // Dispatch the message. | 257 // Dispatch the message. |
| 258 DCHECK(delegate_); |
257 delegate_->OnReadMessage(message_view); | 259 delegate_->OnReadMessage(message_view); |
258 if (read_stopped_) { | 260 if (read_stopped_) { |
259 // |Shutdown()| was called in |OnReadMessage()|. | 261 // |Shutdown()| was called in |OnReadMessage()|. |
260 // TODO(vtl): Add test for this case. | 262 // TODO(vtl): Add test for this case. |
261 return; | 263 return; |
262 } | 264 } |
263 did_dispatch_message = true; | 265 did_dispatch_message = true; |
264 | 266 |
265 // Update our state. | 267 // Update our state. |
266 read_buffer_start += message_size; | 268 read_buffer_start += message_size; |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
321 did_fail = !OnWriteCompletedNoLock(result, bytes_written); | 323 did_fail = !OnWriteCompletedNoLock(result, bytes_written); |
322 } | 324 } |
323 | 325 |
324 if (did_fail) | 326 if (did_fail) |
325 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); | 327 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); |
326 } | 328 } |
327 | 329 |
328 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { | 330 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { |
329 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 331 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
330 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 332 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
331 delegate_->OnFatalError(fatal_error); | 333 if (delegate_) |
| 334 delegate_->OnFatalError(fatal_error); |
332 } | 335 } |
333 | 336 |
334 bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { | 337 bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { |
335 write_lock_.AssertAcquired(); | 338 write_lock_.AssertAcquired(); |
336 | 339 |
337 DCHECK(!write_stopped_); | 340 DCHECK(!write_stopped_); |
338 DCHECK(!write_buffer_->message_queue_.empty()); | 341 DCHECK(!write_buffer_->message_queue_.empty()); |
339 | 342 |
340 if (result) { | 343 if (result) { |
341 if (bytes_written < write_buffer_->GetTotalBytesToWrite()) { | 344 if (bytes_written < write_buffer_->GetTotalBytesToWrite()) { |
(...skipping 16 matching lines...) Expand all Loading... |
358 } | 361 } |
359 | 362 |
360 write_stopped_ = true; | 363 write_stopped_ = true; |
361 STLDeleteElements(&write_buffer_->message_queue_); | 364 STLDeleteElements(&write_buffer_->message_queue_); |
362 write_buffer_->offset_ = 0; | 365 write_buffer_->offset_ = 0; |
363 return false; | 366 return false; |
364 } | 367 } |
365 | 368 |
366 } // namespace system | 369 } // namespace system |
367 } // namespace mojo | 370 } // namespace mojo |
OLD | NEW |