OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/system/raw_channel.h" | 5 #include "mojo/system/raw_channel.h" |
6 | 6 |
7 #include <errno.h> | 7 #include <errno.h> |
8 #include <string.h> | 8 #include <string.h> |
9 #include <unistd.h> | 9 #include <unistd.h> |
10 | 10 |
(...skipping 240 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
251 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); | 251 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); |
252 | 252 |
253 // Dispatch all the messages that we can. | 253 // Dispatch all the messages that we can. |
254 while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) { | 254 while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) { |
255 const MessageInTransit* message = | 255 const MessageInTransit* message = |
256 reinterpret_cast<const MessageInTransit*>( | 256 reinterpret_cast<const MessageInTransit*>( |
257 &read_buffer_[read_buffer_start]); | 257 &read_buffer_[read_buffer_start]); |
258 DCHECK_EQ(reinterpret_cast<size_t>(message) % | 258 DCHECK_EQ(reinterpret_cast<size_t>(message) % |
259 MessageInTransit::kMessageAlignment, 0u); | 259 MessageInTransit::kMessageAlignment, 0u); |
260 // If we have the header, not the whole message.... | 260 // If we have the header, not the whole message.... |
261 if (read_buffer_num_valid_bytes_ < | 261 if (read_buffer_num_valid_bytes_ < message->main_buffer_size()) |
262 message->size_with_header_and_padding()) | |
263 break; | 262 break; |
264 | 263 |
265 // Dispatch the message. | 264 // Dispatch the message. |
266 delegate()->OnReadMessage(*message); | 265 delegate()->OnReadMessage(*message); |
267 if (!read_watcher_.get()) { | 266 if (!read_watcher_.get()) { |
268 // |Shutdown()| was called in |OnReadMessage()|. | 267 // |Shutdown()| was called in |OnReadMessage()|. |
269 // TODO(vtl): Add test for this case. | 268 // TODO(vtl): Add test for this case. |
270 return; | 269 return; |
271 } | 270 } |
272 did_dispatch_message = true; | 271 did_dispatch_message = true; |
273 | 272 |
274 // Update our state. | 273 // Update our state. |
275 read_buffer_start += message->size_with_header_and_padding(); | 274 read_buffer_start += message->main_buffer_size(); |
276 read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding(); | 275 read_buffer_num_valid_bytes_ -= message->main_buffer_size(); |
277 } | 276 } |
278 | 277 |
279 // If we dispatched any messages, stop reading for now (and let the message | 278 // If we dispatched any messages, stop reading for now (and let the message |
280 // loop do its thing for another round). | 279 // loop do its thing for another round). |
281 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only | 280 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only |
282 // a single message. Risks: slower, more complex if we want to avoid lots of | 281 // a single message. Risks: slower, more complex if we want to avoid lots of |
283 // copying. ii. Keep reading until there's no more data and dispatch all the | 282 // copying. ii. Keep reading until there's no more data and dispatch all the |
284 // messages we can. Risks: starvation of other users of the message loop.) | 283 // messages we can. Risks: starvation of other users of the message loop.) |
285 if (did_dispatch_message) | 284 if (did_dispatch_message) |
286 break; | 285 break; |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
346 delegate()->OnFatalError(fatal_error); | 345 delegate()->OnFatalError(fatal_error); |
347 } | 346 } |
348 | 347 |
349 bool RawChannelPosix::WriteFrontMessageNoLock() { | 348 bool RawChannelPosix::WriteFrontMessageNoLock() { |
350 write_lock_.AssertAcquired(); | 349 write_lock_.AssertAcquired(); |
351 | 350 |
352 DCHECK(!write_stopped_); | 351 DCHECK(!write_stopped_); |
353 DCHECK(!write_message_queue_.empty()); | 352 DCHECK(!write_message_queue_.empty()); |
354 | 353 |
355 MessageInTransit* message = write_message_queue_.front(); | 354 MessageInTransit* message = write_message_queue_.front(); |
356 DCHECK_LT(write_message_offset_, message->size_with_header_and_padding()); | 355 DCHECK_LT(write_message_offset_, message->main_buffer_size()); |
357 size_t bytes_to_write = | 356 size_t bytes_to_write = message->main_buffer_size() - write_message_offset_; |
358 message->size_with_header_and_padding() - write_message_offset_; | |
359 ssize_t bytes_written = HANDLE_EINTR( | 357 ssize_t bytes_written = HANDLE_EINTR( |
360 write(fd_.get().fd, | 358 write(fd_.get().fd, |
361 reinterpret_cast<char*>(message) + write_message_offset_, | 359 static_cast<const char*>(message->main_buffer()) + |
| 360 write_message_offset_, |
362 bytes_to_write)); | 361 bytes_to_write)); |
363 if (bytes_written < 0) { | 362 if (bytes_written < 0) { |
364 if (errno != EAGAIN && errno != EWOULDBLOCK) { | 363 if (errno != EAGAIN && errno != EWOULDBLOCK) { |
365 PLOG(ERROR) << "write of size " << bytes_to_write; | 364 PLOG(ERROR) << "write of size " << bytes_to_write; |
366 CancelPendingWritesNoLock(); | 365 CancelPendingWritesNoLock(); |
367 return false; | 366 return false; |
368 } | 367 } |
369 | 368 |
370 // We simply failed to write since we'd block. The logic is the same as if | 369 // We simply failed to write since we'd block. The logic is the same as if |
371 // we got a partial write. | 370 // we got a partial write. |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
407 // Static factory method declared in raw_channel.h. | 406 // Static factory method declared in raw_channel.h. |
408 // static | 407 // static |
409 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, | 408 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, |
410 Delegate* delegate, | 409 Delegate* delegate, |
411 base::MessageLoop* message_loop) { | 410 base::MessageLoop* message_loop) { |
412 return new RawChannelPosix(handle.Pass(), delegate, message_loop); | 411 return new RawChannelPosix(handle.Pass(), delegate, message_loop); |
413 } | 412 } |
414 | 413 |
415 } // namespace system | 414 } // namespace system |
416 } // namespace mojo | 415 } // namespace mojo |
OLD | NEW |