Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(112)

Side by Side Diff: mojo/edk/system/raw_channel.cc

Issue 1337953004: base::Lock -> Mutex in RawChannel. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/raw_channel.h" 5 #include "mojo/edk/system/raw_channel.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 10
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after
156 delegate_(nullptr), 156 delegate_(nullptr),
157 set_on_shutdown_(nullptr), 157 set_on_shutdown_(nullptr),
158 write_stopped_(false), 158 write_stopped_(false),
159 weak_ptr_factory_(this) { 159 weak_ptr_factory_(this) {
160 } 160 }
161 161
162 RawChannel::~RawChannel() { 162 RawChannel::~RawChannel() {
163 DCHECK(!read_buffer_); 163 DCHECK(!read_buffer_);
164 DCHECK(!write_buffer_); 164 DCHECK(!write_buffer_);
165 165
166 // No need to take the |write_lock_| here -- if there are still weak pointers 166 // No need to take |write_mutex_| here -- if there are still weak pointers
167 // outstanding, then we're hosed anyway (since we wouldn't be able to 167 // outstanding, then we're hosed anyway (since we wouldn't be able to
168 // invalidate them cleanly, since we might not be on the I/O thread). 168 // invalidate them cleanly, since we might not be on the I/O thread).
169 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 169 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
170 } 170 }
171 171
172 void RawChannel::Init(Delegate* delegate) { 172 void RawChannel::Init(Delegate* delegate) {
173 DCHECK(delegate); 173 DCHECK(delegate);
174 174
175 DCHECK(!delegate_); 175 DCHECK(!delegate_);
176 delegate_ = delegate; 176 delegate_ = delegate;
(...skipping 19 matching lines...) Expand all
196 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, 196 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted,
197 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); 197 weak_ptr_factory_.GetWeakPtr(), io_result, 0));
198 } 198 }
199 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying 199 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying
200 // the delegate), not an initialization failure. 200 // the delegate), not an initialization failure.
201 } 201 }
202 202
203 void RawChannel::Shutdown() { 203 void RawChannel::Shutdown() {
204 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 204 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
205 205
206 base::AutoLock locker(write_lock_); 206 MutexLocker locker(&write_mutex_);
207 207
208 LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty()) 208 LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty())
209 << "Shutting down RawChannel with write buffer nonempty"; 209 << "Shutting down RawChannel with write buffer nonempty";
210 210
211 // Reset the delegate so that it won't receive further calls. 211 // Reset the delegate so that it won't receive further calls.
212 delegate_ = nullptr; 212 delegate_ = nullptr;
213 if (set_on_shutdown_) { 213 if (set_on_shutdown_) {
214 *set_on_shutdown_ = true; 214 *set_on_shutdown_ = true;
215 set_on_shutdown_ = nullptr; 215 set_on_shutdown_ = nullptr;
216 } 216 }
217 write_stopped_ = true; 217 write_stopped_ = true;
218 weak_ptr_factory_.InvalidateWeakPtrs(); 218 weak_ptr_factory_.InvalidateWeakPtrs();
219 219
220 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); 220 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
221 } 221 }
222 222
223 // Reminder: This must be thread-safe. 223 // Reminder: This must be thread-safe.
224 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { 224 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
225 DCHECK(message); 225 DCHECK(message);
226 226
227 base::AutoLock locker(write_lock_); 227 MutexLocker locker(&write_mutex_);
228 if (write_stopped_) 228 if (write_stopped_)
229 return false; 229 return false;
230 230
231 if (!write_buffer_->message_queue_.IsEmpty()) { 231 if (!write_buffer_->message_queue_.IsEmpty()) {
232 EnqueueMessageNoLock(message.Pass()); 232 EnqueueMessageNoLock(message.Pass());
233 return true; 233 return true;
234 } 234 }
235 235
236 EnqueueMessageNoLock(message.Pass()); 236 EnqueueMessageNoLock(message.Pass());
237 DCHECK_EQ(write_buffer_->data_offset_, 0u); 237 DCHECK_EQ(write_buffer_->data_offset_, 0u);
(...skipping 13 matching lines...) Expand all
251 FROM_HERE, 251 FROM_HERE,
252 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), 252 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(),
253 Delegate::ERROR_WRITE)); 253 Delegate::ERROR_WRITE));
254 } 254 }
255 255
256 return result; 256 return result;
257 } 257 }
258 258
259 // Reminder: This must be thread-safe. 259 // Reminder: This must be thread-safe.
260 bool RawChannel::IsWriteBufferEmpty() { 260 bool RawChannel::IsWriteBufferEmpty() {
261 base::AutoLock locker(write_lock_); 261 MutexLocker locker(&write_mutex_);
262 return write_buffer_->message_queue_.IsEmpty(); 262 return write_buffer_->message_queue_.IsEmpty();
263 } 263 }
264 264
265 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { 265 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
266 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 266 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
267 267
268 // Keep reading data in a loop, and dispatch messages if enough data is 268 // Keep reading data in a loop, and dispatch messages if enough data is
269 // received. Exit the loop if any of the following happens: 269 // received. Exit the loop if any of the following happens:
270 // - one or more messages were dispatched; 270 // - one or more messages were dispatched;
271 // - the last read failed, was a partial read or would block; 271 // - the last read failed, was a partial read or would block;
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
405 } 405 }
406 406
407 void RawChannel::OnWriteCompleted(IOResult io_result, 407 void RawChannel::OnWriteCompleted(IOResult io_result,
408 size_t platform_handles_written, 408 size_t platform_handles_written,
409 size_t bytes_written) { 409 size_t bytes_written) {
410 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 410 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
411 DCHECK_NE(io_result, IO_PENDING); 411 DCHECK_NE(io_result, IO_PENDING);
412 412
413 bool did_fail = false; 413 bool did_fail = false;
414 { 414 {
415 base::AutoLock locker(write_lock_); 415 MutexLocker locker(&write_mutex_);
416 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.IsEmpty()); 416 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.IsEmpty());
417 417
418 if (write_stopped_) { 418 if (write_stopped_) {
419 NOTREACHED(); 419 NOTREACHED();
420 return; 420 return;
421 } 421 }
422 422
423 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, 423 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written,
424 bytes_written); 424 bytes_written);
425 } 425 }
426 426
427 if (did_fail) { 427 if (did_fail) {
428 CallOnError(Delegate::ERROR_WRITE); 428 CallOnError(Delegate::ERROR_WRITE);
429 return; // |this| may have been destroyed in |CallOnError()|. 429 return; // |this| may have been destroyed in |CallOnError()|.
430 } 430 }
431 } 431 }
432 432
433 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { 433 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
434 write_lock_.AssertAcquired(); 434 write_mutex_.AssertHeld();
435 write_buffer_->message_queue_.AddMessage(message.Pass()); 435 write_buffer_->message_queue_.AddMessage(message.Pass());
436 } 436 }
437 437
438 bool RawChannel::OnReadMessageForRawChannel( 438 bool RawChannel::OnReadMessageForRawChannel(
439 const MessageInTransit::View& message_view) { 439 const MessageInTransit::View& message_view) {
440 // No non-implementation specific |RawChannel| control messages. 440 // No non-implementation specific |RawChannel| control messages.
441 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() 441 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype()
442 << ")"; 442 << ")";
443 return false; 443 return false;
444 } 444 }
(...skipping 11 matching lines...) Expand all
456 case IO_SUCCEEDED: 456 case IO_SUCCEEDED:
457 case IO_PENDING: 457 case IO_PENDING:
458 NOTREACHED(); 458 NOTREACHED();
459 break; 459 break;
460 } 460 }
461 return Delegate::ERROR_READ_UNKNOWN; 461 return Delegate::ERROR_READ_UNKNOWN;
462 } 462 }
463 463
464 void RawChannel::CallOnError(Delegate::Error error) { 464 void RawChannel::CallOnError(Delegate::Error error) {
465 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 465 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
466 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 466 // TODO(vtl): Add a "write_mutex_.AssertNotHeld()"?
467 if (delegate_) { 467 if (delegate_) {
468 delegate_->OnError(error); 468 delegate_->OnError(error);
469 return; // |this| may have been destroyed in |OnError()|. 469 return; // |this| may have been destroyed in |OnError()|.
470 } 470 }
471 } 471 }
472 472
473 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, 473 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
474 size_t platform_handles_written, 474 size_t platform_handles_written,
475 size_t bytes_written) { 475 size_t bytes_written) {
476 write_lock_.AssertAcquired(); 476 write_mutex_.AssertHeld();
477 477
478 DCHECK(!write_stopped_); 478 DCHECK(!write_stopped_);
479 DCHECK(!write_buffer_->message_queue_.IsEmpty()); 479 DCHECK(!write_buffer_->message_queue_.IsEmpty());
480 480
481 if (io_result == IO_SUCCEEDED) { 481 if (io_result == IO_SUCCEEDED) {
482 write_buffer_->platform_handles_offset_ += platform_handles_written; 482 write_buffer_->platform_handles_offset_ += platform_handles_written;
483 write_buffer_->data_offset_ += bytes_written; 483 write_buffer_->data_offset_ += bytes_written;
484 484
485 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage(); 485 MessageInTransit* message = write_buffer_->message_queue_.PeekMessage();
486 if (write_buffer_->data_offset_ >= message->total_size()) { 486 if (write_buffer_->data_offset_ >= message->total_size()) {
(...skipping 16 matching lines...) Expand all
503 503
504 write_stopped_ = true; 504 write_stopped_ = true;
505 write_buffer_->message_queue_.Clear(); 505 write_buffer_->message_queue_.Clear();
506 write_buffer_->platform_handles_offset_ = 0; 506 write_buffer_->platform_handles_offset_ = 0;
507 write_buffer_->data_offset_ = 0; 507 write_buffer_->data_offset_ = 0;
508 return false; 508 return false;
509 } 509 }
510 510
511 } // namespace system 511 } // namespace system
512 } // namespace mojo 512 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/raw_channel.h ('k') | mojo/edk/system/raw_channel_posix.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698