| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/system/raw_channel.h" | 5 #include "mojo/system/raw_channel.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 | 10 |
| (...skipping 12 matching lines...) Expand all Loading... |
| 23 | 23 |
| 24 // RawChannel::ReadBuffer ------------------------------------------------------ | 24 // RawChannel::ReadBuffer ------------------------------------------------------ |
| 25 | 25 |
| 26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { | 26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { |
| 27 } | 27 } |
| 28 | 28 |
| 29 RawChannel::ReadBuffer::~ReadBuffer() { | 29 RawChannel::ReadBuffer::~ReadBuffer() { |
| 30 } | 30 } |
| 31 | 31 |
| 32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { | 32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { |
| 33 CHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); | 33 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); |
| 34 *addr = &buffer_[0] + num_valid_bytes_; | 34 *addr = &buffer_[0] + num_valid_bytes_; |
| 35 *size = kReadSize; | 35 *size = kReadSize; |
| 36 } | 36 } |
| 37 | 37 |
| 38 // RawChannel::WriteBuffer ----------------------------------------------------- | 38 // RawChannel::WriteBuffer ----------------------------------------------------- |
| 39 | 39 |
| 40 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) | 40 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) |
| 41 : serialized_platform_handle_size_(serialized_platform_handle_size), | 41 : serialized_platform_handle_size_(serialized_platform_handle_size), |
| 42 platform_handles_offset_(0), | 42 platform_handles_offset_(0), |
| 43 data_offset_(0) { | 43 data_offset_(0) { |
| 44 } | 44 } |
| 45 | 45 |
| 46 RawChannel::WriteBuffer::~WriteBuffer() { | 46 RawChannel::WriteBuffer::~WriteBuffer() { |
| 47 STLDeleteElements(&message_queue_); | 47 STLDeleteElements(&message_queue_); |
| 48 } | 48 } |
| 49 | 49 |
| 50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { | 50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { |
| 51 if (message_queue_.empty()) | 51 if (message_queue_.empty()) |
| 52 return false; | 52 return false; |
| 53 | 53 |
| 54 const TransportData* transport_data = | 54 const TransportData* transport_data = |
| 55 message_queue_.front()->transport_data(); | 55 message_queue_.front()->transport_data(); |
| 56 if (!transport_data) | 56 if (!transport_data) |
| 57 return false; | 57 return false; |
| 58 | 58 |
| 59 const embedder::PlatformHandleVector* all_platform_handles = | 59 const embedder::PlatformHandleVector* all_platform_handles = |
| 60 transport_data->platform_handles(); | 60 transport_data->platform_handles(); |
| 61 if (!all_platform_handles) { | 61 if (!all_platform_handles) { |
| 62 CHECK_EQ(platform_handles_offset_, 0u); | 62 DCHECK_EQ(platform_handles_offset_, 0u); |
| 63 return false; | 63 return false; |
| 64 } | 64 } |
| 65 if (platform_handles_offset_ >= all_platform_handles->size()) { | 65 if (platform_handles_offset_ >= all_platform_handles->size()) { |
| 66 CHECK_EQ(platform_handles_offset_, all_platform_handles->size()); | 66 DCHECK_EQ(platform_handles_offset_, all_platform_handles->size()); |
| 67 return false; | 67 return false; |
| 68 } | 68 } |
| 69 | 69 |
| 70 return true; | 70 return true; |
| 71 } | 71 } |
| 72 | 72 |
| 73 void RawChannel::WriteBuffer::GetPlatformHandlesToSend( | 73 void RawChannel::WriteBuffer::GetPlatformHandlesToSend( |
| 74 size_t* num_platform_handles, | 74 size_t* num_platform_handles, |
| 75 embedder::PlatformHandle** platform_handles, | 75 embedder::PlatformHandle** platform_handles, |
| 76 void** serialization_data) { | 76 void** serialization_data) { |
| 77 CHECK(HavePlatformHandlesToSend()); | 77 DCHECK(HavePlatformHandlesToSend()); |
| 78 | 78 |
| 79 TransportData* transport_data = message_queue_.front()->transport_data(); | 79 TransportData* transport_data = message_queue_.front()->transport_data(); |
| 80 embedder::PlatformHandleVector* all_platform_handles = | 80 embedder::PlatformHandleVector* all_platform_handles = |
| 81 transport_data->platform_handles(); | 81 transport_data->platform_handles(); |
| 82 *num_platform_handles = | 82 *num_platform_handles = |
| 83 all_platform_handles->size() - platform_handles_offset_; | 83 all_platform_handles->size() - platform_handles_offset_; |
| 84 *platform_handles = &(*all_platform_handles)[platform_handles_offset_]; | 84 *platform_handles = &(*all_platform_handles)[platform_handles_offset_]; |
| 85 size_t serialization_data_offset = | 85 size_t serialization_data_offset = |
| 86 transport_data->platform_handle_table_offset(); | 86 transport_data->platform_handle_table_offset(); |
| 87 CHECK_GT(serialization_data_offset, 0u); | 87 DCHECK_GT(serialization_data_offset, 0u); |
| 88 serialization_data_offset += | 88 serialization_data_offset += |
| 89 platform_handles_offset_ * serialized_platform_handle_size_; | 89 platform_handles_offset_ * serialized_platform_handle_size_; |
| 90 *serialization_data = | 90 *serialization_data = |
| 91 static_cast<char*>(transport_data->buffer()) + serialization_data_offset; | 91 static_cast<char*>(transport_data->buffer()) + serialization_data_offset; |
| 92 } | 92 } |
| 93 | 93 |
| 94 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { | 94 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { |
| 95 buffers->clear(); | 95 buffers->clear(); |
| 96 | 96 |
| 97 if (message_queue_.empty()) | 97 if (message_queue_.empty()) |
| 98 return; | 98 return; |
| 99 | 99 |
| 100 MessageInTransit* message = message_queue_.front(); | 100 MessageInTransit* message = message_queue_.front(); |
| 101 CHECK_LT(data_offset_, message->total_size()); | 101 DCHECK_LT(data_offset_, message->total_size()); |
| 102 size_t bytes_to_write = message->total_size() - data_offset_; | 102 size_t bytes_to_write = message->total_size() - data_offset_; |
| 103 | 103 |
| 104 size_t transport_data_buffer_size = | 104 size_t transport_data_buffer_size = |
| 105 message->transport_data() ? message->transport_data()->buffer_size() : 0; | 105 message->transport_data() ? message->transport_data()->buffer_size() : 0; |
| 106 | 106 |
| 107 if (!transport_data_buffer_size) { | 107 if (!transport_data_buffer_size) { |
| 108 // Only write from the main buffer. | 108 // Only write from the main buffer. |
| 109 CHECK_LT(data_offset_, message->main_buffer_size()); | 109 DCHECK_LT(data_offset_, message->main_buffer_size()); |
| 110 CHECK_LE(bytes_to_write, message->main_buffer_size()); | 110 DCHECK_LE(bytes_to_write, message->main_buffer_size()); |
| 111 Buffer buffer = { | 111 Buffer buffer = { |
| 112 static_cast<const char*>(message->main_buffer()) + data_offset_, | 112 static_cast<const char*>(message->main_buffer()) + data_offset_, |
| 113 bytes_to_write}; | 113 bytes_to_write}; |
| 114 buffers->push_back(buffer); | 114 buffers->push_back(buffer); |
| 115 return; | 115 return; |
| 116 } | 116 } |
| 117 | 117 |
| 118 if (data_offset_ >= message->main_buffer_size()) { | 118 if (data_offset_ >= message->main_buffer_size()) { |
| 119 // Only write from the transport data buffer. | 119 // Only write from the transport data buffer. |
| 120 CHECK_LT(data_offset_ - message->main_buffer_size(), | 120 DCHECK_LT(data_offset_ - message->main_buffer_size(), |
| 121 transport_data_buffer_size); | 121 transport_data_buffer_size); |
| 122 CHECK_LE(bytes_to_write, transport_data_buffer_size); | 122 DCHECK_LE(bytes_to_write, transport_data_buffer_size); |
| 123 Buffer buffer = { | 123 Buffer buffer = { |
| 124 static_cast<const char*>(message->transport_data()->buffer()) + | 124 static_cast<const char*>(message->transport_data()->buffer()) + |
| 125 (data_offset_ - message->main_buffer_size()), | 125 (data_offset_ - message->main_buffer_size()), |
| 126 bytes_to_write}; | 126 bytes_to_write}; |
| 127 buffers->push_back(buffer); | 127 buffers->push_back(buffer); |
| 128 return; | 128 return; |
| 129 } | 129 } |
| 130 | 130 |
| 131 // TODO(vtl): We could actually send out buffers from multiple messages, with | 131 // TODO(vtl): We could actually send out buffers from multiple messages, with |
| 132 // the "stopping" condition being reaching a message with platform handles | 132 // the "stopping" condition being reaching a message with platform handles |
| 133 // attached. | 133 // attached. |
| 134 | 134 |
| 135 // Write from both buffers. | 135 // Write from both buffers. |
| 136 CHECK_EQ( | 136 DCHECK_EQ( |
| 137 bytes_to_write, | 137 bytes_to_write, |
| 138 message->main_buffer_size() - data_offset_ + transport_data_buffer_size); | 138 message->main_buffer_size() - data_offset_ + transport_data_buffer_size); |
| 139 Buffer buffer1 = { | 139 Buffer buffer1 = { |
| 140 static_cast<const char*>(message->main_buffer()) + data_offset_, | 140 static_cast<const char*>(message->main_buffer()) + data_offset_, |
| 141 message->main_buffer_size() - data_offset_}; | 141 message->main_buffer_size() - data_offset_}; |
| 142 buffers->push_back(buffer1); | 142 buffers->push_back(buffer1); |
| 143 Buffer buffer2 = { | 143 Buffer buffer2 = { |
| 144 static_cast<const char*>(message->transport_data()->buffer()), | 144 static_cast<const char*>(message->transport_data()->buffer()), |
| 145 transport_data_buffer_size}; | 145 transport_data_buffer_size}; |
| 146 buffers->push_back(buffer2); | 146 buffers->push_back(buffer2); |
| 147 } | 147 } |
| 148 | 148 |
| 149 // RawChannel ------------------------------------------------------------------ | 149 // RawChannel ------------------------------------------------------------------ |
| 150 | 150 |
| 151 RawChannel::RawChannel() | 151 RawChannel::RawChannel() |
| 152 : message_loop_for_io_(NULL), | 152 : message_loop_for_io_(NULL), |
| 153 delegate_(NULL), | 153 delegate_(NULL), |
| 154 read_stopped_(false), | 154 read_stopped_(false), |
| 155 write_stopped_(false), | 155 write_stopped_(false), |
| 156 weak_ptr_factory_(this) { | 156 weak_ptr_factory_(this) { |
| 157 } | 157 } |
| 158 | 158 |
| 159 RawChannel::~RawChannel() { | 159 RawChannel::~RawChannel() { |
| 160 CHECK(!read_buffer_); | 160 DCHECK(!read_buffer_); |
| 161 CHECK(!write_buffer_); | 161 DCHECK(!write_buffer_); |
| 162 | 162 |
| 163 // No need to take the |write_lock_| here -- if there are still weak pointers | 163 // No need to take the |write_lock_| here -- if there are still weak pointers |
| 164 // outstanding, then we're hosed anyway (since we wouldn't be able to | 164 // outstanding, then we're hosed anyway (since we wouldn't be able to |
| 165 // invalidate them cleanly, since we might not be on the I/O thread). | 165 // invalidate them cleanly, since we might not be on the I/O thread). |
| 166 CHECK(!weak_ptr_factory_.HasWeakPtrs()); | 166 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); |
| 167 } | 167 } |
| 168 | 168 |
| 169 bool RawChannel::Init(Delegate* delegate) { | 169 bool RawChannel::Init(Delegate* delegate) { |
| 170 CHECK(delegate); | 170 DCHECK(delegate); |
| 171 | 171 |
| 172 CHECK(!delegate_); | 172 DCHECK(!delegate_); |
| 173 delegate_ = delegate; | 173 delegate_ = delegate; |
| 174 | 174 |
| 175 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); | 175 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); |
| 176 CHECK(!message_loop_for_io_); | 176 DCHECK(!message_loop_for_io_); |
| 177 message_loop_for_io_ = | 177 message_loop_for_io_ = |
| 178 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); | 178 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); |
| 179 | 179 |
| 180 // No need to take the lock. No one should be using us yet. | 180 // No need to take the lock. No one should be using us yet. |
| 181 CHECK(!read_buffer_); | 181 DCHECK(!read_buffer_); |
| 182 read_buffer_.reset(new ReadBuffer); | 182 read_buffer_.reset(new ReadBuffer); |
| 183 CHECK(!write_buffer_); | 183 DCHECK(!write_buffer_); |
| 184 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize())); | 184 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize())); |
| 185 | 185 |
| 186 if (!OnInit()) { | 186 if (!OnInit()) { |
| 187 delegate_ = NULL; | 187 delegate_ = NULL; |
| 188 message_loop_for_io_ = NULL; | 188 message_loop_for_io_ = NULL; |
| 189 read_buffer_.reset(); | 189 read_buffer_.reset(); |
| 190 write_buffer_.reset(); | 190 write_buffer_.reset(); |
| 191 return false; | 191 return false; |
| 192 } | 192 } |
| 193 | 193 |
| 194 if (ScheduleRead() != IO_PENDING) { | 194 if (ScheduleRead() != IO_PENDING) { |
| 195 // This will notify the delegate about the read failure. Although we're on | 195 // This will notify the delegate about the read failure. Although we're on |
| 196 // the I/O thread, don't call it in the nested context. | 196 // the I/O thread, don't call it in the nested context. |
| 197 message_loop_for_io_->PostTask(FROM_HERE, | 197 message_loop_for_io_->PostTask(FROM_HERE, |
| 198 base::Bind(&RawChannel::OnReadCompleted, | 198 base::Bind(&RawChannel::OnReadCompleted, |
| 199 weak_ptr_factory_.GetWeakPtr(), | 199 weak_ptr_factory_.GetWeakPtr(), |
| 200 false, | 200 false, |
| 201 0)); | 201 0)); |
| 202 } | 202 } |
| 203 | 203 |
| 204 // ScheduleRead() failure is treated as a read failure (by notifying the | 204 // ScheduleRead() failure is treated as a read failure (by notifying the |
| 205 // delegate), not as an init failure. | 205 // delegate), not as an init failure. |
| 206 return true; | 206 return true; |
| 207 } | 207 } |
| 208 | 208 |
| 209 void RawChannel::Shutdown() { | 209 void RawChannel::Shutdown() { |
| 210 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 210 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 211 | 211 |
| 212 base::AutoLock locker(write_lock_); | 212 base::AutoLock locker(write_lock_); |
| 213 | 213 |
| 214 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) | 214 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) |
| 215 << "Shutting down RawChannel with write buffer nonempty"; | 215 << "Shutting down RawChannel with write buffer nonempty"; |
| 216 | 216 |
| 217 // Reset the delegate so that it won't receive further calls. | 217 // Reset the delegate so that it won't receive further calls. |
| 218 delegate_ = NULL; | 218 delegate_ = NULL; |
| 219 read_stopped_ = true; | 219 read_stopped_ = true; |
| 220 write_stopped_ = true; | 220 write_stopped_ = true; |
| 221 weak_ptr_factory_.InvalidateWeakPtrs(); | 221 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 222 | 222 |
| 223 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); | 223 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); |
| 224 } | 224 } |
| 225 | 225 |
| 226 // Reminder: This must be thread-safe. | 226 // Reminder: This must be thread-safe. |
| 227 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { | 227 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
| 228 CHECK(message); | 228 DCHECK(message); |
| 229 | 229 |
| 230 base::AutoLock locker(write_lock_); | 230 base::AutoLock locker(write_lock_); |
| 231 if (write_stopped_) | 231 if (write_stopped_) |
| 232 return false; | 232 return false; |
| 233 | 233 |
| 234 if (!write_buffer_->message_queue_.empty()) { | 234 if (!write_buffer_->message_queue_.empty()) { |
| 235 EnqueueMessageNoLock(message.Pass()); | 235 EnqueueMessageNoLock(message.Pass()); |
| 236 return true; | 236 return true; |
| 237 } | 237 } |
| 238 | 238 |
| 239 EnqueueMessageNoLock(message.Pass()); | 239 EnqueueMessageNoLock(message.Pass()); |
| 240 CHECK_EQ(write_buffer_->data_offset_, 0u); | 240 DCHECK_EQ(write_buffer_->data_offset_, 0u); |
| 241 | 241 |
| 242 size_t platform_handles_written = 0; | 242 size_t platform_handles_written = 0; |
| 243 size_t bytes_written = 0; | 243 size_t bytes_written = 0; |
| 244 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | 244 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
| 245 if (io_result == IO_PENDING) | 245 if (io_result == IO_PENDING) |
| 246 return true; | 246 return true; |
| 247 | 247 |
| 248 bool result = OnWriteCompletedNoLock( | 248 bool result = OnWriteCompletedNoLock( |
| 249 io_result == IO_SUCCEEDED, platform_handles_written, bytes_written); | 249 io_result == IO_SUCCEEDED, platform_handles_written, bytes_written); |
| 250 if (!result) { | 250 if (!result) { |
| 251 // Even if we're on the I/O thread, don't call |OnFatalError()| in the | 251 // Even if we're on the I/O thread, don't call |OnFatalError()| in the |
| 252 // nested context. | 252 // nested context. |
| 253 message_loop_for_io_->PostTask(FROM_HERE, | 253 message_loop_for_io_->PostTask(FROM_HERE, |
| 254 base::Bind(&RawChannel::CallOnFatalError, | 254 base::Bind(&RawChannel::CallOnFatalError, |
| 255 weak_ptr_factory_.GetWeakPtr(), | 255 weak_ptr_factory_.GetWeakPtr(), |
| 256 Delegate::FATAL_ERROR_WRITE)); | 256 Delegate::FATAL_ERROR_WRITE)); |
| 257 } | 257 } |
| 258 | 258 |
| 259 return result; | 259 return result; |
| 260 } | 260 } |
| 261 | 261 |
| 262 // Reminder: This must be thread-safe. | 262 // Reminder: This must be thread-safe. |
| 263 bool RawChannel::IsWriteBufferEmpty() { | 263 bool RawChannel::IsWriteBufferEmpty() { |
| 264 base::AutoLock locker(write_lock_); | 264 base::AutoLock locker(write_lock_); |
| 265 return write_buffer_->message_queue_.empty(); | 265 return write_buffer_->message_queue_.empty(); |
| 266 } | 266 } |
| 267 | 267 |
| 268 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { | 268 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { |
| 269 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 269 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 270 | 270 |
| 271 if (read_stopped_) { | 271 if (read_stopped_) { |
| 272 NOTREACHED(); | 272 NOTREACHED(); |
| 273 return; | 273 return; |
| 274 } | 274 } |
| 275 | 275 |
| 276 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; | 276 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; |
| 277 | 277 |
| 278 // Keep reading data in a loop, and dispatch messages if enough data is | 278 // Keep reading data in a loop, and dispatch messages if enough data is |
| 279 // received. Exit the loop if any of the following happens: | 279 // received. Exit the loop if any of the following happens: |
| (...skipping 23 matching lines...) Expand all Loading... |
| 303 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | 303 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
| 304 // next read). | 304 // next read). |
| 305 // TODO(vtl): Validate that |message_size| is sane. | 305 // TODO(vtl): Validate that |message_size| is sane. |
| 306 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( | 306 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( |
| 307 &read_buffer_->buffer_[read_buffer_start], | 307 &read_buffer_->buffer_[read_buffer_start], |
| 308 remaining_bytes, | 308 remaining_bytes, |
| 309 &message_size) && | 309 &message_size) && |
| 310 remaining_bytes >= message_size) { | 310 remaining_bytes >= message_size) { |
| 311 MessageInTransit::View message_view( | 311 MessageInTransit::View message_view( |
| 312 message_size, &read_buffer_->buffer_[read_buffer_start]); | 312 message_size, &read_buffer_->buffer_[read_buffer_start]); |
| 313 CHECK_EQ(message_view.total_size(), message_size); | 313 DCHECK_EQ(message_view.total_size(), message_size); |
| 314 | 314 |
| 315 const char* error_message = NULL; | 315 const char* error_message = NULL; |
| 316 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), | 316 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), |
| 317 &error_message)) { | 317 &error_message)) { |
| 318 CHECK(error_message); | 318 DCHECK(error_message); |
| 319 LOG(WARNING) << "Received invalid message: " << error_message; | 319 LOG(WARNING) << "Received invalid message: " << error_message; |
| 320 read_stopped_ = true; | 320 read_stopped_ = true; |
| 321 CallOnFatalError(Delegate::FATAL_ERROR_READ); | 321 CallOnFatalError(Delegate::FATAL_ERROR_READ); |
| 322 return; | 322 return; |
| 323 } | 323 } |
| 324 | 324 |
| 325 if (message_view.type() == MessageInTransit::kTypeRawChannel) { | 325 if (message_view.type() == MessageInTransit::kTypeRawChannel) { |
| 326 if (!OnReadMessageForRawChannel(message_view)) { | 326 if (!OnReadMessageForRawChannel(message_view)) { |
| 327 read_stopped_ = true; | 327 read_stopped_ = true; |
| 328 CallOnFatalError(Delegate::FATAL_ERROR_READ); | 328 CallOnFatalError(Delegate::FATAL_ERROR_READ); |
| (...skipping 19 matching lines...) Expand all Loading... |
| 348 CallOnFatalError(Delegate::FATAL_ERROR_READ); | 348 CallOnFatalError(Delegate::FATAL_ERROR_READ); |
| 349 return; | 349 return; |
| 350 } | 350 } |
| 351 } | 351 } |
| 352 } | 352 } |
| 353 | 353 |
| 354 // TODO(vtl): In the case that we aren't expecting any platform handles, | 354 // TODO(vtl): In the case that we aren't expecting any platform handles, |
| 355 // for the POSIX implementation, we should confirm that none are stored. | 355 // for the POSIX implementation, we should confirm that none are stored. |
| 356 | 356 |
| 357 // Dispatch the message. | 357 // Dispatch the message. |
| 358 CHECK(delegate_); | 358 DCHECK(delegate_); |
| 359 delegate_->OnReadMessage(message_view, platform_handles.Pass()); | 359 delegate_->OnReadMessage(message_view, platform_handles.Pass()); |
| 360 if (read_stopped_) { | 360 if (read_stopped_) { |
| 361 // |Shutdown()| was called in |OnReadMessage()|. | 361 // |Shutdown()| was called in |OnReadMessage()|. |
| 362 // TODO(vtl): Add test for this case. | 362 // TODO(vtl): Add test for this case. |
| 363 return; | 363 return; |
| 364 } | 364 } |
| 365 } | 365 } |
| 366 | 366 |
| 367 did_dispatch_message = true; | 367 did_dispatch_message = true; |
| 368 | 368 |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 406 // (2) If we didn't max out |kReadSize|, stop reading for now. | 406 // (2) If we didn't max out |kReadSize|, stop reading for now. |
| 407 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; | 407 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; |
| 408 bytes_read = 0; | 408 bytes_read = 0; |
| 409 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); | 409 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); |
| 410 } while (io_result != IO_PENDING); | 410 } while (io_result != IO_PENDING); |
| 411 } | 411 } |
| 412 | 412 |
| 413 void RawChannel::OnWriteCompleted(bool result, | 413 void RawChannel::OnWriteCompleted(bool result, |
| 414 size_t platform_handles_written, | 414 size_t platform_handles_written, |
| 415 size_t bytes_written) { | 415 size_t bytes_written) { |
| 416 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 416 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 417 | 417 |
| 418 bool did_fail = false; | 418 bool did_fail = false; |
| 419 { | 419 { |
| 420 base::AutoLock locker(write_lock_); | 420 base::AutoLock locker(write_lock_); |
| 421 CHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); | 421 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); |
| 422 | 422 |
| 423 if (write_stopped_) { | 423 if (write_stopped_) { |
| 424 NOTREACHED(); | 424 NOTREACHED(); |
| 425 return; | 425 return; |
| 426 } | 426 } |
| 427 | 427 |
| 428 did_fail = !OnWriteCompletedNoLock( | 428 did_fail = !OnWriteCompletedNoLock( |
| 429 result, platform_handles_written, bytes_written); | 429 result, platform_handles_written, bytes_written); |
| 430 } | 430 } |
| 431 | 431 |
| 432 if (did_fail) | 432 if (did_fail) |
| 433 CallOnFatalError(Delegate::FATAL_ERROR_WRITE); | 433 CallOnFatalError(Delegate::FATAL_ERROR_WRITE); |
| 434 } | 434 } |
| 435 | 435 |
| 436 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | 436 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
| 437 write_lock_.AssertAcquired(); | 437 write_lock_.AssertAcquired(); |
| 438 write_buffer_->message_queue_.push_back(message.release()); | 438 write_buffer_->message_queue_.push_back(message.release()); |
| 439 } | 439 } |
| 440 | 440 |
| 441 bool RawChannel::OnReadMessageForRawChannel( | 441 bool RawChannel::OnReadMessageForRawChannel( |
| 442 const MessageInTransit::View& message_view) { | 442 const MessageInTransit::View& message_view) { |
| 443 // No non-implementation specific |RawChannel| control messages. | 443 // No non-implementation specific |RawChannel| control messages. |
| 444 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() | 444 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() |
| 445 << ")"; | 445 << ")"; |
| 446 return false; | 446 return false; |
| 447 } | 447 } |
| 448 | 448 |
| 449 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { | 449 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { |
| 450 CHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 450 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 451 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 451 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? |
| 452 if (delegate_) | 452 if (delegate_) |
| 453 delegate_->OnFatalError(fatal_error); | 453 delegate_->OnFatalError(fatal_error); |
| 454 } | 454 } |
| 455 | 455 |
| 456 bool RawChannel::OnWriteCompletedNoLock(bool result, | 456 bool RawChannel::OnWriteCompletedNoLock(bool result, |
| 457 size_t platform_handles_written, | 457 size_t platform_handles_written, |
| 458 size_t bytes_written) { | 458 size_t bytes_written) { |
| 459 write_lock_.AssertAcquired(); | 459 write_lock_.AssertAcquired(); |
| 460 | 460 |
| 461 CHECK(!write_stopped_); | 461 DCHECK(!write_stopped_); |
| 462 CHECK(!write_buffer_->message_queue_.empty()); | 462 DCHECK(!write_buffer_->message_queue_.empty()); |
| 463 | 463 |
| 464 if (result) { | 464 if (result) { |
| 465 write_buffer_->platform_handles_offset_ += platform_handles_written; | 465 write_buffer_->platform_handles_offset_ += platform_handles_written; |
| 466 write_buffer_->data_offset_ += bytes_written; | 466 write_buffer_->data_offset_ += bytes_written; |
| 467 | 467 |
| 468 MessageInTransit* message = write_buffer_->message_queue_.front(); | 468 MessageInTransit* message = write_buffer_->message_queue_.front(); |
| 469 if (write_buffer_->data_offset_ >= message->total_size()) { | 469 if (write_buffer_->data_offset_ >= message->total_size()) { |
| 470 // Complete write. | 470 // Complete write. |
| 471 CHECK_EQ(write_buffer_->data_offset_, message->total_size()); | 471 DCHECK_EQ(write_buffer_->data_offset_, message->total_size()); |
| 472 write_buffer_->message_queue_.pop_front(); | 472 write_buffer_->message_queue_.pop_front(); |
| 473 delete message; | 473 delete message; |
| 474 write_buffer_->platform_handles_offset_ = 0; | 474 write_buffer_->platform_handles_offset_ = 0; |
| 475 write_buffer_->data_offset_ = 0; | 475 write_buffer_->data_offset_ = 0; |
| 476 | 476 |
| 477 if (write_buffer_->message_queue_.empty()) | 477 if (write_buffer_->message_queue_.empty()) |
| 478 return true; | 478 return true; |
| 479 } | 479 } |
| 480 | 480 |
| 481 // Schedule the next write. | 481 // Schedule the next write. |
| 482 IOResult io_result = ScheduleWriteNoLock(); | 482 IOResult io_result = ScheduleWriteNoLock(); |
| 483 if (io_result == IO_PENDING) | 483 if (io_result == IO_PENDING) |
| 484 return true; | 484 return true; |
| 485 CHECK_EQ(io_result, IO_FAILED); | 485 DCHECK_EQ(io_result, IO_FAILED); |
| 486 } | 486 } |
| 487 | 487 |
| 488 write_stopped_ = true; | 488 write_stopped_ = true; |
| 489 STLDeleteElements(&write_buffer_->message_queue_); | 489 STLDeleteElements(&write_buffer_->message_queue_); |
| 490 write_buffer_->platform_handles_offset_ = 0; | 490 write_buffer_->platform_handles_offset_ = 0; |
| 491 write_buffer_->data_offset_ = 0; | 491 write_buffer_->data_offset_ = 0; |
| 492 return false; | 492 return false; |
| 493 } | 493 } |
| 494 | 494 |
| 495 } // namespace system | 495 } // namespace system |
| 496 } // namespace mojo | 496 } // namespace mojo |
| OLD | NEW |