Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(446)

Side by Side Diff: mojo/edk/system/raw_channel.cc

Issue 859333004: Allow mojo::system::RawChannel::Delegate methods to destroy the RawChannel. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/raw_channel.h" 5 #include "mojo/edk/system/raw_channel.h"
6 6
7 #include <string.h> 7 #include <string.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 10
(...skipping 138 matching lines...) Expand 10 before | Expand all | Expand 10 after
149 static_cast<const char*>(message->transport_data()->buffer()), 149 static_cast<const char*>(message->transport_data()->buffer()),
150 transport_data_buffer_size}; 150 transport_data_buffer_size};
151 buffers->push_back(buffer2); 151 buffers->push_back(buffer2);
152 } 152 }
153 153
154 // RawChannel ------------------------------------------------------------------ 154 // RawChannel ------------------------------------------------------------------
155 155
156 RawChannel::RawChannel() 156 RawChannel::RawChannel()
157 : message_loop_for_io_(nullptr), 157 : message_loop_for_io_(nullptr),
158 delegate_(nullptr), 158 delegate_(nullptr),
159 read_stopped_(false), 159 set_on_shutdown_(nullptr),
160 write_stopped_(false), 160 write_stopped_(false),
161 weak_ptr_factory_(this) { 161 weak_ptr_factory_(this) {
162 } 162 }
163 163
164 RawChannel::~RawChannel() { 164 RawChannel::~RawChannel() {
165 DCHECK(!read_buffer_); 165 DCHECK(!read_buffer_);
166 DCHECK(!write_buffer_); 166 DCHECK(!write_buffer_);
167 167
168 // No need to take the |write_lock_| here -- if there are still weak pointers 168 // No need to take the |write_lock_| here -- if there are still weak pointers
169 // outstanding, then we're hosed anyway (since we wouldn't be able to 169 // outstanding, then we're hosed anyway (since we wouldn't be able to
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
205 void RawChannel::Shutdown() { 205 void RawChannel::Shutdown() {
206 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 206 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
207 207
208 base::AutoLock locker(write_lock_); 208 base::AutoLock locker(write_lock_);
209 209
210 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) 210 LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
211 << "Shutting down RawChannel with write buffer nonempty"; 211 << "Shutting down RawChannel with write buffer nonempty";
212 212
213 // Reset the delegate so that it won't receive further calls. 213 // Reset the delegate so that it won't receive further calls.
214 delegate_ = nullptr; 214 delegate_ = nullptr;
215 read_stopped_ = true; 215 if (set_on_shutdown_) {
216 *set_on_shutdown_ = true;
217 set_on_shutdown_ = nullptr;
218 }
216 write_stopped_ = true; 219 write_stopped_ = true;
217 weak_ptr_factory_.InvalidateWeakPtrs(); 220 weak_ptr_factory_.InvalidateWeakPtrs();
218 221
219 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); 222 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
220 } 223 }
221 224
222 // Reminder: This must be thread-safe. 225 // Reminder: This must be thread-safe.
223 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { 226 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
224 DCHECK(message); 227 DCHECK(message);
225 228
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
257 260
258 // Reminder: This must be thread-safe. 261 // Reminder: This must be thread-safe.
259 bool RawChannel::IsWriteBufferEmpty() { 262 bool RawChannel::IsWriteBufferEmpty() {
260 base::AutoLock locker(write_lock_); 263 base::AutoLock locker(write_lock_);
261 return write_buffer_->message_queue_.empty(); 264 return write_buffer_->message_queue_.empty();
262 } 265 }
263 266
264 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { 267 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
265 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 268 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
266 269
267 if (read_stopped_) {
yzshen1 2015/01/21 23:03:42 Without this NOTREACHED(), maybe we should add mor
viettrungluu 2015/01/21 23:34:31 Done.
268 NOTREACHED();
269 return;
270 }
271
272 // Keep reading data in a loop, and dispatch messages if enough data is 270 // Keep reading data in a loop, and dispatch messages if enough data is
273 // received. Exit the loop if any of the following happens: 271 // received. Exit the loop if any of the following happens:
274 // - one or more messages were dispatched; 272 // - one or more messages were dispatched;
275 // - the last read failed, was a partial read or would block; 273 // - the last read failed, was a partial read or would block;
276 // - |Shutdown()| was called. 274 // - |Shutdown()| was called.
277 do { 275 do {
278 switch (io_result) { 276 switch (io_result) {
279 case IO_SUCCEEDED: 277 case IO_SUCCEEDED:
280 break; 278 break;
281 case IO_FAILED_SHUTDOWN: 279 case IO_FAILED_SHUTDOWN:
282 case IO_FAILED_BROKEN: 280 case IO_FAILED_BROKEN:
283 case IO_FAILED_UNKNOWN: 281 case IO_FAILED_UNKNOWN:
284 read_stopped_ = true;
285 CallOnError(ReadIOResultToError(io_result)); 282 CallOnError(ReadIOResultToError(io_result));
286 return; 283 return; // |this| may have been destroyed in |CallOnError()|.
287 case IO_PENDING: 284 case IO_PENDING:
288 NOTREACHED(); 285 NOTREACHED();
289 return; 286 return;
290 } 287 }
291 288
292 read_buffer_->num_valid_bytes_ += bytes_read; 289 read_buffer_->num_valid_bytes_ += bytes_read;
293 290
294 // Dispatch all the messages that we can. 291 // Dispatch all the messages that we can.
295 bool did_dispatch_message = false; 292 bool did_dispatch_message = false;
296 // Tracks the offset of the first undispatched message in |read_buffer_|. 293 // Tracks the offset of the first undispatched message in |read_buffer_|.
(...skipping 14 matching lines...) Expand all
311 remaining_bytes >= message_size) { 308 remaining_bytes >= message_size) {
312 MessageInTransit::View message_view( 309 MessageInTransit::View message_view(
313 message_size, &read_buffer_->buffer_[read_buffer_start]); 310 message_size, &read_buffer_->buffer_[read_buffer_start]);
314 DCHECK_EQ(message_view.total_size(), message_size); 311 DCHECK_EQ(message_view.total_size(), message_size);
315 312
316 const char* error_message = nullptr; 313 const char* error_message = nullptr;
317 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), 314 if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
318 &error_message)) { 315 &error_message)) {
319 DCHECK(error_message); 316 DCHECK(error_message);
320 LOG(ERROR) << "Received invalid message: " << error_message; 317 LOG(ERROR) << "Received invalid message: " << error_message;
321 read_stopped_ = true;
322 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); 318 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
323 return; 319 return; // |this| may have been destroyed in |CallOnError()|.
324 } 320 }
325 321
326 if (message_view.type() == MessageInTransit::kTypeRawChannel) { 322 if (message_view.type() == MessageInTransit::kTypeRawChannel) {
327 if (!OnReadMessageForRawChannel(message_view)) { 323 if (!OnReadMessageForRawChannel(message_view)) {
328 read_stopped_ = true;
329 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); 324 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
330 return; 325 return; // |this| may have been destroyed in |CallOnError()|.
331 } 326 }
332 } else { 327 } else {
333 embedder::ScopedPlatformHandleVectorPtr platform_handles; 328 embedder::ScopedPlatformHandleVectorPtr platform_handles;
334 if (message_view.transport_data_buffer()) { 329 if (message_view.transport_data_buffer()) {
335 size_t num_platform_handles; 330 size_t num_platform_handles;
336 const void* platform_handle_table; 331 const void* platform_handle_table;
337 TransportData::GetPlatformHandleTable( 332 TransportData::GetPlatformHandleTable(
338 message_view.transport_data_buffer(), &num_platform_handles, 333 message_view.transport_data_buffer(), &num_platform_handles,
339 &platform_handle_table); 334 &platform_handle_table);
340 335
341 if (num_platform_handles > 0) { 336 if (num_platform_handles > 0) {
342 platform_handles = 337 platform_handles =
343 GetReadPlatformHandles(num_platform_handles, 338 GetReadPlatformHandles(num_platform_handles,
344 platform_handle_table).Pass(); 339 platform_handle_table).Pass();
345 if (!platform_handles) { 340 if (!platform_handles) {
346 LOG(ERROR) << "Invalid number of platform handles received"; 341 LOG(ERROR) << "Invalid number of platform handles received";
347 read_stopped_ = true;
348 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); 342 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
349 return; 343 return; // |this| may have been destroyed in |CallOnError()|.
350 } 344 }
351 } 345 }
352 } 346 }
353 347
354 // TODO(vtl): In the case that we aren't expecting any platform handles, 348 // TODO(vtl): In the case that we aren't expecting any platform handles,
355 // for the POSIX implementation, we should confirm that none are stored. 349 // for the POSIX implementation, we should confirm that none are stored.
356 350
357 // Dispatch the message. 351 // Dispatch the message.
352 // Detect the case when |Shutdown()| is called; subsequent destruction
353 // is also permitted then.
354 bool shutdown_called = false;
355 DCHECK(!set_on_shutdown_);
356 set_on_shutdown_ = &shutdown_called;
358 DCHECK(delegate_); 357 DCHECK(delegate_);
359 delegate_->OnReadMessage(message_view, platform_handles.Pass()); 358 delegate_->OnReadMessage(message_view, platform_handles.Pass());
360 if (read_stopped_) { 359 if (shutdown_called)
361 // |Shutdown()| was called in |OnReadMessage()|.
362 // TODO(vtl): Add test for this case.
363 return; 360 return;
364 } 361 set_on_shutdown_ = nullptr;
365 } 362 }
366 363
367 did_dispatch_message = true; 364 did_dispatch_message = true;
368 365
369 // Update our state. 366 // Update our state.
370 read_buffer_start += message_size; 367 read_buffer_start += message_size;
371 remaining_bytes -= message_size; 368 remaining_bytes -= message_size;
372 } 369 }
373 370
374 if (read_buffer_start > 0) { 371 if (read_buffer_start > 0) {
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
422 419
423 if (write_stopped_) { 420 if (write_stopped_) {
424 NOTREACHED(); 421 NOTREACHED();
425 return; 422 return;
426 } 423 }
427 424
428 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, 425 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written,
429 bytes_written); 426 bytes_written);
430 } 427 }
431 428
432 if (did_fail) 429 if (did_fail) {
433 CallOnError(Delegate::ERROR_WRITE); 430 CallOnError(Delegate::ERROR_WRITE);
431 return; // |this| may have been destroyed in |CallOnError()|.
432 }
434 } 433 }
435 434
436 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { 435 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
437 write_lock_.AssertAcquired(); 436 write_lock_.AssertAcquired();
438 write_buffer_->message_queue_.push_back(message.release()); 437 write_buffer_->message_queue_.push_back(message.release());
439 } 438 }
440 439
441 bool RawChannel::OnReadMessageForRawChannel( 440 bool RawChannel::OnReadMessageForRawChannel(
442 const MessageInTransit::View& message_view) { 441 const MessageInTransit::View& message_view) {
443 // No non-implementation specific |RawChannel| control messages. 442 // No non-implementation specific |RawChannel| control messages.
(...skipping 16 matching lines...) Expand all
460 case IO_PENDING: 459 case IO_PENDING:
461 NOTREACHED(); 460 NOTREACHED();
462 break; 461 break;
463 } 462 }
464 return Delegate::ERROR_READ_UNKNOWN; 463 return Delegate::ERROR_READ_UNKNOWN;
465 } 464 }
466 465
467 void RawChannel::CallOnError(Delegate::Error error) { 466 void RawChannel::CallOnError(Delegate::Error error) {
468 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 467 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
469 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 468 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
470 if (delegate_) 469 if (delegate_) {
471 delegate_->OnError(error); 470 delegate_->OnError(error);
471 return; // |this| may have been destroyed in |OnError()|.
472 }
472 } 473 }
473 474
474 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, 475 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
475 size_t platform_handles_written, 476 size_t platform_handles_written,
476 size_t bytes_written) { 477 size_t bytes_written) {
477 write_lock_.AssertAcquired(); 478 write_lock_.AssertAcquired();
478 479
479 DCHECK(!write_stopped_); 480 DCHECK(!write_stopped_);
480 DCHECK(!write_buffer_->message_queue_.empty()); 481 DCHECK(!write_buffer_->message_queue_.empty());
481 482
(...skipping 23 matching lines...) Expand all
505 506
506 write_stopped_ = true; 507 write_stopped_ = true;
507 STLDeleteElements(&write_buffer_->message_queue_); 508 STLDeleteElements(&write_buffer_->message_queue_);
508 write_buffer_->platform_handles_offset_ = 0; 509 write_buffer_->platform_handles_offset_ = 0;
509 write_buffer_->data_offset_ = 0; 510 write_buffer_->data_offset_ = 0;
510 return false; 511 return false;
511 } 512 }
512 513
513 } // namespace system 514 } // namespace system
514 } // namespace mojo 515 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698