| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/system/raw_channel.h" | 5 #include "mojo/system/raw_channel.h" |
| 6 | 6 |
| 7 #include <errno.h> | 7 #include <errno.h> |
| 8 #include <string.h> | 8 #include <string.h> |
| 9 #include <unistd.h> | 9 #include <unistd.h> |
| 10 | 10 |
| (...skipping 240 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 251 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); | 251 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); |
| 252 | 252 |
| 253 // Dispatch all the messages that we can. | 253 // Dispatch all the messages that we can. |
| 254 while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) { | 254 while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) { |
| 255 const MessageInTransit* message = | 255 const MessageInTransit* message = |
| 256 reinterpret_cast<const MessageInTransit*>( | 256 reinterpret_cast<const MessageInTransit*>( |
| 257 &read_buffer_[read_buffer_start]); | 257 &read_buffer_[read_buffer_start]); |
| 258 DCHECK_EQ(reinterpret_cast<size_t>(message) % | 258 DCHECK_EQ(reinterpret_cast<size_t>(message) % |
| 259 MessageInTransit::kMessageAlignment, 0u); | 259 MessageInTransit::kMessageAlignment, 0u); |
| 260 // If we have the header, not the whole message.... | 260 // If we have the header, not the whole message.... |
| 261 if (read_buffer_num_valid_bytes_ < | 261 if (read_buffer_num_valid_bytes_ < message->main_buffer_size()) |
| 262 message->size_with_header_and_padding()) | |
| 263 break; | 262 break; |
| 264 | 263 |
| 265 // Dispatch the message. | 264 // Dispatch the message. |
| 266 delegate()->OnReadMessage(*message); | 265 delegate()->OnReadMessage(*message); |
| 267 if (!read_watcher_.get()) { | 266 if (!read_watcher_.get()) { |
| 268 // |Shutdown()| was called in |OnReadMessage()|. | 267 // |Shutdown()| was called in |OnReadMessage()|. |
| 269 // TODO(vtl): Add test for this case. | 268 // TODO(vtl): Add test for this case. |
| 270 return; | 269 return; |
| 271 } | 270 } |
| 272 did_dispatch_message = true; | 271 did_dispatch_message = true; |
| 273 | 272 |
| 274 // Update our state. | 273 // Update our state. |
| 275 read_buffer_start += message->size_with_header_and_padding(); | 274 read_buffer_start += message->main_buffer_size(); |
| 276 read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding(); | 275 read_buffer_num_valid_bytes_ -= message->main_buffer_size(); |
| 277 } | 276 } |
| 278 | 277 |
| 279 // If we dispatched any messages, stop reading for now (and let the message | 278 // If we dispatched any messages, stop reading for now (and let the message |
| 280 // loop do its thing for another round). | 279 // loop do its thing for another round). |
| 281 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only | 280 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only |
| 282 // a single message. Risks: slower, more complex if we want to avoid lots of | 281 // a single message. Risks: slower, more complex if we want to avoid lots of |
| 283 // copying. ii. Keep reading until there's no more data and dispatch all the | 282 // copying. ii. Keep reading until there's no more data and dispatch all the |
| 284 // messages we can. Risks: starvation of other users of the message loop.) | 283 // messages we can. Risks: starvation of other users of the message loop.) |
| 285 if (did_dispatch_message) | 284 if (did_dispatch_message) |
| 286 break; | 285 break; |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 346 delegate()->OnFatalError(fatal_error); | 345 delegate()->OnFatalError(fatal_error); |
| 347 } | 346 } |
| 348 | 347 |
| 349 bool RawChannelPosix::WriteFrontMessageNoLock() { | 348 bool RawChannelPosix::WriteFrontMessageNoLock() { |
| 350 write_lock_.AssertAcquired(); | 349 write_lock_.AssertAcquired(); |
| 351 | 350 |
| 352 DCHECK(!write_stopped_); | 351 DCHECK(!write_stopped_); |
| 353 DCHECK(!write_message_queue_.empty()); | 352 DCHECK(!write_message_queue_.empty()); |
| 354 | 353 |
| 355 MessageInTransit* message = write_message_queue_.front(); | 354 MessageInTransit* message = write_message_queue_.front(); |
| 356 DCHECK_LT(write_message_offset_, message->size_with_header_and_padding()); | 355 DCHECK_LT(write_message_offset_, message->main_buffer_size()); |
| 357 size_t bytes_to_write = | 356 size_t bytes_to_write = message->main_buffer_size() - write_message_offset_; |
| 358 message->size_with_header_and_padding() - write_message_offset_; | |
| 359 ssize_t bytes_written = HANDLE_EINTR( | 357 ssize_t bytes_written = HANDLE_EINTR( |
| 360 write(fd_.get().fd, | 358 write(fd_.get().fd, |
| 361 reinterpret_cast<char*>(message) + write_message_offset_, | 359 static_cast<const char*>(message->main_buffer()) + |
| 360 write_message_offset_, |
| 362 bytes_to_write)); | 361 bytes_to_write)); |
| 363 if (bytes_written < 0) { | 362 if (bytes_written < 0) { |
| 364 if (errno != EAGAIN && errno != EWOULDBLOCK) { | 363 if (errno != EAGAIN && errno != EWOULDBLOCK) { |
| 365 PLOG(ERROR) << "write of size " << bytes_to_write; | 364 PLOG(ERROR) << "write of size " << bytes_to_write; |
| 366 CancelPendingWritesNoLock(); | 365 CancelPendingWritesNoLock(); |
| 367 return false; | 366 return false; |
| 368 } | 367 } |
| 369 | 368 |
| 370 // We simply failed to write since we'd block. The logic is the same as if | 369 // We simply failed to write since we'd block. The logic is the same as if |
| 371 // we got a partial write. | 370 // we got a partial write. |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 407 // Static factory method declared in raw_channel.h. | 406 // Static factory method declared in raw_channel.h. |
| 408 // static | 407 // static |
| 409 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, | 408 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, |
| 410 Delegate* delegate, | 409 Delegate* delegate, |
| 411 base::MessageLoop* message_loop) { | 410 base::MessageLoop* message_loop) { |
| 412 return new RawChannelPosix(handle.Pass(), delegate, message_loop); | 411 return new RawChannelPosix(handle.Pass(), delegate, message_loop); |
| 413 } | 412 } |
| 414 | 413 |
| 415 } // namespace system | 414 } // namespace system |
| 416 } // namespace mojo | 415 } // namespace mojo |
| OLD | NEW |