| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <algorithm> | 10 #include <algorithm> |
| 11 #include <limits> |
| 11 #include <utility> | 12 #include <utility> |
| 12 | 13 |
| 13 #include "base/bind.h" | 14 #include "base/bind.h" |
| 14 #include "base/logging.h" | 15 #include "base/logging.h" |
| 16 #include "base/memory/ref_counted.h" |
| 15 #include "base/message_loop/message_loop.h" | 17 #include "base/message_loop/message_loop.h" |
| 16 #include "mojo/edk/embedder/embedder_internal.h" | 18 #include "mojo/edk/embedder/embedder_internal.h" |
| 17 #include "mojo/edk/embedder/platform_shared_buffer.h" | 19 #include "mojo/edk/embedder/platform_shared_buffer.h" |
| 18 #include "mojo/edk/embedder/platform_support.h" | 20 #include "mojo/edk/embedder/platform_support.h" |
| 19 #include "mojo/edk/system/data_pipe.h" | 21 #include "mojo/edk/system/core.h" |
| 22 #include "mojo/edk/system/data_pipe_control_message.h" |
| 23 #include "mojo/edk/system/node_controller.h" |
| 24 #include "mojo/edk/system/ports_message.h" |
| 25 #include "mojo/public/c/system/data_pipe.h" |
| 20 | 26 |
| 21 namespace mojo { | 27 namespace mojo { |
| 22 namespace edk { | 28 namespace edk { |
| 23 | 29 |
| 24 struct SharedMemoryHeader { | 30 namespace { |
| 25 uint32_t data_size; | 31 |
| 26 uint32_t read_buffer_size; | 32 struct MOJO_ALIGNAS(8) SerializedState { |
| 33 MojoCreateDataPipeOptions options; |
| 34 uint64_t pipe_id; |
| 35 bool peer_closed; |
| 36 uint32_t read_offset; |
| 37 uint32_t bytes_available; |
| 27 }; | 38 }; |
| 28 | 39 |
| 29 void DataPipeConsumerDispatcher::Init( | 40 } // namespace |
| 30 ScopedPlatformHandle message_pipe, | |
| 31 char* serialized_read_buffer, size_t serialized_read_buffer_size) { | |
| 32 if (message_pipe.is_valid()) { | |
| 33 channel_ = RawChannel::Create(std::move(message_pipe)); | |
| 34 channel_->SetSerializedData( | |
| 35 serialized_read_buffer, serialized_read_buffer_size, nullptr, 0u, | |
| 36 nullptr, nullptr); | |
| 37 internal::g_io_thread_task_runner->PostTask( | |
| 38 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this)); | |
| 39 } else { | |
| 40 // The data pipe consumer could have read all the data and the producer | |
| 41 // closed its end subsequently (before the consumer was sent). In that case | |
| 42 // when we deserialize the consumer we must make sure to set error_ or | |
| 43 // otherwise the peer-closed signal will never be satisfied. | |
| 44 error_ = true; | |
| 45 } | |
| 46 } | |
| 47 | 41 |
| 48 void DataPipeConsumerDispatcher::InitOnIO() { | 42 // A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a |
| 49 base::AutoLock locker(lock()); | 43 // reference to the dispatcher to ensure it lives as long as the observed port. |
| 50 calling_init_ = true; | 44 class DataPipeConsumerDispatcher::PortObserverThunk |
| 51 if (channel_) | 45 : public NodeController::PortObserver { |
| 52 channel_->Init(this); | 46 public: |
| 53 calling_init_ = false; | 47 explicit PortObserverThunk( |
| 54 } | 48 scoped_refptr<DataPipeConsumerDispatcher> dispatcher) |
| 49 : dispatcher_(dispatcher) {} |
| 55 | 50 |
| 56 void DataPipeConsumerDispatcher::CloseOnIO() { | 51 private: |
| 57 base::AutoLock locker(lock()); | 52 ~PortObserverThunk() override {} |
| 58 if (channel_) { | 53 |
| 59 channel_->Shutdown(); | 54 // NodeController::PortObserver: |
| 60 channel_ = nullptr; | 55 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } |
| 56 |
| 57 scoped_refptr<DataPipeConsumerDispatcher> dispatcher_; |
| 58 |
| 59 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
| 60 }; |
| 61 |
| 62 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( |
| 63 NodeController* node_controller, |
| 64 const ports::PortRef& control_port, |
| 65 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
| 66 const MojoCreateDataPipeOptions& options, |
| 67 bool initialized, |
| 68 uint64_t pipe_id) |
| 69 : options_(options), |
| 70 node_controller_(node_controller), |
| 71 control_port_(control_port), |
| 72 pipe_id_(pipe_id), |
| 73 shared_ring_buffer_(shared_ring_buffer) { |
| 74 if (initialized) { |
| 75 base::AutoLock lock(lock_); |
| 76 InitializeNoLock(); |
| 61 } | 77 } |
| 62 } | 78 } |
| 63 | 79 |
| 64 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { | 80 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { |
| 65 return Type::DATA_PIPE_CONSUMER; | 81 return Type::DATA_PIPE_CONSUMER; |
| 66 } | 82 } |
| 67 | 83 |
| 68 scoped_refptr<DataPipeConsumerDispatcher> | 84 MojoResult DataPipeConsumerDispatcher::Close() { |
| 69 DataPipeConsumerDispatcher::Deserialize( | 85 base::AutoLock lock(lock_); |
| 70 const void* source, | 86 DVLOG(1) << "Closing data pipe consumer " << pipe_id_; |
| 71 size_t size, | 87 return CloseNoLock(); |
| 72 PlatformHandleVector* platform_handles) { | |
| 73 MojoCreateDataPipeOptions options; | |
| 74 ScopedPlatformHandle shared_memory_handle; | |
| 75 size_t shared_memory_size = 0; | |
| 76 | |
| 77 ScopedPlatformHandle platform_handle = | |
| 78 DataPipe::Deserialize(source, size, platform_handles, &options, | |
| 79 &shared_memory_handle, &shared_memory_size); | |
| 80 | |
| 81 scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options)); | |
| 82 | |
| 83 char* serialized_read_buffer = nullptr; | |
| 84 size_t serialized_read_buffer_size = 0; | |
| 85 scoped_refptr<PlatformSharedBuffer> shared_buffer; | |
| 86 scoped_ptr<PlatformSharedBufferMapping> mapping; | |
| 87 if (shared_memory_size) { | |
| 88 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( | |
| 89 shared_memory_size, std::move(shared_memory_handle)); | |
| 90 mapping = shared_buffer->Map(0, shared_memory_size); | |
| 91 char* buffer = static_cast<char*>(mapping->GetBase()); | |
| 92 SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer); | |
| 93 buffer += sizeof(SharedMemoryHeader); | |
| 94 if (header->data_size) { | |
| 95 rv->data_.assign(buffer, buffer + header->data_size); | |
| 96 buffer += header->data_size; | |
| 97 } | |
| 98 | |
| 99 if (header->read_buffer_size) { | |
| 100 serialized_read_buffer = buffer; | |
| 101 serialized_read_buffer_size = header->read_buffer_size; | |
| 102 buffer += header->read_buffer_size; | |
| 103 } | |
| 104 } | |
| 105 | |
| 106 rv->Init(std::move(platform_handle), serialized_read_buffer, | |
| 107 serialized_read_buffer_size); | |
| 108 return rv; | |
| 109 } | 88 } |
| 110 | 89 |
| 111 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( | 90 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
| 112 const MojoCreateDataPipeOptions& options) | 91 uint32_t* num_bytes, |
| 113 : options_(options), | 92 MojoReadDataFlags flags) { |
| 114 channel_(nullptr), | 93 base::AutoLock lock(lock_); |
| 115 calling_init_(false), | 94 if (!shared_ring_buffer_ || in_transit_) |
| 116 in_two_phase_read_(false), | 95 return MOJO_RESULT_INVALID_ARGUMENT; |
| 117 two_phase_max_bytes_read_(0), | |
| 118 error_(false), | |
| 119 serialized_(false) { | |
| 120 } | |
| 121 | 96 |
| 122 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { | |
| 123 // See comment in ~MessagePipeDispatcher. | |
| 124 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) | |
| 125 channel_->Shutdown(); | |
| 126 else | |
| 127 DCHECK(!channel_); | |
| 128 } | |
| 129 | |
| 130 void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() { | |
| 131 lock().AssertAcquired(); | |
| 132 awakable_list_.CancelAll(); | |
| 133 } | |
| 134 | |
| 135 void DataPipeConsumerDispatcher::CloseImplNoLock() { | |
| 136 lock().AssertAcquired(); | |
| 137 internal::g_io_thread_task_runner->PostTask( | |
| 138 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this)); | |
| 139 } | |
| 140 | |
| 141 scoped_refptr<Dispatcher> | |
| 142 DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | |
| 143 lock().AssertAcquired(); | |
| 144 | |
| 145 SerializeInternal(); | |
| 146 | |
| 147 scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_); | |
| 148 data_.swap(rv->data_); | |
| 149 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | |
| 150 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); | |
| 151 rv->serialized_ = true; | |
| 152 | |
| 153 return scoped_refptr<Dispatcher>(rv.get()); | |
| 154 } | |
| 155 | |
| 156 MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock( | |
| 157 void* elements, | |
| 158 uint32_t* num_bytes, | |
| 159 MojoReadDataFlags flags) { | |
| 160 lock().AssertAcquired(); | |
| 161 if (channel_) | |
| 162 channel_->EnsureLazyInitialized(); | |
| 163 if (in_two_phase_read_) | 97 if (in_two_phase_read_) |
| 164 return MOJO_RESULT_BUSY; | 98 return MOJO_RESULT_BUSY; |
| 165 | 99 |
| 166 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { | 100 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { |
| 167 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || | 101 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || |
| 168 (flags & MOJO_READ_DATA_FLAG_DISCARD)) | 102 (flags & MOJO_READ_DATA_FLAG_DISCARD)) |
| 169 return MOJO_RESULT_INVALID_ARGUMENT; | 103 return MOJO_RESULT_INVALID_ARGUMENT; |
| 170 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. | 104 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. |
| 171 DVLOG_IF(2, elements) | 105 DVLOG_IF(2, elements) |
| 172 << "Query mode: ignoring non-null |elements|"; | 106 << "Query mode: ignoring non-null |elements|"; |
| 173 *num_bytes = static_cast<uint32_t>(data_.size()); | 107 *num_bytes = static_cast<uint32_t>(bytes_available_); |
| 174 return MOJO_RESULT_OK; | 108 return MOJO_RESULT_OK; |
| 175 } | 109 } |
| 176 | 110 |
| 177 bool discard = false; | 111 bool discard = false; |
| 178 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { | 112 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { |
| 179 // These flags are mutally exclusive. | 113 // These flags are mutally exclusive. |
| 180 if (flags & MOJO_READ_DATA_FLAG_PEEK) | 114 if (flags & MOJO_READ_DATA_FLAG_PEEK) |
| 181 return MOJO_RESULT_INVALID_ARGUMENT; | 115 return MOJO_RESULT_INVALID_ARGUMENT; |
| 182 DVLOG_IF(2, elements) | 116 DVLOG_IF(2, elements) |
| 183 << "Discard mode: ignoring non-null |elements|"; | 117 << "Discard mode: ignoring non-null |elements|"; |
| 184 discard = true; | 118 discard = true; |
| 185 } | 119 } |
| 186 | 120 |
| 187 uint32_t max_num_bytes_to_read = *num_bytes; | 121 uint32_t max_num_bytes_to_read = *num_bytes; |
| 188 if (max_num_bytes_to_read % options_.element_num_bytes != 0) | 122 if (max_num_bytes_to_read % options_.element_num_bytes != 0) |
| 189 return MOJO_RESULT_INVALID_ARGUMENT; | 123 return MOJO_RESULT_INVALID_ARGUMENT; |
| 190 | 124 |
| 191 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; | 125 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; |
| 192 uint32_t min_num_bytes_to_read = | 126 uint32_t min_num_bytes_to_read = |
| 193 all_or_none ? max_num_bytes_to_read : 0; | 127 all_or_none ? max_num_bytes_to_read : 0; |
| 194 | 128 |
| 195 if (min_num_bytes_to_read > data_.size()) | 129 if (min_num_bytes_to_read > bytes_available_) { |
| 196 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE; | 130 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| 131 : MOJO_RESULT_OUT_OF_RANGE; |
| 132 } |
| 197 | 133 |
| 198 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, | 134 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); |
| 199 static_cast<uint32_t>(data_.size())); | 135 if (bytes_to_read == 0) { |
| 200 if (bytes_to_read == 0) | 136 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| 201 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; | 137 : MOJO_RESULT_SHOULD_WAIT; |
| 138 } |
| 202 | 139 |
| 203 if (!discard) | 140 if (!discard) { |
| 204 memcpy(elements, &data_[0], bytes_to_read); | 141 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
| 142 CHECK(data); |
| 143 |
| 144 uint8_t* destination = static_cast<uint8_t*>(elements); |
| 145 CHECK(destination); |
| 146 |
| 147 DCHECK_LE(read_offset_, options_.capacity_num_bytes); |
| 148 uint32_t tail_bytes_to_copy = |
| 149 std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read); |
| 150 uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy; |
| 151 if (tail_bytes_to_copy > 0) |
| 152 memcpy(destination, data + read_offset_, tail_bytes_to_copy); |
| 153 if (head_bytes_to_copy > 0) |
| 154 memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy); |
| 155 } |
| 205 *num_bytes = bytes_to_read; | 156 *num_bytes = bytes_to_read; |
| 206 | 157 |
| 207 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); | 158 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); |
| 208 if (discard || !peek) | 159 if (discard || !peek) { |
| 209 data_.erase(data_.begin(), data_.begin() + bytes_to_read); | 160 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; |
| 161 bytes_available_ -= bytes_to_read; |
| 162 |
| 163 base::AutoUnlock unlock(lock_); |
| 164 NotifyRead(bytes_to_read); |
| 165 } |
| 210 | 166 |
| 211 return MOJO_RESULT_OK; | 167 return MOJO_RESULT_OK; |
| 212 } | 168 } |
| 213 | 169 |
| 214 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock( | 170 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
| 215 const void** buffer, | 171 uint32_t* buffer_num_bytes, |
| 216 uint32_t* buffer_num_bytes, | 172 MojoReadDataFlags flags) { |
| 217 MojoReadDataFlags flags) { | 173 base::AutoLock lock(lock_); |
| 218 lock().AssertAcquired(); | 174 if (!shared_ring_buffer_ || in_transit_) |
| 219 if (channel_) | 175 return MOJO_RESULT_INVALID_ARGUMENT; |
| 220 channel_->EnsureLazyInitialized(); | 176 |
| 221 if (in_two_phase_read_) | 177 if (in_two_phase_read_) |
| 222 return MOJO_RESULT_BUSY; | 178 return MOJO_RESULT_BUSY; |
| 223 | 179 |
| 224 // These flags may not be used in two-phase mode. | 180 // These flags may not be used in two-phase mode. |
| 225 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || | 181 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || |
| 226 (flags & MOJO_READ_DATA_FLAG_QUERY) || | 182 (flags & MOJO_READ_DATA_FLAG_QUERY) || |
| 227 (flags & MOJO_READ_DATA_FLAG_PEEK)) | 183 (flags & MOJO_READ_DATA_FLAG_PEEK)) |
| 228 return MOJO_RESULT_INVALID_ARGUMENT; | 184 return MOJO_RESULT_INVALID_ARGUMENT; |
| 229 | 185 |
| 230 uint32_t max_num_bytes_to_read = static_cast<uint32_t>(data_.size()); | 186 if (bytes_available_ == 0) { |
| 231 if (max_num_bytes_to_read == 0) | 187 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| 232 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; | 188 : MOJO_RESULT_SHOULD_WAIT; |
| 189 } |
| 190 |
| 191 DCHECK_LT(read_offset_, options_.capacity_num_bytes); |
| 192 uint32_t bytes_to_read = std::min(bytes_available_, |
| 193 options_.capacity_num_bytes - read_offset_); |
| 194 |
| 195 CHECK(ring_buffer_mapping_); |
| 196 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
| 197 CHECK(data); |
| 233 | 198 |
| 234 in_two_phase_read_ = true; | 199 in_two_phase_read_ = true; |
| 235 *buffer = &data_[0]; | 200 *buffer = data + read_offset_; |
| 236 *buffer_num_bytes = max_num_bytes_to_read; | 201 *buffer_num_bytes = bytes_to_read; |
| 237 two_phase_max_bytes_read_ = max_num_bytes_to_read; | 202 two_phase_max_bytes_read_ = bytes_to_read; |
| 238 | 203 |
| 239 return MOJO_RESULT_OK; | 204 return MOJO_RESULT_OK; |
| 240 } | 205 } |
| 241 | 206 |
| 242 MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock( | 207 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { |
| 243 uint32_t num_bytes_read) { | 208 base::AutoLock lock(lock_); |
| 244 lock().AssertAcquired(); | |
| 245 if (!in_two_phase_read_) | 209 if (!in_two_phase_read_) |
| 246 return MOJO_RESULT_FAILED_PRECONDITION; | 210 return MOJO_RESULT_FAILED_PRECONDITION; |
| 247 | 211 |
| 248 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); | 212 if (in_transit_) |
| 213 return MOJO_RESULT_INVALID_ARGUMENT; |
| 214 |
| 215 CHECK(shared_ring_buffer_); |
| 216 |
| 217 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); |
| 249 MojoResult rv; | 218 MojoResult rv; |
| 250 if (num_bytes_read > two_phase_max_bytes_read_ || | 219 if (num_bytes_read > two_phase_max_bytes_read_ || |
| 251 num_bytes_read % options_.element_num_bytes != 0) { | 220 num_bytes_read % options_.element_num_bytes != 0) { |
| 252 rv = MOJO_RESULT_INVALID_ARGUMENT; | 221 rv = MOJO_RESULT_INVALID_ARGUMENT; |
| 253 } else { | 222 } else { |
| 254 rv = MOJO_RESULT_OK; | 223 rv = MOJO_RESULT_OK; |
| 255 data_.erase(data_.begin(), data_.begin() + num_bytes_read); | 224 read_offset_ = |
| 225 (read_offset_ + num_bytes_read) % options_.capacity_num_bytes; |
| 226 |
| 227 DCHECK_GE(bytes_available_, num_bytes_read); |
| 228 bytes_available_ -= num_bytes_read; |
| 229 |
| 230 base::AutoUnlock unlock(lock_); |
| 231 NotifyRead(num_bytes_read); |
| 256 } | 232 } |
| 257 | 233 |
| 258 in_two_phase_read_ = false; | 234 in_two_phase_read_ = false; |
| 259 two_phase_max_bytes_read_ = 0; | 235 two_phase_max_bytes_read_ = 0; |
| 260 if (!data_received_during_two_phase_read_.empty()) { | |
| 261 if (data_.empty()) { | |
| 262 data_received_during_two_phase_read_.swap(data_); | |
| 263 } else { | |
| 264 data_.insert(data_.end(), data_received_during_two_phase_read_.begin(), | |
| 265 data_received_during_two_phase_read_.end()); | |
| 266 data_received_during_two_phase_read_.clear(); | |
| 267 } | |
| 268 } | |
| 269 | 236 |
| 270 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | 237 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
| 271 if (!new_state.equals(old_state)) | 238 if (!new_state.equals(old_state)) |
| 272 awakable_list_.AwakeForStateChange(new_state); | 239 awakable_list_.AwakeForStateChange(new_state); |
| 273 | 240 |
| 274 return rv; | 241 return rv; |
| 275 } | 242 } |
| 276 | 243 |
| 277 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock() | 244 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { |
| 278 const { | 245 base::AutoLock lock(lock_); |
| 279 lock().AssertAcquired(); | 246 return GetHandleSignalsStateNoLock(); |
| 280 | |
| 281 HandleSignalsState rv; | |
| 282 if (!data_.empty()) { | |
| 283 if (!in_two_phase_read_) | |
| 284 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | |
| 285 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | |
| 286 } else if (!error_) { | |
| 287 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | |
| 288 } | |
| 289 | |
| 290 if (error_) | |
| 291 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
| 292 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
| 293 return rv; | |
| 294 } | 247 } |
| 295 | 248 |
| 296 MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock( | 249 MojoResult DataPipeConsumerDispatcher::AddAwakable( |
| 297 Awakable* awakable, | 250 Awakable* awakable, |
| 298 MojoHandleSignals signals, | 251 MojoHandleSignals signals, |
| 299 uintptr_t context, | 252 uintptr_t context, |
| 300 HandleSignalsState* signals_state) { | 253 HandleSignalsState* signals_state) { |
| 301 lock().AssertAcquired(); | 254 base::AutoLock lock(lock_); |
| 302 if (channel_) | 255 if (!shared_ring_buffer_ || in_transit_) { |
| 303 channel_->EnsureLazyInitialized(); | 256 if (signals_state) |
| 304 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 257 *signals_state = HandleSignalsState(); |
| 258 return MOJO_RESULT_INVALID_ARGUMENT; |
| 259 } |
| 260 UpdateSignalsStateNoLock(); |
| 261 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 305 if (state.satisfies(signals)) { | 262 if (state.satisfies(signals)) { |
| 306 if (signals_state) | 263 if (signals_state) |
| 307 *signals_state = state; | 264 *signals_state = state; |
| 308 return MOJO_RESULT_ALREADY_EXISTS; | 265 return MOJO_RESULT_ALREADY_EXISTS; |
| 309 } | 266 } |
| 310 if (!state.can_satisfy(signals)) { | 267 if (!state.can_satisfy(signals)) { |
| 311 if (signals_state) | 268 if (signals_state) |
| 312 *signals_state = state; | 269 *signals_state = state; |
| 313 return MOJO_RESULT_FAILED_PRECONDITION; | 270 return MOJO_RESULT_FAILED_PRECONDITION; |
| 314 } | 271 } |
| 315 | 272 |
| 316 awakable_list_.Add(awakable, signals, context); | 273 awakable_list_.Add(awakable, signals, context); |
| 317 return MOJO_RESULT_OK; | 274 return MOJO_RESULT_OK; |
| 318 } | 275 } |
| 319 | 276 |
| 320 void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock( | 277 void DataPipeConsumerDispatcher::RemoveAwakable( |
| 321 Awakable* awakable, | 278 Awakable* awakable, |
| 322 HandleSignalsState* signals_state) { | 279 HandleSignalsState* signals_state) { |
| 323 lock().AssertAcquired(); | 280 base::AutoLock lock(lock_); |
| 281 if ((!shared_ring_buffer_ || in_transit_) && signals_state) |
| 282 *signals_state = HandleSignalsState(); |
| 283 else if (signals_state) |
| 284 *signals_state = GetHandleSignalsStateNoLock(); |
| 324 awakable_list_.Remove(awakable); | 285 awakable_list_.Remove(awakable); |
| 325 if (signals_state) | 286 } |
| 326 *signals_state = GetHandleSignalsStateImplNoLock(); | 287 |
| 327 } | 288 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes, |
| 328 | 289 uint32_t* num_ports, |
| 329 void DataPipeConsumerDispatcher::StartSerializeImplNoLock( | 290 uint32_t* num_handles) { |
| 330 size_t* max_size, | 291 base::AutoLock lock(lock_); |
| 331 size_t* max_platform_handles) { | 292 DCHECK(in_transit_); |
| 332 if (!serialized_) { | 293 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); |
| 333 // Handles the case where we have messages read off RawChannel but not ready | 294 *num_ports = 1; |
| 334 // by MojoReadMessage. | 295 *num_handles = 1; |
| 335 SerializeInternal(); | 296 } |
| 336 } | 297 |
| 337 | 298 bool DataPipeConsumerDispatcher::EndSerialize( |
| 338 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), | |
| 339 !data_.empty() || !serialized_read_buffer_.empty(), | |
| 340 max_size, max_platform_handles); | |
| 341 } | |
| 342 | |
| 343 bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock( | |
| 344 void* destination, | 299 void* destination, |
| 345 size_t* actual_size, | 300 ports::PortName* ports, |
| 346 PlatformHandleVector* platform_handles) { | 301 PlatformHandle* platform_handles) { |
| 347 ScopedPlatformHandle shared_memory_handle; | 302 SerializedState* state = static_cast<SerializedState*>(destination); |
| 348 size_t shared_memory_size = data_.size() + serialized_read_buffer_.size(); | 303 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); |
| 349 if (shared_memory_size) { | 304 |
| 350 shared_memory_size += sizeof(SharedMemoryHeader); | 305 base::AutoLock lock(lock_); |
| 351 SharedMemoryHeader header; | 306 DCHECK(in_transit_); |
| 352 header.data_size = static_cast<uint32_t>(data_.size()); | 307 state->pipe_id = pipe_id_; |
| 353 header.read_buffer_size = | 308 state->peer_closed = peer_closed_; |
| 354 static_cast<uint32_t>(serialized_read_buffer_.size()); | 309 state->read_offset = read_offset_; |
| 355 | 310 state->bytes_available = bytes_available_; |
| 356 scoped_refptr<PlatformSharedBuffer> shared_buffer( | 311 |
| 357 internal::g_platform_support->CreateSharedBuffer( | 312 ports[0] = control_port_.name(); |
| 358 shared_memory_size)); | 313 |
| 359 scoped_ptr<PlatformSharedBufferMapping> mapping( | 314 buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle(); |
| 360 shared_buffer->Map(0, shared_memory_size)); | 315 platform_handles[0] = buffer_handle_for_transit_.get(); |
| 361 | 316 |
| 362 char* start = static_cast<char*>(mapping->GetBase()); | 317 return true; |
| 363 memcpy(start, &header, sizeof(SharedMemoryHeader)); | 318 } |
| 364 start += sizeof(SharedMemoryHeader); | 319 |
| 365 | 320 bool DataPipeConsumerDispatcher::BeginTransit() { |
| 366 if (!data_.empty()) { | 321 base::AutoLock lock(lock_); |
| 367 memcpy(start, &data_[0], data_.size()); | 322 if (in_transit_) |
| 368 start += data_.size(); | 323 return false; |
| 324 in_transit_ = !in_two_phase_read_; |
| 325 return in_transit_; |
| 326 } |
| 327 |
| 328 void DataPipeConsumerDispatcher::CompleteTransitAndClose() { |
| 329 node_controller_->SetPortObserver(control_port_, nullptr); |
| 330 |
| 331 base::AutoLock lock(lock_); |
| 332 DCHECK(in_transit_); |
| 333 in_transit_ = false; |
| 334 transferred_ = true; |
| 335 ignore_result(buffer_handle_for_transit_.release()); |
| 336 CloseNoLock(); |
| 337 } |
| 338 |
| 339 void DataPipeConsumerDispatcher::CancelTransit() { |
| 340 base::AutoLock lock(lock_); |
| 341 DCHECK(in_transit_); |
| 342 in_transit_ = false; |
| 343 buffer_handle_for_transit_.reset(); |
| 344 UpdateSignalsStateNoLock(); |
| 345 } |
| 346 |
| 347 // static |
| 348 scoped_refptr<DataPipeConsumerDispatcher> |
| 349 DataPipeConsumerDispatcher::Deserialize(const void* data, |
| 350 size_t num_bytes, |
| 351 const ports::PortName* ports, |
| 352 size_t num_ports, |
| 353 PlatformHandle* handles, |
| 354 size_t num_handles) { |
| 355 if (num_ports != 1 || num_handles != 1 || |
| 356 num_bytes != sizeof(SerializedState)) { |
| 357 return nullptr; |
| 358 } |
| 359 |
| 360 const SerializedState* state = static_cast<const SerializedState*>(data); |
| 361 |
| 362 NodeController* node_controller = internal::g_core->GetNodeController(); |
| 363 ports::PortRef port; |
| 364 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) |
| 365 return nullptr; |
| 366 |
| 367 PlatformHandle buffer_handle; |
| 368 std::swap(buffer_handle, handles[0]); |
| 369 scoped_refptr<PlatformSharedBuffer> ring_buffer = |
| 370 internal::g_platform_support->CreateSharedBufferFromHandle( |
| 371 state->options.capacity_num_bytes, |
| 372 ScopedPlatformHandle(buffer_handle)); |
| 373 if (!ring_buffer) { |
| 374 DLOG(ERROR) << "Failed to deserialize shared buffer handle."; |
| 375 return nullptr; |
| 376 } |
| 377 |
| 378 scoped_refptr<DataPipeConsumerDispatcher> dispatcher = |
| 379 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer, |
| 380 state->options, false /* initialized */, |
| 381 state->pipe_id); |
| 382 |
| 383 { |
| 384 base::AutoLock lock(dispatcher->lock_); |
| 385 dispatcher->peer_closed_ = state->peer_closed; |
| 386 dispatcher->read_offset_ = state->read_offset; |
| 387 dispatcher->bytes_available_ = state->bytes_available; |
| 388 dispatcher->InitializeNoLock(); |
| 389 } |
| 390 |
| 391 return dispatcher; |
| 392 } |
| 393 |
| 394 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { |
| 395 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ && |
| 396 !in_transit_); |
| 397 } |
| 398 |
| 399 void DataPipeConsumerDispatcher::InitializeNoLock() { |
| 400 lock_.AssertAcquired(); |
| 401 |
| 402 if (shared_ring_buffer_) { |
| 403 DCHECK(!ring_buffer_mapping_); |
| 404 ring_buffer_mapping_ = |
| 405 shared_ring_buffer_->Map(0, options_.capacity_num_bytes); |
| 406 if (!ring_buffer_mapping_) { |
| 407 DLOG(ERROR) << "Failed to map shared buffer."; |
| 408 shared_ring_buffer_ = nullptr; |
| 369 } | 409 } |
| 370 | 410 } |
| 371 if (!serialized_read_buffer_.empty()) { | 411 |
| 372 memcpy(start, &serialized_read_buffer_[0], | 412 base::AutoUnlock unlock(lock_); |
| 373 serialized_read_buffer_.size()); | 413 node_controller_->SetPortObserver( |
| 374 start += serialized_read_buffer_.size(); | 414 control_port_, |
| 375 } | 415 make_scoped_refptr(new PortObserverThunk(this))); |
| 376 | 416 } |
| 377 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); | 417 |
| 378 } | 418 MojoResult DataPipeConsumerDispatcher::CloseNoLock() { |
| 379 | 419 lock_.AssertAcquired(); |
| 380 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), | 420 if (is_closed_ || in_transit_) |
| 381 std::move(shared_memory_handle), shared_memory_size, | 421 return MOJO_RESULT_INVALID_ARGUMENT; |
| 382 destination, actual_size, platform_handles); | 422 is_closed_ = true; |
| 383 CloseImplNoLock(); | 423 ring_buffer_mapping_.reset(); |
| 384 return true; | 424 shared_ring_buffer_ = nullptr; |
| 385 } | 425 |
| 386 | 426 awakable_list_.CancelAll(); |
| 387 void DataPipeConsumerDispatcher::TransportStarted() { | 427 if (!transferred_) { |
| 388 started_transport_.Acquire(); | 428 base::AutoUnlock unlock(lock_); |
| 389 } | 429 node_controller_->ClosePort(control_port_); |
| 390 | 430 } |
| 391 void DataPipeConsumerDispatcher::TransportEnded() { | 431 |
| 392 started_transport_.Release(); | 432 return MOJO_RESULT_OK; |
| 393 | 433 } |
| 394 base::AutoLock locker(lock()); | 434 |
| 395 | 435 HandleSignalsState |
| 396 // If transporting of DP failed, we might have got more data and didn't awake | 436 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { |
| 397 // for. | 437 lock_.AssertAcquired(); |
| 398 // TODO(jam): should we care about only alerting if it was empty before | 438 |
| 399 // TransportStarted? | 439 HandleSignalsState rv; |
| 400 if (!data_.empty()) | 440 if (shared_ring_buffer_ && bytes_available_) { |
| 401 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 441 if (!in_two_phase_read_) |
| 402 } | 442 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 403 | 443 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 404 bool DataPipeConsumerDispatcher::IsBusyNoLock() const { | 444 } else if (!peer_closed_ && shared_ring_buffer_) { |
| 405 lock().AssertAcquired(); | 445 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 406 return in_two_phase_read_; | 446 } |
| 407 } | 447 |
| 408 | 448 if (peer_closed_) |
| 409 void DataPipeConsumerDispatcher::OnReadMessage( | 449 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 410 const MessageInTransit::View& message_view, | 450 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 411 ScopedPlatformHandleVectorPtr platform_handles) { | 451 return rv; |
| 412 const char* bytes_start = static_cast<const char*>(message_view.bytes()); | 452 } |
| 413 const char* bytes_end = bytes_start + message_view.num_bytes(); | 453 |
| 414 if (started_transport_.Try()) { | 454 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { |
| 415 // We're not in the middle of being sent. | 455 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " |
| 416 | 456 << num_bytes << " bytes read. [control_port=" |
| 417 // Can get synchronously called back in Init if there was initial data. | 457 << control_port_.name() << "]"; |
| 418 scoped_ptr<base::AutoLock> locker; | 458 |
| 419 if (!calling_init_) { | 459 SendDataPipeControlMessage(node_controller_, control_port_, |
| 420 locker.reset(new base::AutoLock(lock())); | 460 DataPipeCommand::DATA_WAS_READ, num_bytes); |
| 421 } | 461 } |
| 422 | 462 |
| 423 if (in_two_phase_read_) { | 463 void DataPipeConsumerDispatcher::OnPortStatusChanged() { |
| 424 data_received_during_two_phase_read_.insert( | 464 base::AutoLock lock(lock_); |
| 425 data_received_during_two_phase_read_.end(), bytes_start, bytes_end); | 465 |
| 426 } else { | 466 // We stop observing the control port as soon it's transferred, but this can |
| 427 bool was_empty = data_.empty(); | 467 // race with events which are raised right before that happens. This is fine |
| 428 data_.insert(data_.end(), bytes_start, bytes_end); | 468 // to ignore. |
| 429 if (was_empty) | 469 if (transferred_) |
| 430 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 470 return; |
| 431 } | 471 |
| 432 started_transport_.Release(); | 472 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; |
| 433 } else { | 473 |
| 434 // See comment in MessagePipeDispatcher about why we can't and don't need | 474 UpdateSignalsStateNoLock(); |
| 435 // to lock here. | 475 } |
| 436 data_.insert(data_.end(), bytes_start, bytes_end); | 476 |
| 437 } | 477 void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { |
| 438 } | 478 lock_.AssertAcquired(); |
| 439 | 479 |
| 440 void DataPipeConsumerDispatcher::OnError(Error error) { | 480 bool was_peer_closed = peer_closed_; |
| 441 switch (error) { | 481 size_t previous_bytes_available = bytes_available_; |
| 442 case ERROR_READ_SHUTDOWN: | 482 |
| 443 // The other side was cleanly closed, so this isn't actually an error. | 483 ports::PortStatus port_status; |
| 444 DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)"; | 484 if (node_controller_->node()->GetStatus(control_port_, &port_status) != |
| 445 break; | 485 ports::OK || |
| 446 case ERROR_READ_BROKEN: | 486 !port_status.receiving_messages) { |
| 447 LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)"; | 487 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure" |
| 448 break; | 488 << " [control_port=" << control_port_.name() << "]"; |
| 449 case ERROR_READ_BAD_MESSAGE: | 489 |
| 450 // Receiving a bad message means either a bug, data corruption, or | 490 peer_closed_ = true; |
| 451 // malicious attack (probably due to some other bug). | 491 } |
| 452 LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad " | 492 |
| 453 << "message)"; | 493 if (port_status.has_messages && !in_transit_) { |
| 454 break; | 494 ports::ScopedMessage message; |
| 455 case ERROR_READ_UNKNOWN: | 495 do { |
| 456 LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)"; | 496 int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr, |
| 457 break; | 497 &message); |
| 458 case ERROR_WRITE: | 498 if (rv != ports::OK) |
| 459 LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages"; | 499 peer_closed_ = true; |
| 460 break; | 500 if (message) { |
| 461 } | 501 const DataPipeControlMessage* m = |
| 462 | 502 static_cast<const DataPipeControlMessage*>( |
| 463 error_ = true; | 503 message->payload_bytes()); |
| 464 if (started_transport_.Try()) { | 504 |
| 465 base::AutoLock locker(lock()); | 505 if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) { |
| 466 // We can get two OnError callbacks before the post task below completes. | 506 DLOG(ERROR) << "Unexpected control message from producer."; |
| 467 // Although RawChannel still has a pointer to this object until Shutdown is | 507 peer_closed_ = true; |
| 468 // called, that is safe since this class always does a PostTask to the IO | 508 break; |
| 469 // thread to self destruct. | 509 } |
| 470 if (channel_) { | 510 |
| 471 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 511 if (static_cast<size_t>(bytes_available_) + m->num_bytes > |
| 472 channel_->Shutdown(); | 512 options_.capacity_num_bytes) { |
| 473 channel_ = nullptr; | 513 DLOG(ERROR) << "Producer claims to have written too many bytes."; |
| 474 } | 514 peer_closed_ = true; |
| 475 started_transport_.Release(); | 515 break; |
| 476 } else { | 516 } |
| 477 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 517 |
| 478 } | 518 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " |
| 479 } | 519 << m->num_bytes << " bytes were written. [control_port=" |
| 480 | 520 << control_port_.name() << "]"; |
| 481 void DataPipeConsumerDispatcher::SerializeInternal() { | 521 |
| 482 DCHECK(!in_two_phase_read_); | 522 bytes_available_ += m->num_bytes; |
| 483 // We need to stop watching handle immediately, even though not on IO thread, | 523 } |
| 484 // so that other messages aren't read after this. | 524 } while (message); |
| 485 if (channel_) { | 525 } |
| 486 std::vector<char> serialized_write_buffer; | 526 |
| 487 std::vector<int> fds; | 527 if (peer_closed_ != was_peer_closed || |
| 488 bool write_error = false; | 528 bytes_available_ != previous_bytes_available) { |
| 489 serialized_platform_handle_ = channel_->ReleaseHandle( | 529 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 490 &serialized_read_buffer_, &serialized_write_buffer, &fds, &fds, | 530 } |
| 491 &write_error); | |
| 492 CHECK(serialized_write_buffer.empty()); | |
| 493 CHECK(fds.empty()); | |
| 494 CHECK(!write_error) << "DataPipeConsumerDispatcher doesn't write."; | |
| 495 | |
| 496 channel_ = nullptr; | |
| 497 } | |
| 498 | |
| 499 serialized_ = true; | |
| 500 } | 531 } |
| 501 | 532 |
| 502 } // namespace edk | 533 } // namespace edk |
| 503 } // namespace mojo | 534 } // namespace mojo |
| OLD | NEW |