Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/raw_channel.h" | 5 #include "mojo/edk/system/raw_channel.h" |
| 6 | 6 |
| 7 #include <string.h> | 7 #include <string.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 | 10 |
| 11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/location.h" | 12 #include "base/location.h" |
| 13 #include "base/logging.h" | 13 #include "base/logging.h" |
| 14 #include "base/message_loop/message_loop.h" | 14 #include "base/message_loop/message_loop.h" |
| 15 #include "mojo/edk/embedder/embedder_internal.h" | |
| 15 #include "mojo/edk/system/message_in_transit.h" | 16 #include "mojo/edk/system/message_in_transit.h" |
| 16 #include "mojo/edk/system/transport_data.h" | 17 #include "mojo/edk/system/transport_data.h" |
| 17 | 18 |
| 18 namespace mojo { | 19 namespace mojo { |
| 19 namespace system { | 20 namespace system { |
| 20 | 21 |
| 21 const size_t kReadSize = 4096; | 22 const size_t kReadSize = 4096; |
| 22 | 23 |
| 23 // RawChannel::ReadBuffer ------------------------------------------------------ | 24 // RawChannel::ReadBuffer ------------------------------------------------------ |
| 24 | 25 |
| 25 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { | 26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { |
| 26 } | 27 } |
| 27 | 28 |
| 28 RawChannel::ReadBuffer::~ReadBuffer() { | 29 RawChannel::ReadBuffer::~ReadBuffer() { |
| 29 } | 30 } |
| 30 | 31 |
| 31 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { | 32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { |
| 32 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); | 33 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); |
| 33 *addr = &buffer_[0] + num_valid_bytes_; | 34 *addr = &buffer_[0] + num_valid_bytes_; |
| 34 *size = kReadSize; | 35 *size = kReadSize; |
| 35 } | 36 } |
| 36 | 37 |
| 37 // RawChannel::WriteBuffer ----------------------------------------------------- | 38 // RawChannel::WriteBuffer ----------------------------------------------------- |
| 38 | 39 |
| 39 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) | 40 RawChannel::WriteBuffer::WriteBuffer() |
| 40 : serialized_platform_handle_size_(serialized_platform_handle_size), | 41 : serialized_platform_handle_size_(0), |
| 41 platform_handles_offset_(0), | 42 platform_handles_offset_(0), |
| 42 data_offset_(0) { | 43 data_offset_(0) { |
| 43 } | 44 } |
| 44 | 45 |
| 45 RawChannel::WriteBuffer::~WriteBuffer() { | 46 RawChannel::WriteBuffer::~WriteBuffer() { |
| 46 message_queue_.Clear(); | 47 message_queue_.Clear(); |
| 47 } | 48 } |
| 48 | 49 |
| 49 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { | 50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { |
| 50 if (message_queue_.IsEmpty()) | 51 if (message_queue_.IsEmpty()) |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 108 size_t transport_data_buffer_size = | 109 size_t transport_data_buffer_size = |
| 109 message->transport_data() ? message->transport_data()->buffer_size() : 0; | 110 message->transport_data() ? message->transport_data()->buffer_size() : 0; |
| 110 | 111 |
| 111 if (!transport_data_buffer_size) { | 112 if (!transport_data_buffer_size) { |
| 112 // Only write from the main buffer. | 113 // Only write from the main buffer. |
| 113 DCHECK_LT(data_offset_, message->main_buffer_size()); | 114 DCHECK_LT(data_offset_, message->main_buffer_size()); |
| 114 DCHECK_LE(bytes_to_write, message->main_buffer_size()); | 115 DCHECK_LE(bytes_to_write, message->main_buffer_size()); |
| 115 Buffer buffer = { | 116 Buffer buffer = { |
| 116 static_cast<const char*>(message->main_buffer()) + data_offset_, | 117 static_cast<const char*>(message->main_buffer()) + data_offset_, |
| 117 bytes_to_write}; | 118 bytes_to_write}; |
| 119 | |
| 118 buffers->push_back(buffer); | 120 buffers->push_back(buffer); |
| 119 return; | 121 return; |
| 120 } | 122 } |
| 121 | 123 |
| 122 if (data_offset_ >= message->main_buffer_size()) { | 124 if (data_offset_ >= message->main_buffer_size()) { |
| 123 // Only write from the transport data buffer. | 125 // Only write from the transport data buffer. |
| 124 DCHECK_LT(data_offset_ - message->main_buffer_size(), | 126 DCHECK_LT(data_offset_ - message->main_buffer_size(), |
| 125 transport_data_buffer_size); | 127 transport_data_buffer_size); |
| 126 DCHECK_LE(bytes_to_write, transport_data_buffer_size); | 128 DCHECK_LE(bytes_to_write, transport_data_buffer_size); |
| 127 Buffer buffer = { | 129 Buffer buffer = { |
| 128 static_cast<const char*>(message->transport_data()->buffer()) + | 130 static_cast<const char*>(message->transport_data()->buffer()) + |
| 129 (data_offset_ - message->main_buffer_size()), | 131 (data_offset_ - message->main_buffer_size()), |
| 130 bytes_to_write}; | 132 bytes_to_write}; |
| 133 | |
| 131 buffers->push_back(buffer); | 134 buffers->push_back(buffer); |
| 132 return; | 135 return; |
| 133 } | 136 } |
| 134 | 137 |
| 135 // TODO(vtl): We could actually send out buffers from multiple messages, with | 138 // TODO(vtl): We could actually send out buffers from multiple messages, with |
| 136 // the "stopping" condition being reaching a message with platform handles | 139 // the "stopping" condition being reaching a message with platform handles |
| 137 // attached. | 140 // attached. |
| 138 | 141 |
| 139 // Write from both buffers. | 142 // Write from both buffers. |
| 140 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + | 143 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + |
| 141 transport_data_buffer_size); | 144 transport_data_buffer_size); |
| 142 Buffer buffer1 = { | 145 Buffer buffer1 = { |
| 143 static_cast<const char*>(message->main_buffer()) + data_offset_, | 146 static_cast<const char*>(message->main_buffer()) + data_offset_, |
| 144 message->main_buffer_size() - data_offset_}; | 147 message->main_buffer_size() - data_offset_}; |
| 145 buffers->push_back(buffer1); | 148 buffers->push_back(buffer1); |
| 146 Buffer buffer2 = { | 149 Buffer buffer2 = { |
| 147 static_cast<const char*>(message->transport_data()->buffer()), | 150 static_cast<const char*>(message->transport_data()->buffer()), |
| 148 transport_data_buffer_size}; | 151 transport_data_buffer_size}; |
| 149 buffers->push_back(buffer2); | 152 buffers->push_back(buffer2); |
| 150 } | 153 } |
| 151 | 154 |
| 152 // RawChannel ------------------------------------------------------------------ | 155 // RawChannel ------------------------------------------------------------------ |
| 153 | 156 |
| 154 RawChannel::RawChannel() | 157 RawChannel::RawChannel() |
| 155 : message_loop_for_io_(nullptr), | 158 : message_loop_for_io_(nullptr), |
| 159 set_on_shutdown_(nullptr), | |
| 156 delegate_(nullptr), | 160 delegate_(nullptr), |
| 157 set_on_shutdown_(nullptr), | 161 write_ready_(false), |
| 158 write_stopped_(false), | 162 write_stopped_(false), |
| 163 debug_started_sending_(false), | |
| 164 error_occurred_(false), | |
| 159 weak_ptr_factory_(this) { | 165 weak_ptr_factory_(this) { |
| 166 read_buffer_.reset(new ReadBuffer); | |
| 167 write_buffer_.reset(new WriteBuffer()); | |
| 160 } | 168 } |
| 161 | 169 |
| 162 RawChannel::~RawChannel() { | 170 RawChannel::~RawChannel() { |
| 163 DCHECK(!read_buffer_); | 171 DCHECK(!read_buffer_); |
| 164 DCHECK(!write_buffer_); | 172 DCHECK(!write_buffer_); |
| 165 | 173 |
| 166 // No need to take the |write_lock_| here -- if there are still weak pointers | 174 // Only want to decrement counter if Init was called. |
| 167 // outstanding, then we're hosed anyway (since we wouldn't be able to | 175 if (message_loop_for_io_) { |
| 168 // invalidate them cleanly, since we might not be on the I/O thread). | 176 // No need to take the |write_lock_| here -- if there are still weak |
| 169 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | 177 // pointers outstanding, then we're hosed anyway (since we wouldn't be able |
| 178 // to invalidate them cleanly, since we might not be on the I/O thread). | |
| 179 // DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | |
| 180 embedder::internal::ChannelShutdown(); | |
| 181 } | |
| 170 } | 182 } |
| 171 | 183 |
| 172 void RawChannel::Init(Delegate* delegate) { | 184 void RawChannel::Init(Delegate* delegate) { |
| 185 embedder::internal::ChannelStarted(); | |
| 173 DCHECK(delegate); | 186 DCHECK(delegate); |
| 174 | 187 |
| 188 base::AutoLock read_locker(read_lock_); | |
| 189 // solves race where initialiing on io thread while main thread is serializing | |
| 190 // this channel and releases handle. | |
| 191 base::AutoLock locker(write_lock_); | |
| 192 | |
| 175 DCHECK(!delegate_); | 193 DCHECK(!delegate_); |
| 176 delegate_ = delegate; | 194 delegate_ = delegate; |
| 177 | 195 |
| 178 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); | 196 //CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); |
| 179 DCHECK(!message_loop_for_io_); | 197 DCHECK(!message_loop_for_io_); |
| 180 message_loop_for_io_ = | 198 message_loop_for_io_ = |
| 181 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); | 199 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); |
| 182 | 200 |
| 183 // No need to take the lock. No one should be using us yet. | 201 OnInit(); |
| 184 DCHECK(!read_buffer_); | |
| 185 read_buffer_.reset(new ReadBuffer); | |
| 186 DCHECK(!write_buffer_); | |
| 187 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize())); | |
| 188 | 202 |
| 189 OnInit(); | 203 // Although this means that we can call back sync into the caller, that's |
| 204 // easier than posting a task to do this, because there might also be pending | |
| 205 // read calls and we can't modify the buffer. | |
|
yzshen1
2015/09/23 22:47:08
I haven't understood this part yet: before Schedul
| |
| 206 if (read_buffer_->num_valid_bytes()) { | |
| 207 // We had serialized read buffer data through SetInitialReadBufferData call. | |
| 208 // Make sure we read messages out of it now, otherwise the delegate won't | |
| 209 // get notified if no other data gets written to the pipe. | |
| 210 bool did_dispatch_message = false; | |
| 211 bool stop_dispatching = false; | |
| 212 DispatchMessages(&did_dispatch_message, &stop_dispatching); | |
| 213 } | |
| 190 | 214 |
| 191 IOResult io_result = ScheduleRead(); | 215 IOResult io_result = ScheduleRead(); |
| 192 if (io_result != IO_PENDING) { | 216 if (io_result != IO_PENDING) { |
| 193 // This will notify the delegate about the read failure. Although we're on | 217 // This will notify the delegate about the read failure. Although we're on |
| 194 // the I/O thread, don't call it in the nested context. | 218 // the I/O thread, don't call it in the nested context. |
| 195 message_loop_for_io_->PostTask( | 219 message_loop_for_io_->PostTask( |
| 196 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, | 220 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, |
| 197 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); | 221 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); |
| 198 } | 222 } |
| 199 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying | 223 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying |
| 200 // the delegate), not an initialization failure. | 224 // the delegate), not an initialization failure. |
| 225 | |
| 226 write_ready_ = true; | |
| 227 write_buffer_->serialized_platform_handle_size_ = | |
| 228 GetSerializedPlatformHandleSize(); | |
| 229 if (!write_buffer_->message_queue_.IsEmpty()) | |
| 230 SendQueuedMessagesNoLock(); | |
| 201 } | 231 } |
| 202 | 232 |
| 203 void RawChannel::Shutdown() { | 233 void RawChannel::Shutdown() { |
| 204 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 234 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 205 | 235 |
| 236 // Normally, we want to flush any pending writes before shutting down. This | |
| 237 // doesn't apply when 1) we don't have a handle (for obvious reasons) or | |
| 238 // 2) when the other side already quit and asked us to close the handle to | |
| 239 // ensure that we read everything out of the pipe first. | |
| 240 if (!HandleForDebuggingNoLock().is_valid() || error_occurred_) { | |
| 241 { | |
| 242 base::AutoLock read_locker(read_lock_); | |
| 243 base::AutoLock locker(write_lock_); | |
| 244 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); | |
| 245 } | |
| 246 delete this; | |
| 247 return; | |
| 248 } | |
| 249 | |
| 250 base::AutoLock read_locker(read_lock_); | |
| 206 base::AutoLock locker(write_lock_); | 251 base::AutoLock locker(write_lock_); |
| 252 DCHECK(read_buffer_->num_valid_bytes() == 0) << | |
| 253 "RawChannel::Shutdown called but there is pending data to be read"; | |
| 207 | 254 |
| 208 LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty()) | 255 // happens on shutdown if didn't call init when doing createduplicate |
| 209 << "Shutting down RawChannel with write buffer nonempty"; | 256 if (message_loop_for_io()) { |
| 257 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | |
| 258 } | |
| 210 | 259 |
| 211 // Reset the delegate so that it won't receive further calls. | 260 // Reset the delegate so that it won't receive further calls. |
| 212 delegate_ = nullptr; | 261 delegate_ = nullptr; |
| 213 if (set_on_shutdown_) { | 262 if (set_on_shutdown_) { |
| 214 *set_on_shutdown_ = true; | 263 *set_on_shutdown_ = true; |
| 215 set_on_shutdown_ = nullptr; | 264 set_on_shutdown_ = nullptr; |
| 216 } | 265 } |
| 217 write_stopped_ = true; | |
| 218 weak_ptr_factory_.InvalidateWeakPtrs(); | |
| 219 | 266 |
| 220 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); | 267 // TODO(jam): probably remove this since it doesn't make sense now that we |
| 268 // wait and flush pending messages. | |
| 269 // write_stopped_ = true; | |
| 270 | |
| 271 | |
| 272 bool empty = write_buffer_->message_queue_.IsEmpty(); | |
| 273 | |
| 274 // We may have no messages to write. However just because our end of the pipe | |
| 275 // wrote everything doesn't mean that the other end read it. We don't want to | |
| 276 // call FlushFileBuffers since a) that only works for server end of the pipe, | |
| 277 // and b) it pauses this thread (which can block a process on another, or | |
| 278 // worse hang if both pipes are in the same process). | |
| 279 scoped_ptr<MessageInTransit> quit_message(new MessageInTransit( | |
| 280 MessageInTransit::Type::RAW_CHANNEL_QUIT, 0, nullptr)); | |
| 281 EnqueueMessageNoLock(quit_message.Pass()); | |
| 282 | |
| 283 if (empty) | |
| 284 SendQueuedMessagesNoLock(); | |
| 285 } | |
| 286 | |
| 287 embedder::ScopedPlatformHandle RawChannel::ReleaseHandle( | |
| 288 std::vector<char>* read_buffer) { | |
| 289 //LOG(ERROR) << "RawChannel::ReleaseHandle( " << this; | |
| 290 | |
| 291 embedder::ScopedPlatformHandle rv; | |
| 292 { | |
| 293 base::AutoLock read_locker(read_lock_); | |
| 294 base::AutoLock locker(write_lock_); | |
| 295 rv = ReleaseHandleNoLock(read_buffer); | |
| 296 | |
| 297 // TODO(jam); if we use these, use nolock versions of these methods that are | |
| 298 // copied. | |
| 299 if (write_buffer_.get() && !write_buffer_->message_queue_.IsEmpty()) { | |
| 300 NOTREACHED() << "TODO(JAM)"; | |
| 301 } | |
| 302 | |
| 303 delegate_ = nullptr; | |
| 304 | |
| 305 // The Unretained is safe because above cancelled IO so we shouldn't get any | |
| 306 // channel errors. | |
| 307 // |message_loop_for_io_| might not be set yet | |
| 308 embedder::internal::g_io_thread_task_runner->PostTask( | |
| 309 FROM_HERE, | |
| 310 base::Bind(&RawChannel::Shutdown, base::Unretained(this))); | |
| 311 } | |
| 312 | |
| 313 return rv; | |
| 221 } | 314 } |
| 222 | 315 |
| 223 // Reminder: This must be thread-safe. | 316 // Reminder: This must be thread-safe. |
| 224 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { | 317 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
| 225 DCHECK(message); | 318 DCHECK(message); |
| 226 | |
| 227 base::AutoLock locker(write_lock_); | 319 base::AutoLock locker(write_lock_); |
| 320 DCHECK(!debug_started_sending_); | |
| 228 if (write_stopped_) | 321 if (write_stopped_) |
| 229 return false; | 322 return false; |
| 230 | 323 |
| 231 if (!write_buffer_->message_queue_.IsEmpty()) { | 324 bool queue_was_empty = write_buffer_->message_queue_.IsEmpty(); |
| 232 EnqueueMessageNoLock(message.Pass()); | 325 EnqueueMessageNoLock(message.Pass()); |
| 233 return true; | 326 if (queue_was_empty && write_ready_) |
| 234 } | 327 SendQueuedMessagesNoLock(); |
| 235 | 328 |
| 236 EnqueueMessageNoLock(message.Pass()); | 329 return true; |
| 330 } | |
| 331 | |
| 332 void RawChannel::SendQueuedMessagesNoLock() { | |
| 237 DCHECK_EQ(write_buffer_->data_offset_, 0u); | 333 DCHECK_EQ(write_buffer_->data_offset_, 0u); |
| 238 | 334 |
| 239 size_t platform_handles_written = 0; | 335 size_t platform_handles_written = 0; |
| 240 size_t bytes_written = 0; | 336 size_t bytes_written = 0; |
| 241 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | 337 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
| 242 if (io_result == IO_PENDING) | 338 if (io_result == IO_PENDING) |
| 243 return true; | 339 return; |
| 244 | 340 |
| 245 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, | 341 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, |
| 246 bytes_written); | 342 bytes_written); |
| 247 if (!result) { | 343 if (!result) { |
| 248 // Even if we're on the I/O thread, don't call |OnError()| in the nested | 344 // Even if we're on the I/O thread, don't call |OnError()| in the nested |
| 249 // context. | 345 // context. |
| 250 message_loop_for_io_->PostTask( | 346 message_loop_for_io_->PostTask( |
| 251 FROM_HERE, | 347 FROM_HERE, |
| 252 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), | 348 base::Bind(&RawChannel::LockAndCallOnError, |
| 349 weak_ptr_factory_.GetWeakPtr(), | |
| 253 Delegate::ERROR_WRITE)); | 350 Delegate::ERROR_WRITE)); |
| 254 } | 351 } |
| 255 | |
| 256 return result; | |
| 257 } | 352 } |
| 258 | 353 |
| 259 // Reminder: This must be thread-safe. | 354 // Reminder: This must be thread-safe. |
| 260 bool RawChannel::IsWriteBufferEmpty() { | 355 bool RawChannel::IsWriteBufferEmpty() { |
| 261 base::AutoLock locker(write_lock_); | 356 base::AutoLock locker(write_lock_); |
| 262 return write_buffer_->message_queue_.IsEmpty(); | 357 return write_buffer_->message_queue_.IsEmpty(); |
| 263 } | 358 } |
| 264 | 359 |
| 360 bool RawChannel::IsReadBufferEmpty() { | |
| 361 base::AutoLock locker(read_lock_); | |
| 362 return read_buffer_->num_valid_bytes_ != 0; | |
| 363 } | |
| 364 | |
| 365 void RawChannel::SetInitialReadBufferData(char* data, size_t size) { | |
| 366 base::AutoLock locker(read_lock_); | |
| 367 // TODO(jam): copy power of 2 algorithm below? or share. | |
| 368 read_buffer_->buffer_.resize(size+kReadSize); | |
| 369 memcpy(&read_buffer_->buffer_[0], data, size); | |
| 370 read_buffer_->num_valid_bytes_ = size; | |
| 371 } | |
| 372 | |
| 265 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { | 373 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { |
| 266 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 374 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 267 | 375 |
| 376 base::AutoLock locker(read_lock_); | |
| 377 | |
| 268 // Keep reading data in a loop, and dispatch messages if enough data is | 378 // Keep reading data in a loop, and dispatch messages if enough data is |
| 269 // received. Exit the loop if any of the following happens: | 379 // received. Exit the loop if any of the following happens: |
| 270 // - one or more messages were dispatched; | 380 // - one or more messages were dispatched; |
| 271 // - the last read failed, was a partial read or would block; | 381 // - the last read failed, was a partial read or would block; |
| 272 // - |Shutdown()| was called. | 382 // - |Shutdown()| was called. |
| 273 do { | 383 do { |
| 274 switch (io_result) { | 384 switch (io_result) { |
| 275 case IO_SUCCEEDED: | 385 case IO_SUCCEEDED: |
| 276 break; | 386 break; |
| 277 case IO_FAILED_SHUTDOWN: | 387 case IO_FAILED_SHUTDOWN: |
| 278 case IO_FAILED_BROKEN: | 388 case IO_FAILED_BROKEN: |
| 279 case IO_FAILED_UNKNOWN: | 389 case IO_FAILED_UNKNOWN: |
| 280 CallOnError(ReadIOResultToError(io_result)); | 390 CallOnError(ReadIOResultToError(io_result)); |
| 281 return; // |this| may have been destroyed in |CallOnError()|. | 391 return; // |this| may have been destroyed in |CallOnError()|. |
| 282 case IO_PENDING: | 392 case IO_PENDING: |
| 283 NOTREACHED(); | 393 NOTREACHED(); |
| 284 return; | 394 return; |
| 285 } | 395 } |
| 286 | 396 |
| 287 read_buffer_->num_valid_bytes_ += bytes_read; | 397 read_buffer_->num_valid_bytes_ += bytes_read; |
| 288 | 398 |
| 289 // Dispatch all the messages that we can. | 399 // Dispatch all the messages that we can. |
| 290 bool did_dispatch_message = false; | 400 bool did_dispatch_message = false; |
| 291 // Tracks the offset of the first undispatched message in |read_buffer_|. | 401 bool stop_dispatching = false; |
| 292 // Currently, we copy data to ensure that this is zero at the beginning. | 402 DispatchMessages(&did_dispatch_message, &stop_dispatching); |
| 293 size_t read_buffer_start = 0; | 403 if (stop_dispatching) |
| 294 size_t remaining_bytes = read_buffer_->num_valid_bytes_; | 404 return; |
| 295 size_t message_size; | |
| 296 // Note that we rely on short-circuit evaluation here: | |
| 297 // - |read_buffer_start| may be an invalid index into | |
| 298 // |read_buffer_->buffer_| if |remaining_bytes| is zero. | |
| 299 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | |
| 300 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | |
| 301 // next read). | |
| 302 // TODO(vtl): Validate that |message_size| is sane. | |
| 303 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( | |
| 304 &read_buffer_->buffer_[read_buffer_start], | |
| 305 remaining_bytes, &message_size) && | |
| 306 remaining_bytes >= message_size) { | |
| 307 MessageInTransit::View message_view( | |
| 308 message_size, &read_buffer_->buffer_[read_buffer_start]); | |
| 309 DCHECK_EQ(message_view.total_size(), message_size); | |
| 310 | |
| 311 const char* error_message = nullptr; | |
| 312 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), | |
| 313 &error_message)) { | |
| 314 DCHECK(error_message); | |
| 315 LOG(ERROR) << "Received invalid message: " << error_message; | |
| 316 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
| 317 return; // |this| may have been destroyed in |CallOnError()|. | |
| 318 } | |
| 319 | |
| 320 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL) { | |
| 321 if (!OnReadMessageForRawChannel(message_view)) { | |
| 322 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
| 323 return; // |this| may have been destroyed in |CallOnError()|. | |
| 324 } | |
| 325 } else { | |
| 326 embedder::ScopedPlatformHandleVectorPtr platform_handles; | |
| 327 if (message_view.transport_data_buffer()) { | |
| 328 size_t num_platform_handles; | |
| 329 const void* platform_handle_table; | |
| 330 TransportData::GetPlatformHandleTable( | |
| 331 message_view.transport_data_buffer(), &num_platform_handles, | |
| 332 &platform_handle_table); | |
| 333 | |
| 334 if (num_platform_handles > 0) { | |
| 335 platform_handles = | |
| 336 GetReadPlatformHandles(num_platform_handles, | |
| 337 platform_handle_table).Pass(); | |
| 338 if (!platform_handles) { | |
| 339 LOG(ERROR) << "Invalid number of platform handles received"; | |
| 340 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
| 341 return; // |this| may have been destroyed in |CallOnError()|. | |
| 342 } | |
| 343 } | |
| 344 } | |
| 345 | |
| 346 // TODO(vtl): In the case that we aren't expecting any platform handles, | |
| 347 // for the POSIX implementation, we should confirm that none are stored. | |
| 348 | |
| 349 // Dispatch the message. | |
| 350 // Detect the case when |Shutdown()| is called; subsequent destruction | |
| 351 // is also permitted then. | |
| 352 bool shutdown_called = false; | |
| 353 DCHECK(!set_on_shutdown_); | |
| 354 set_on_shutdown_ = &shutdown_called; | |
| 355 DCHECK(delegate_); | |
| 356 delegate_->OnReadMessage(message_view, platform_handles.Pass()); | |
| 357 if (shutdown_called) | |
| 358 return; | |
| 359 set_on_shutdown_ = nullptr; | |
| 360 } | |
| 361 | |
| 362 did_dispatch_message = true; | |
| 363 | |
| 364 // Update our state. | |
| 365 read_buffer_start += message_size; | |
| 366 remaining_bytes -= message_size; | |
| 367 } | |
| 368 | |
| 369 if (read_buffer_start > 0) { | |
| 370 // Move data back to start. | |
| 371 read_buffer_->num_valid_bytes_ = remaining_bytes; | |
| 372 if (read_buffer_->num_valid_bytes_ > 0) { | |
| 373 memmove(&read_buffer_->buffer_[0], | |
| 374 &read_buffer_->buffer_[read_buffer_start], remaining_bytes); | |
| 375 } | |
| 376 read_buffer_start = 0; | |
| 377 } | |
| 378 | 405 |
| 379 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < | 406 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < |
| 380 kReadSize) { | 407 kReadSize) { |
| 381 // Use power-of-2 buffer sizes. | 408 // Use power-of-2 buffer sizes. |
| 382 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | 409 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
| 383 // maximum message size to whatever extent necessary). | 410 // maximum message size to whatever extent necessary). |
| 384 // TODO(vtl): We may often be able to peek at the header and get the real | 411 // TODO(vtl): We may often be able to peek at the header and get the real |
| 385 // required extra space (which may be much bigger than |kReadSize|). | 412 // required extra space (which may be much bigger than |kReadSize|). |
| 386 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); | 413 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); |
| 387 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) | 414 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) |
| (...skipping 30 matching lines...) Expand all Loading... | |
| 418 if (write_stopped_) { | 445 if (write_stopped_) { |
| 419 NOTREACHED(); | 446 NOTREACHED(); |
| 420 return; | 447 return; |
| 421 } | 448 } |
| 422 | 449 |
| 423 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, | 450 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, |
| 424 bytes_written); | 451 bytes_written); |
| 425 } | 452 } |
| 426 | 453 |
| 427 if (did_fail) { | 454 if (did_fail) { |
| 455 base::AutoLock locker(read_lock_); | |
| 428 CallOnError(Delegate::ERROR_WRITE); | 456 CallOnError(Delegate::ERROR_WRITE); |
| 429 return; // |this| may have been destroyed in |CallOnError()|. | 457 return; // |this| may have been destroyed in |CallOnError()|. |
| 430 } | 458 } |
| 431 } | 459 } |
| 432 | 460 |
| 433 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | 461 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
| 434 write_lock_.AssertAcquired(); | 462 write_lock_.AssertAcquired(); |
| 463 DCHECK(HandleForDebuggingNoLock().is_valid()); | |
| 435 write_buffer_->message_queue_.AddMessage(message.Pass()); | 464 write_buffer_->message_queue_.AddMessage(message.Pass()); |
| 436 } | 465 } |
| 437 | 466 |
| 438 bool RawChannel::OnReadMessageForRawChannel( | 467 bool RawChannel::OnReadMessageForRawChannel( |
| 439 const MessageInTransit::View& message_view) { | 468 const MessageInTransit::View& message_view) { |
| 469 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) { | |
| 470 base::MessageLoop::current()->PostTask( | |
| 471 FROM_HERE, | |
| 472 base::Bind(&RawChannel::HandleQuitMessage, base::Unretained(this))); | |
| 473 return true; | |
| 474 } | |
| 475 | |
| 440 // No non-implementation specific |RawChannel| control messages. | 476 // No non-implementation specific |RawChannel| control messages. |
| 441 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() | 477 LOG(ERROR) << "Invalid control message (type " << message_view.type() |
| 442 << ")"; | 478 << ")"; |
| 443 return false; | 479 return false; |
| 444 } | 480 } |
| 445 | 481 |
| 446 // static | |
| 447 RawChannel::Delegate::Error RawChannel::ReadIOResultToError( | 482 RawChannel::Delegate::Error RawChannel::ReadIOResultToError( |
| 448 IOResult io_result) { | 483 IOResult io_result) { |
| 449 switch (io_result) { | 484 switch (io_result) { |
| 450 case IO_FAILED_SHUTDOWN: | 485 case IO_FAILED_SHUTDOWN: |
| 451 return Delegate::ERROR_READ_SHUTDOWN; | 486 return Delegate::ERROR_READ_SHUTDOWN; |
| 452 case IO_FAILED_BROKEN: | 487 case IO_FAILED_BROKEN: |
| 453 return Delegate::ERROR_READ_BROKEN; | 488 return Delegate::ERROR_READ_BROKEN; |
| 454 case IO_FAILED_UNKNOWN: | 489 case IO_FAILED_UNKNOWN: |
| 455 return Delegate::ERROR_READ_UNKNOWN; | 490 return Delegate::ERROR_READ_UNKNOWN; |
| 456 case IO_SUCCEEDED: | 491 case IO_SUCCEEDED: |
| 457 case IO_PENDING: | 492 case IO_PENDING: |
| 458 NOTREACHED(); | 493 NOTREACHED(); |
| 459 break; | 494 break; |
| 460 } | 495 } |
| 461 return Delegate::ERROR_READ_UNKNOWN; | 496 return Delegate::ERROR_READ_UNKNOWN; |
| 462 } | 497 } |
| 463 | 498 |
| 464 void RawChannel::CallOnError(Delegate::Error error) { | 499 void RawChannel::CallOnError(Delegate::Error error) { |
| 465 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 500 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
| 466 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 501 read_lock_.AssertAcquired(); |
| 502 error_occurred_ = true; | |
| 467 if (delegate_) { | 503 if (delegate_) { |
| 468 delegate_->OnError(error); | 504 delegate_->OnError(error); |
| 469 return; // |this| may have been destroyed in |OnError()|. | 505 } else { |
| 506 // We depend on delegate to delete since it could be waiting to call | |
| 507 // ReleaseHandle. | |
| 508 base::MessageLoop::current()->PostTask( | |
| 509 FROM_HERE, | |
| 510 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | |
| 470 } | 511 } |
| 471 } | 512 } |
| 472 | 513 |
| 514 void RawChannel::LockAndCallOnError(Delegate::Error error) { | |
| 515 base::AutoLock locker(read_lock_); | |
| 516 CallOnError(error); | |
| 517 } | |
| 518 | |
| 473 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, | 519 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, |
| 474 size_t platform_handles_written, | 520 size_t platform_handles_written, |
| 475 size_t bytes_written) { | 521 size_t bytes_written) { |
| 476 write_lock_.AssertAcquired(); | 522 write_lock_.AssertAcquired(); |
| 477 | 523 |
| 478 DCHECK(!write_stopped_); | 524 DCHECK(!write_stopped_); |
| 479 DCHECK(!write_buffer_->message_queue_.IsEmpty()); | 525 DCHECK(!write_buffer_->message_queue_.IsEmpty()); |
| 480 | 526 |
| 481 if (io_result == IO_SUCCEEDED) { | 527 if (io_result == IO_SUCCEEDED) { |
| 482 write_buffer_->platform_handles_offset_ += platform_handles_written; | 528 write_buffer_->platform_handles_offset_ += platform_handles_written; |
| (...skipping 18 matching lines...) Expand all Loading... | |
| 501 DCHECK_NE(io_result, IO_SUCCEEDED); | 547 DCHECK_NE(io_result, IO_SUCCEEDED); |
| 502 } | 548 } |
| 503 | 549 |
| 504 write_stopped_ = true; | 550 write_stopped_ = true; |
| 505 write_buffer_->message_queue_.Clear(); | 551 write_buffer_->message_queue_.Clear(); |
| 506 write_buffer_->platform_handles_offset_ = 0; | 552 write_buffer_->platform_handles_offset_ = 0; |
| 507 write_buffer_->data_offset_ = 0; | 553 write_buffer_->data_offset_ = 0; |
| 508 return false; | 554 return false; |
| 509 } | 555 } |
| 510 | 556 |
| 557 void RawChannel::HandleQuitMessage() { | |
| 558 base::AutoLock locker(read_lock_); | |
| 559 CallOnError(Delegate::ERROR_READ_SHUTDOWN); | |
| 560 } | |
| 561 | |
| 562 void RawChannel::DispatchMessages(bool* did_dispatch_message, | |
| 563 bool* stop_dispatching) { | |
| 564 *did_dispatch_message = false; | |
| 565 *stop_dispatching = false; | |
| 566 // Tracks the offset of the first undispatched message in |read_buffer_|. | |
| 567 // Currently, we copy data to ensure that this is zero at the beginning. | |
| 568 size_t read_buffer_start = 0; | |
| 569 size_t remaining_bytes = read_buffer_->num_valid_bytes_; | |
| 570 size_t message_size; | |
| 571 // Note that we rely on short-circuit evaluation here: | |
| 572 // - |read_buffer_start| may be an invalid index into | |
| 573 // |read_buffer_->buffer_| if |remaining_bytes| is zero. | |
| 574 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | |
| 575 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | |
| 576 // next read). | |
| 577 // TODO(vtl): Validate that |message_size| is sane. | |
| 578 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( | |
| 579 &read_buffer_->buffer_[read_buffer_start], | |
| 580 remaining_bytes, &message_size) && | |
| 581 remaining_bytes >= message_size) { | |
| 582 MessageInTransit::View message_view( | |
| 583 message_size, &read_buffer_->buffer_[read_buffer_start]); | |
| 584 DCHECK_EQ(message_view.total_size(), message_size); | |
| 585 | |
| 586 const char* error_message = nullptr; | |
| 587 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), | |
| 588 &error_message)) { | |
| 589 DCHECK(error_message); | |
| 590 LOG(ERROR) << "Received invalid message: " << error_message; | |
| 591 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
| 592 *stop_dispatching = true; | |
| 593 return; // |this| may have been destroyed in |CallOnError()|. | |
| 594 } | |
| 595 | |
| 596 if (message_view.type() != MessageInTransit::Type::MESSAGE) { | |
| 597 if (!OnReadMessageForRawChannel(message_view)) { | |
| 598 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
| 599 *stop_dispatching = true; | |
| 600 return; // |this| may have been destroyed in |CallOnError()|. | |
| 601 } | |
| 602 } else { | |
| 603 embedder::ScopedPlatformHandleVectorPtr platform_handles; | |
| 604 if (message_view.transport_data_buffer()) { | |
| 605 size_t num_platform_handles; | |
| 606 const void* platform_handle_table; | |
| 607 TransportData::GetPlatformHandleTable( | |
| 608 message_view.transport_data_buffer(), &num_platform_handles, | |
| 609 &platform_handle_table); | |
| 610 | |
| 611 if (num_platform_handles > 0) { | |
| 612 platform_handles = | |
| 613 GetReadPlatformHandles(num_platform_handles, | |
| 614 platform_handle_table).Pass(); | |
| 615 if (!platform_handles) { | |
| 616 LOG(ERROR) << "Invalid number of platform handles received"; | |
| 617 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
| 618 *stop_dispatching = true; | |
| 619 return; // |this| may have been destroyed in |CallOnError()|. | |
| 620 } | |
| 621 } | |
| 622 } | |
| 623 | |
| 624 // TODO(vtl): In the case that we aren't expecting any platform handles, | |
| 625 // for the POSIX implementation, we should confirm that none are stored. | |
| 626 | |
| 627 // Dispatch the message. | |
| 628 // Detect the case when |Shutdown()| is called; subsequent destruction | |
| 629 // is also permitted then. | |
| 630 bool shutdown_called = false; | |
| 631 DCHECK(!set_on_shutdown_); | |
| 632 set_on_shutdown_ = &shutdown_called; | |
| 633 // Note: it's valid to get here without a delegate. i.e. after Shutdown | |
| 634 // is called, if this object still has a valid handle we keep it alive | |
| 635 // until the other side closes it in response to the RAW_CHANNEL_QUIT | |
| 636 // message. In the meantime the sender could have sent us a message. | |
| 637 if (delegate_) | |
| 638 delegate_->OnReadMessage(message_view, platform_handles.Pass()); | |
| 639 if (shutdown_called) { | |
| 640 *stop_dispatching = true; | |
| 641 return; | |
| 642 } | |
| 643 set_on_shutdown_ = nullptr; | |
| 644 } | |
| 645 | |
| 646 *did_dispatch_message = true; | |
| 647 | |
| 648 // Update our state. | |
| 649 read_buffer_start += message_size; | |
| 650 remaining_bytes -= message_size; | |
| 651 } | |
| 652 | |
| 653 if (read_buffer_start > 0) { | |
| 654 // Move data back to start. | |
| 655 read_buffer_->num_valid_bytes_ = remaining_bytes; | |
| 656 if (read_buffer_->num_valid_bytes_ > 0) { | |
| 657 memmove(&read_buffer_->buffer_[0], | |
| 658 &read_buffer_->buffer_[read_buffer_start], remaining_bytes); | |
| 659 } | |
| 660 read_buffer_start = 0; | |
| 661 } | |
| 662 } | |
| 663 | |
| 511 } // namespace system | 664 } // namespace system |
| 512 } // namespace mojo | 665 } // namespace mojo |
| OLD | NEW |