| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <utility> | 10 #include <utility> |
| 11 | 11 |
| 12 #include "base/bind.h" | 12 #include "base/bind.h" |
| 13 #include "base/logging.h" | 13 #include "base/logging.h" |
| 14 #include "base/memory/ref_counted.h" |
| 14 #include "base/message_loop/message_loop.h" | 15 #include "base/message_loop/message_loop.h" |
| 15 #include "mojo/edk/embedder/embedder_internal.h" | 16 #include "mojo/edk/embedder/embedder_internal.h" |
| 16 #include "mojo/edk/embedder/platform_shared_buffer.h" | 17 #include "mojo/edk/embedder/platform_shared_buffer.h" |
| 17 #include "mojo/edk/embedder/platform_support.h" | 18 #include "mojo/edk/embedder/platform_support.h" |
| 18 #include "mojo/edk/system/configuration.h" | 19 #include "mojo/edk/system/configuration.h" |
| 19 #include "mojo/edk/system/data_pipe.h" | 20 #include "mojo/edk/system/core.h" |
| 21 #include "mojo/edk/system/data_pipe_control_message.h" |
| 22 #include "mojo/edk/system/node_controller.h" |
| 23 #include "mojo/edk/system/ports_message.h" |
| 20 | 24 |
| 21 namespace mojo { | 25 namespace mojo { |
| 22 namespace edk { | 26 namespace edk { |
| 23 | 27 |
| 24 void DataPipeProducerDispatcher::Init( | 28 namespace { |
| 25 ScopedPlatformHandle message_pipe, | |
| 26 char* serialized_write_buffer, size_t serialized_write_buffer_size) { | |
| 27 if (message_pipe.is_valid()) { | |
| 28 channel_ = RawChannel::Create(std::move(message_pipe)); | |
| 29 channel_->SetSerializedData( | |
| 30 nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size, | |
| 31 nullptr, nullptr); | |
| 32 internal::g_io_thread_task_runner->PostTask( | |
| 33 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); | |
| 34 } else { | |
| 35 error_ = true; | |
| 36 } | |
| 37 } | |
| 38 | 29 |
| 39 void DataPipeProducerDispatcher::InitOnIO() { | 30 struct SerializedState { |
| 40 base::AutoLock locker(lock()); | 31 MojoCreateDataPipeOptions options; |
| 41 if (channel_) | 32 uint64_t pipe_id; |
| 42 channel_->Init(this); | 33 bool peer_closed; |
| 43 } | 34 uint32_t write_offset; |
| 35 uint32_t available_capacity; |
| 36 }; |
| 44 | 37 |
| 45 void DataPipeProducerDispatcher::CloseOnIO() { | 38 } // namespace |
| 46 base::AutoLock locker(lock()); | 39 |
| 47 if (channel_) { | 40 // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a |
| 48 channel_->Shutdown(); | 41 // reference to the dispatcher to ensure it lives as long as the observed port. |
| 49 channel_ = nullptr; | 42 class DataPipeProducerDispatcher::PortObserverThunk |
| 43 : public NodeController::PortObserver { |
| 44 public: |
| 45 explicit PortObserverThunk( |
| 46 scoped_refptr<DataPipeProducerDispatcher> dispatcher) |
| 47 : dispatcher_(dispatcher) {} |
| 48 |
| 49 private: |
| 50 ~PortObserverThunk() override {} |
| 51 |
| 52 // NodeController::PortObserver: |
| 53 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } |
| 54 |
| 55 scoped_refptr<DataPipeProducerDispatcher> dispatcher_; |
| 56 |
| 57 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
| 58 }; |
| 59 |
| 60 DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
| 61 NodeController* node_controller, |
| 62 const ports::PortRef& control_port, |
| 63 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
| 64 const MojoCreateDataPipeOptions& options, |
| 65 bool initialized, |
| 66 uint64_t pipe_id) |
| 67 : options_(options), |
| 68 node_controller_(node_controller), |
| 69 control_port_(control_port), |
| 70 pipe_id_(pipe_id), |
| 71 shared_ring_buffer_(shared_ring_buffer), |
| 72 available_capacity_(options_.capacity_num_bytes) { |
| 73 if (initialized) { |
| 74 base::AutoLock lock(lock_); |
| 75 InitializeNoLock(); |
| 50 } | 76 } |
| 51 } | 77 } |
| 52 | 78 |
| 53 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { | 79 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
| 54 return Type::DATA_PIPE_PRODUCER; | 80 return Type::DATA_PIPE_PRODUCER; |
| 55 } | 81 } |
| 56 | 82 |
| 57 scoped_refptr<DataPipeProducerDispatcher> | 83 MojoResult DataPipeProducerDispatcher::Close() { |
| 58 DataPipeProducerDispatcher::Deserialize( | 84 base::AutoLock lock(lock_); |
| 59 const void* source, | 85 DVLOG(1) << "Closing data pipe producer " << pipe_id_; |
| 60 size_t size, | 86 return CloseNoLock(); |
| 61 PlatformHandleVector* platform_handles) { | |
| 62 MojoCreateDataPipeOptions options; | |
| 63 ScopedPlatformHandle shared_memory_handle; | |
| 64 size_t shared_memory_size = 0; | |
| 65 ScopedPlatformHandle platform_handle = | |
| 66 DataPipe::Deserialize(source, size, platform_handles, &options, | |
| 67 &shared_memory_handle, &shared_memory_size); | |
| 68 | |
| 69 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); | |
| 70 | |
| 71 char* serialized_write_buffer = nullptr; | |
| 72 size_t serialized_write_buffer_size = 0; | |
| 73 scoped_refptr<PlatformSharedBuffer> shared_buffer; | |
| 74 scoped_ptr<PlatformSharedBufferMapping> mapping; | |
| 75 if (shared_memory_size) { | |
| 76 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( | |
| 77 shared_memory_size, std::move(shared_memory_handle)); | |
| 78 mapping = shared_buffer->Map(0, shared_memory_size); | |
| 79 serialized_write_buffer = static_cast<char*>(mapping->GetBase()); | |
| 80 serialized_write_buffer_size = shared_memory_size; | |
| 81 } | |
| 82 | |
| 83 rv->Init(std::move(platform_handle), serialized_write_buffer, | |
| 84 serialized_write_buffer_size); | |
| 85 return rv; | |
| 86 } | 87 } |
| 87 | 88 |
| 88 DataPipeProducerDispatcher::DataPipeProducerDispatcher( | 89 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
| 89 const MojoCreateDataPipeOptions& options) | 90 uint32_t* num_bytes, |
| 90 : options_(options), channel_(nullptr), error_(false), serialized_(false) { | 91 MojoWriteDataFlags flags) { |
| 91 } | 92 base::AutoLock lock(lock_); |
| 93 if (!shared_ring_buffer_ || in_transit_) |
| 94 return MOJO_RESULT_INVALID_ARGUMENT; |
| 92 | 95 |
| 93 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { | 96 if (in_two_phase_write_) |
| 94 // See comment in ~MessagePipeDispatcher. | 97 return MOJO_RESULT_BUSY; |
| 95 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) | |
| 96 channel_->Shutdown(); | |
| 97 else | |
| 98 DCHECK(!channel_); | |
| 99 } | |
| 100 | 98 |
| 101 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { | 99 if (peer_closed_) |
| 102 lock().AssertAcquired(); | 100 return MOJO_RESULT_FAILED_PRECONDITION; |
| 103 awakable_list_.CancelAll(); | |
| 104 } | |
| 105 | 101 |
| 106 void DataPipeProducerDispatcher::CloseImplNoLock() { | |
| 107 lock().AssertAcquired(); | |
| 108 internal::g_io_thread_task_runner->PostTask( | |
| 109 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); | |
| 110 } | |
| 111 | |
| 112 scoped_refptr<Dispatcher> | |
| 113 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | |
| 114 lock().AssertAcquired(); | |
| 115 | |
| 116 SerializeInternal(); | |
| 117 | |
| 118 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); | |
| 119 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | |
| 120 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); | |
| 121 rv->serialized_ = true; | |
| 122 return scoped_refptr<Dispatcher>(rv.get()); | |
| 123 } | |
| 124 | |
| 125 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( | |
| 126 const void* elements, | |
| 127 uint32_t* num_bytes, | |
| 128 MojoWriteDataFlags flags) { | |
| 129 lock().AssertAcquired(); | |
| 130 if (InTwoPhaseWrite()) | |
| 131 return MOJO_RESULT_BUSY; | |
| 132 if (error_) | |
| 133 return MOJO_RESULT_FAILED_PRECONDITION; | |
| 134 if (*num_bytes % options_.element_num_bytes != 0) | 102 if (*num_bytes % options_.element_num_bytes != 0) |
| 135 return MOJO_RESULT_INVALID_ARGUMENT; | 103 return MOJO_RESULT_INVALID_ARGUMENT; |
| 136 if (*num_bytes == 0) | 104 if (*num_bytes == 0) |
| 137 return MOJO_RESULT_OK; // Nothing to do. | 105 return MOJO_RESULT_OK; // Nothing to do. |
| 138 | 106 |
| 139 // For now, we ignore options.capacity_num_bytes as a total of all pending | |
| 140 // writes (and just treat it per message). We will implement that later if | |
| 141 // we need to. All current uses want all their data to be sent, and it's not | |
| 142 // clear that this backpressure should be done at the mojo layer or at a | |
| 143 // higher application layer. | |
| 144 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; | 107 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; |
| 145 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; | 108 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; |
| 146 if (min_num_bytes_to_write > options_.capacity_num_bytes) { | 109 if (min_num_bytes_to_write > options_.capacity_num_bytes) { |
| 147 // Don't return "should wait" since you can't wait for a specified amount of | 110 // Don't return "should wait" since you can't wait for a specified amount of |
| 148 // data. | 111 // data. |
| 149 return MOJO_RESULT_OUT_OF_RANGE; | 112 return MOJO_RESULT_OUT_OF_RANGE; |
| 150 } | 113 } |
| 151 | 114 |
| 152 uint32_t num_bytes_to_write = | 115 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); |
| 153 std::min(*num_bytes, options_.capacity_num_bytes); | 116 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); |
| 154 if (num_bytes_to_write == 0) | 117 if (num_bytes_to_write == 0) |
| 155 return MOJO_RESULT_SHOULD_WAIT; | 118 return MOJO_RESULT_SHOULD_WAIT; |
| 156 | 119 |
| 157 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); | 120 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); |
| 158 | 121 |
| 159 *num_bytes = num_bytes_to_write; | 122 *num_bytes = num_bytes_to_write; |
| 160 WriteDataIntoMessages(elements, num_bytes_to_write); | |
| 161 | 123 |
| 162 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | 124 CHECK(ring_buffer_mapping_); |
| 125 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
| 126 CHECK(data); |
| 127 |
| 128 const uint8_t* source = static_cast<const uint8_t*>(elements); |
| 129 CHECK(source); |
| 130 |
| 131 DCHECK_LE(write_offset_, options_.capacity_num_bytes); |
| 132 uint32_t tail_bytes_to_write = |
| 133 std::min(options_.capacity_num_bytes - write_offset_, |
| 134 num_bytes_to_write); |
| 135 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write; |
| 136 |
| 137 DCHECK_GT(tail_bytes_to_write, 0u); |
| 138 memcpy(data + write_offset_, source, tail_bytes_to_write); |
| 139 if (head_bytes_to_write > 0) |
| 140 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); |
| 141 |
| 142 DCHECK_LE(num_bytes_to_write, available_capacity_); |
| 143 available_capacity_ -= num_bytes_to_write; |
| 144 write_offset_ = (write_offset_ + num_bytes_to_write) % |
| 145 options_.capacity_num_bytes; |
| 146 |
| 147 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
| 163 if (!new_state.equals(old_state)) | 148 if (!new_state.equals(old_state)) |
| 164 awakable_list_.AwakeForStateChange(new_state); | 149 awakable_list_.AwakeForStateChange(new_state); |
| 165 return MOJO_RESULT_OK; | |
| 166 } | |
| 167 | 150 |
| 168 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( | 151 base::AutoUnlock unlock(lock_); |
| 169 void** buffer, | 152 NotifyWrite(num_bytes_to_write); |
| 170 uint32_t* buffer_num_bytes, | |
| 171 MojoWriteDataFlags flags) { | |
| 172 lock().AssertAcquired(); | |
| 173 if (InTwoPhaseWrite()) | |
| 174 return MOJO_RESULT_BUSY; | |
| 175 if (error_) | |
| 176 return MOJO_RESULT_FAILED_PRECONDITION; | |
| 177 | |
| 178 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. | |
| 179 if (*buffer_num_bytes == 0) | |
| 180 *buffer_num_bytes = options_.capacity_num_bytes; | |
| 181 | |
| 182 two_phase_data_.resize(*buffer_num_bytes); | |
| 183 *buffer = &two_phase_data_[0]; | |
| 184 | |
| 185 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes | |
| 186 // we can construct a MessageInTransit here. But then we need to make | |
| 187 // MessageInTransit support changing its data size later. | |
| 188 | 153 |
| 189 return MOJO_RESULT_OK; | 154 return MOJO_RESULT_OK; |
| 190 } | 155 } |
| 191 | 156 |
| 192 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( | 157 MojoResult DataPipeProducerDispatcher::BeginWriteData( |
| 158 void** buffer, |
| 159 uint32_t* buffer_num_bytes, |
| 160 MojoWriteDataFlags flags) { |
| 161 base::AutoLock lock(lock_); |
| 162 if (!shared_ring_buffer_ || in_transit_) |
| 163 return MOJO_RESULT_INVALID_ARGUMENT; |
| 164 if (in_two_phase_write_) |
| 165 return MOJO_RESULT_BUSY; |
| 166 if (peer_closed_) |
| 167 return MOJO_RESULT_FAILED_PRECONDITION; |
| 168 |
| 169 if (available_capacity_ == 0) { |
| 170 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| 171 : MOJO_RESULT_SHOULD_WAIT; |
| 172 } |
| 173 |
| 174 in_two_phase_write_ = true; |
| 175 *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_, |
| 176 available_capacity_); |
| 177 DCHECK_GT(*buffer_num_bytes, 0u); |
| 178 |
| 179 CHECK(ring_buffer_mapping_); |
| 180 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
| 181 *buffer = data + write_offset_; |
| 182 |
| 183 return MOJO_RESULT_OK; |
| 184 } |
| 185 |
| 186 MojoResult DataPipeProducerDispatcher::EndWriteData( |
| 193 uint32_t num_bytes_written) { | 187 uint32_t num_bytes_written) { |
| 194 lock().AssertAcquired(); | 188 base::AutoLock lock(lock_); |
| 195 if (!InTwoPhaseWrite()) | 189 if (is_closed_ || in_transit_) |
| 190 return MOJO_RESULT_INVALID_ARGUMENT; |
| 191 |
| 192 if (!in_two_phase_write_) |
| 196 return MOJO_RESULT_FAILED_PRECONDITION; | 193 return MOJO_RESULT_FAILED_PRECONDITION; |
| 197 | 194 |
| 195 DCHECK(shared_ring_buffer_); |
| 196 DCHECK(ring_buffer_mapping_); |
| 197 |
| 198 // Note: Allow successful completion of the two-phase write even if the other | 198 // Note: Allow successful completion of the two-phase write even if the other |
| 199 // side has been closed. | 199 // side has been closed. |
| 200 MojoResult rv = MOJO_RESULT_OK; | 200 MojoResult rv = MOJO_RESULT_OK; |
| 201 if (num_bytes_written > two_phase_data_.size() || | 201 if (num_bytes_written > available_capacity_ || |
| 202 num_bytes_written % options_.element_num_bytes != 0) { | 202 num_bytes_written % options_.element_num_bytes != 0 || |
| 203 write_offset_ + num_bytes_written > options_.capacity_num_bytes) { |
| 203 rv = MOJO_RESULT_INVALID_ARGUMENT; | 204 rv = MOJO_RESULT_INVALID_ARGUMENT; |
| 204 } else if (channel_) { | 205 } else { |
| 205 WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written); | 206 DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes); |
| 207 available_capacity_ -= num_bytes_written; |
| 208 write_offset_ = (write_offset_ + num_bytes_written) % |
| 209 options_.capacity_num_bytes; |
| 210 |
| 211 base::AutoUnlock unlock(lock_); |
| 212 NotifyWrite(num_bytes_written); |
| 206 } | 213 } |
| 207 | 214 |
| 208 // Two-phase write ended even on failure. | 215 in_two_phase_write_ = false; |
| 209 two_phase_data_.clear(); | 216 |
| 210 // If we're now writable, we *became* writable (since we weren't writable | 217 // If we're now writable, we *became* writable (since we weren't writable |
| 211 // during the two-phase write), so awake producer awakables. | 218 // during the two-phase write), so awake producer awakables. |
| 212 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | 219 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
| 213 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | 220 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
| 214 awakable_list_.AwakeForStateChange(new_state); | 221 awakable_list_.AwakeForStateChange(new_state); |
| 215 | 222 |
| 216 return rv; | 223 return rv; |
| 217 } | 224 } |
| 218 | 225 |
| 219 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() | 226 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { |
| 220 const { | 227 base::AutoLock lock(lock_); |
| 221 lock().AssertAcquired(); | 228 return GetHandleSignalsStateNoLock(); |
| 222 | |
| 223 HandleSignalsState rv; | |
| 224 if (!error_) { | |
| 225 if (!InTwoPhaseWrite()) | |
| 226 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
| 227 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
| 228 } else { | |
| 229 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
| 230 } | |
| 231 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
| 232 return rv; | |
| 233 } | 229 } |
| 234 | 230 |
| 235 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( | 231 MojoResult DataPipeProducerDispatcher::AddAwakable( |
| 236 Awakable* awakable, | 232 Awakable* awakable, |
| 237 MojoHandleSignals signals, | 233 MojoHandleSignals signals, |
| 238 uintptr_t context, | 234 uintptr_t context, |
| 239 HandleSignalsState* signals_state) { | 235 HandleSignalsState* signals_state) { |
| 240 lock().AssertAcquired(); | 236 base::AutoLock lock(lock_); |
| 241 if (channel_) | 237 if (!shared_ring_buffer_ || in_transit_) { |
| 242 channel_->EnsureLazyInitialized(); | 238 if (signals_state) |
| 243 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 239 *signals_state = HandleSignalsState(); |
| 240 return MOJO_RESULT_INVALID_ARGUMENT; |
| 241 } |
| 242 UpdateSignalsStateNoLock(); |
| 243 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 244 if (state.satisfies(signals)) { | 244 if (state.satisfies(signals)) { |
| 245 if (signals_state) | 245 if (signals_state) |
| 246 *signals_state = state; | 246 *signals_state = state; |
| 247 return MOJO_RESULT_ALREADY_EXISTS; | 247 return MOJO_RESULT_ALREADY_EXISTS; |
| 248 } | 248 } |
| 249 if (!state.can_satisfy(signals)) { | 249 if (!state.can_satisfy(signals)) { |
| 250 if (signals_state) | 250 if (signals_state) |
| 251 *signals_state = state; | 251 *signals_state = state; |
| 252 return MOJO_RESULT_FAILED_PRECONDITION; | 252 return MOJO_RESULT_FAILED_PRECONDITION; |
| 253 } | 253 } |
| 254 | 254 |
| 255 awakable_list_.Add(awakable, signals, context); | 255 awakable_list_.Add(awakable, signals, context); |
| 256 return MOJO_RESULT_OK; | 256 return MOJO_RESULT_OK; |
| 257 } | 257 } |
| 258 | 258 |
| 259 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( | 259 void DataPipeProducerDispatcher::RemoveAwakable( |
| 260 Awakable* awakable, | 260 Awakable* awakable, |
| 261 HandleSignalsState* signals_state) { | 261 HandleSignalsState* signals_state) { |
| 262 lock().AssertAcquired(); | 262 base::AutoLock lock(lock_); |
| 263 if ((!shared_ring_buffer_ || in_transit_) && signals_state) |
| 264 *signals_state = HandleSignalsState(); |
| 265 else if (signals_state) |
| 266 *signals_state = GetHandleSignalsStateNoLock(); |
| 263 awakable_list_.Remove(awakable); | 267 awakable_list_.Remove(awakable); |
| 264 if (signals_state) | 268 } |
| 265 *signals_state = GetHandleSignalsStateImplNoLock(); | 269 |
| 266 } | 270 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes, |
| 267 | 271 uint32_t* num_ports, |
| 268 void DataPipeProducerDispatcher::StartSerializeImplNoLock( | 272 uint32_t* num_handles) { |
| 269 size_t* max_size, | 273 base::AutoLock lock(lock_); |
| 270 size_t* max_platform_handles) { | 274 DCHECK(in_transit_); |
| 271 if (!serialized_) | 275 *num_bytes = sizeof(SerializedState); |
| 272 SerializeInternal(); | 276 *num_ports = 1; |
| 273 | 277 *num_handles = 1; |
| 274 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), | 278 } |
| 275 !serialized_write_buffer_.empty(), max_size, | 279 |
| 276 max_platform_handles); | 280 bool DataPipeProducerDispatcher::EndSerialize( |
| 277 } | |
| 278 | |
| 279 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( | |
| 280 void* destination, | 281 void* destination, |
| 281 size_t* actual_size, | 282 ports::PortName* ports, |
| 282 PlatformHandleVector* platform_handles) { | 283 PlatformHandle* platform_handles) { |
| 283 ScopedPlatformHandle shared_memory_handle; | 284 SerializedState* state = static_cast<SerializedState*>(destination); |
| 284 size_t shared_memory_size = serialized_write_buffer_.size(); | 285 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); |
| 285 if (shared_memory_size) { | 286 |
| 286 scoped_refptr<PlatformSharedBuffer> shared_buffer( | 287 base::AutoLock lock(lock_); |
| 287 internal::g_platform_support->CreateSharedBuffer( | 288 DCHECK(in_transit_); |
| 288 shared_memory_size)); | 289 state->pipe_id = pipe_id_; |
| 289 scoped_ptr<PlatformSharedBufferMapping> mapping( | 290 state->peer_closed = peer_closed_; |
| 290 shared_buffer->Map(0, shared_memory_size)); | 291 state->write_offset = write_offset_; |
| 291 memcpy(mapping->GetBase(), &serialized_write_buffer_[0], | 292 state->available_capacity = available_capacity_; |
| 292 shared_memory_size); | 293 |
| 293 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); | 294 ports[0] = control_port_.name(); |
| 294 } | 295 |
| 295 | 296 buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle(); |
| 296 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), | 297 platform_handles[0] = buffer_handle_for_transit_.get(); |
| 297 std::move(shared_memory_handle), shared_memory_size, | 298 |
| 298 destination, actual_size, platform_handles); | |
| 299 CloseImplNoLock(); | |
| 300 return true; | 299 return true; |
| 301 } | 300 } |
| 302 | 301 |
| 303 void DataPipeProducerDispatcher::TransportStarted() { | 302 bool DataPipeProducerDispatcher::BeginTransit() { |
| 304 started_transport_.Acquire(); | 303 base::AutoLock lock(lock_); |
| 305 } | 304 if (in_transit_) |
| 306 | 305 return false; |
| 307 void DataPipeProducerDispatcher::TransportEnded() { | 306 in_transit_ = !in_two_phase_write_; |
| 308 started_transport_.Release(); | 307 return in_transit_; |
| 309 } | 308 } |
| 310 | 309 |
| 311 bool DataPipeProducerDispatcher::IsBusyNoLock() const { | 310 void DataPipeProducerDispatcher::CompleteTransitAndClose() { |
| 312 lock().AssertAcquired(); | 311 node_controller_->SetPortObserver(control_port_, nullptr); |
| 313 return InTwoPhaseWrite(); | 312 |
| 314 } | 313 base::AutoLock lock(lock_); |
| 315 | 314 DCHECK(in_transit_); |
| 316 void DataPipeProducerDispatcher::OnReadMessage( | 315 transferred_ = true; |
| 317 const MessageInTransit::View& message_view, | 316 in_transit_ = false; |
| 318 ScopedPlatformHandleVectorPtr platform_handles) { | 317 ignore_result(buffer_handle_for_transit_.release()); |
| 319 CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages."; | 318 CloseNoLock(); |
| 320 } | 319 } |
| 321 | 320 |
| 322 void DataPipeProducerDispatcher::OnError(Error error) { | 321 void DataPipeProducerDispatcher::CancelTransit() { |
| 323 switch (error) { | 322 base::AutoLock lock(lock_); |
| 324 case ERROR_READ_BROKEN: | 323 DCHECK(in_transit_); |
| 325 case ERROR_READ_BAD_MESSAGE: | 324 in_transit_ = false; |
| 326 case ERROR_READ_UNKNOWN: | 325 buffer_handle_for_transit_.reset(); |
| 327 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error."; | 326 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 328 break; | 327 } |
| 329 case ERROR_READ_SHUTDOWN: | 328 |
| 330 // The other side was cleanly closed, so this isn't actually an error. | 329 // static |
| 331 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; | 330 scoped_refptr<DataPipeProducerDispatcher> |
| 332 break; | 331 DataPipeProducerDispatcher::Deserialize(const void* data, |
| 333 case ERROR_WRITE: | 332 size_t num_bytes, |
| 334 // Write errors are slightly notable: they probably shouldn't happen under | 333 const ports::PortName* ports, |
| 335 // normal operation (but maybe the other side crashed). | 334 size_t num_ports, |
| 336 LOG(WARNING) << "DataPipeProducerDispatcher write error"; | 335 PlatformHandle* handles, |
| 337 break; | 336 size_t num_handles) { |
| 338 } | 337 if (num_ports != 1 || num_handles != 1 || |
| 339 | 338 num_bytes != sizeof(SerializedState)) { |
| 340 error_ = true; | 339 return nullptr; |
| 341 if (started_transport_.Try()) { | 340 } |
| 342 base::AutoLock locker(lock()); | 341 |
| 343 // We can get two OnError callbacks before the post task below completes. | 342 const SerializedState* state = static_cast<const SerializedState*>(data); |
| 344 // Although RawChannel still has a pointer to this object until Shutdown is | 343 |
| 345 // called, that is safe since this class always does a PostTask to the IO | 344 NodeController* node_controller = internal::g_core->GetNodeController(); |
| 346 // thread to self destruct. | 345 ports::PortRef port; |
| 347 if (channel_) { | 346 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) |
| 348 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 347 return nullptr; |
| 349 channel_->Shutdown(); | 348 |
| 350 channel_ = nullptr; | 349 PlatformHandle buffer_handle; |
| 350 std::swap(buffer_handle, handles[0]); |
| 351 scoped_refptr<PlatformSharedBuffer> ring_buffer = |
| 352 internal::g_platform_support->CreateSharedBufferFromHandle( |
| 353 state->options.capacity_num_bytes, |
| 354 ScopedPlatformHandle(buffer_handle)); |
| 355 if (!ring_buffer) { |
| 356 DLOG(ERROR) << "Failed to deserialize shared buffer handle."; |
| 357 return nullptr; |
| 358 } |
| 359 |
| 360 scoped_refptr<DataPipeProducerDispatcher> dispatcher = |
| 361 new DataPipeProducerDispatcher(node_controller, port, ring_buffer, |
| 362 state->options, false /* initialized */, |
| 363 state->pipe_id); |
| 364 |
| 365 { |
| 366 base::AutoLock lock(dispatcher->lock_); |
| 367 dispatcher->peer_closed_ = state->peer_closed; |
| 368 dispatcher->write_offset_ = state->write_offset; |
| 369 dispatcher->available_capacity_ = state->available_capacity; |
| 370 dispatcher->InitializeNoLock(); |
| 371 } |
| 372 |
| 373 return dispatcher; |
| 374 } |
| 375 |
| 376 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { |
| 377 DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ && |
| 378 !ring_buffer_mapping_); |
| 379 } |
| 380 |
| 381 void DataPipeProducerDispatcher::InitializeNoLock() { |
| 382 lock_.AssertAcquired(); |
| 383 |
| 384 if (shared_ring_buffer_) { |
| 385 ring_buffer_mapping_ = |
| 386 shared_ring_buffer_->Map(0, options_.capacity_num_bytes); |
| 387 if (!ring_buffer_mapping_) { |
| 388 DLOG(ERROR) << "Failed to map shared buffer."; |
| 389 shared_ring_buffer_ = nullptr; |
| 351 } | 390 } |
| 352 started_transport_.Release(); | 391 } |
| 392 |
| 393 base::AutoUnlock unlock(lock_); |
| 394 node_controller_->SetPortObserver( |
| 395 control_port_, |
| 396 make_scoped_refptr(new PortObserverThunk(this))); |
| 397 } |
| 398 |
| 399 MojoResult DataPipeProducerDispatcher::CloseNoLock() { |
| 400 lock_.AssertAcquired(); |
| 401 if (is_closed_ || in_transit_) |
| 402 return MOJO_RESULT_INVALID_ARGUMENT; |
| 403 is_closed_ = true; |
| 404 ring_buffer_mapping_.reset(); |
| 405 shared_ring_buffer_ = nullptr; |
| 406 |
| 407 awakable_list_.CancelAll(); |
| 408 if (!transferred_) { |
| 409 base::AutoUnlock unlock(lock_); |
| 410 node_controller_->ClosePort(control_port_); |
| 411 } |
| 412 |
| 413 return MOJO_RESULT_OK; |
| 414 } |
| 415 |
| 416 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() |
| 417 const { |
| 418 lock_.AssertAcquired(); |
| 419 HandleSignalsState rv; |
| 420 if (!peer_closed_) { |
| 421 if (!in_two_phase_write_ && shared_ring_buffer_ && |
| 422 available_capacity_ > 0) |
| 423 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 424 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 353 } else { | 425 } else { |
| 354 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 426 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 355 } | 427 } |
| 356 } | 428 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 357 | 429 return rv; |
| 358 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { | 430 } |
| 359 return !two_phase_data_.empty(); | 431 |
| 360 } | 432 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { |
| 361 | 433 DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: " |
| 362 bool DataPipeProducerDispatcher::WriteDataIntoMessages( | 434 << num_bytes << " bytes written. [control_port=" |
| 363 const void* elements, | 435 << control_port_.name() << "]"; |
| 364 uint32_t num_bytes) { | 436 |
| 365 // The maximum amount of data to send per message (make it a multiple of the | 437 SendDataPipeControlMessage(node_controller_, control_port_, |
| 366 // element size. | 438 DataPipeCommand::DATA_WAS_WRITTEN, num_bytes); |
| 367 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | 439 } |
| 368 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; | 440 |
| 369 DCHECK_GT(max_message_num_bytes, 0u); | 441 void DataPipeProducerDispatcher::OnPortStatusChanged() { |
| 370 | 442 base::AutoLock lock(lock_); |
| 371 uint32_t offset = 0; | 443 |
| 372 while (offset < num_bytes) { | 444 // We stop observing the control port as soon it's transferred, but this can |
| 373 uint32_t message_num_bytes = | 445 // race with events which are raised right before that happens. This is fine |
| 374 std::min(static_cast<uint32_t>(max_message_num_bytes), | 446 // to ignore. |
| 375 num_bytes - offset); | 447 if (transferred_) |
| 376 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 448 return; |
| 377 MessageInTransit::Type::MESSAGE, message_num_bytes, | 449 |
| 378 static_cast<const char*>(elements) + offset)); | 450 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; |
| 379 if (!channel_->WriteMessage(std::move(message))) { | 451 |
| 380 error_ = true; | 452 UpdateSignalsStateNoLock(); |
| 381 return false; | 453 } |
| 382 } | 454 |
| 383 | 455 void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() { |
| 384 offset += message_num_bytes; | 456 lock_.AssertAcquired(); |
| 385 } | 457 |
| 386 | 458 bool was_peer_closed = peer_closed_; |
| 387 return true; | 459 size_t previous_capacity = available_capacity_; |
| 388 } | 460 |
| 389 | 461 ports::PortStatus port_status; |
| 390 void DataPipeProducerDispatcher::SerializeInternal() { | 462 if (node_controller_->node()->GetStatus(control_port_, &port_status) != |
| 391 // We need to stop watching handle immediately, even though not on IO thread, | 463 ports::OK || |
| 392 // so that other messages aren't read after this. | 464 !port_status.receiving_messages) { |
| 393 if (channel_) { | 465 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure" |
| 394 std::vector<char> serialized_read_buffer; | 466 << " [control_port=" << control_port_.name() << "]"; |
| 395 std::vector<int> fds; | 467 |
| 396 bool write_error = false; | 468 peer_closed_ = true; |
| 397 serialized_platform_handle_ = channel_->ReleaseHandle( | 469 } |
| 398 &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds, | 470 |
| 399 &write_error); | 471 if (port_status.has_messages && !in_transit_) { |
| 400 CHECK(serialized_read_buffer.empty()); | 472 ports::ScopedMessage message; |
| 401 CHECK(fds.empty()); | 473 do { |
| 402 if (write_error) | 474 int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr, |
| 403 serialized_platform_handle_.reset(); | 475 &message); |
| 404 channel_ = nullptr; | 476 if (rv != ports::OK) |
| 405 } | 477 peer_closed_ = true; |
| 406 serialized_ = true; | 478 if (message) { |
| 479 PortsMessage* ports_message = static_cast<PortsMessage*>(message.get()); |
| 480 const DataPipeControlMessage* m = |
| 481 static_cast<const DataPipeControlMessage*>( |
| 482 ports_message->payload_bytes()); |
| 483 |
| 484 if (m->command != DataPipeCommand::DATA_WAS_READ) { |
| 485 DLOG(ERROR) << "Unexpected message from consumer."; |
| 486 peer_closed_ = true; |
| 487 break; |
| 488 } |
| 489 |
| 490 if (static_cast<size_t>(available_capacity_) + m->num_bytes > |
| 491 options_.capacity_num_bytes) { |
| 492 DLOG(ERROR) << "Consumer claims to have read too many bytes."; |
| 493 break; |
| 494 } |
| 495 |
| 496 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that " |
| 497 << m->num_bytes << " bytes were read. [control_port=" |
| 498 << control_port_.name() << "]"; |
| 499 |
| 500 available_capacity_ += m->num_bytes; |
| 501 } |
| 502 } while (message); |
| 503 } |
| 504 |
| 505 if (peer_closed_ != was_peer_closed || |
| 506 available_capacity_ != previous_capacity) { |
| 507 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 508 } |
| 407 } | 509 } |
| 408 | 510 |
| 409 } // namespace edk | 511 } // namespace edk |
| 410 } // namespace mojo | 512 } // namespace mojo |
| OLD | NEW |