OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
12 #include "base/bind.h" | 12 #include "base/bind.h" |
13 #include "base/logging.h" | 13 #include "base/logging.h" |
| 14 #include "base/memory/ref_counted.h" |
14 #include "base/message_loop/message_loop.h" | 15 #include "base/message_loop/message_loop.h" |
15 #include "mojo/edk/embedder/embedder_internal.h" | 16 #include "mojo/edk/embedder/embedder_internal.h" |
16 #include "mojo/edk/embedder/platform_shared_buffer.h" | 17 #include "mojo/edk/embedder/platform_shared_buffer.h" |
17 #include "mojo/edk/embedder/platform_support.h" | 18 #include "mojo/edk/embedder/platform_support.h" |
18 #include "mojo/edk/system/configuration.h" | 19 #include "mojo/edk/system/configuration.h" |
19 #include "mojo/edk/system/data_pipe.h" | 20 #include "mojo/edk/system/core.h" |
| 21 #include "mojo/edk/system/data_pipe_control_message.h" |
| 22 #include "mojo/edk/system/node_controller.h" |
| 23 #include "mojo/edk/system/ports_message.h" |
20 | 24 |
21 namespace mojo { | 25 namespace mojo { |
22 namespace edk { | 26 namespace edk { |
23 | 27 |
24 void DataPipeProducerDispatcher::Init( | 28 namespace { |
25 ScopedPlatformHandle message_pipe, | |
26 char* serialized_write_buffer, size_t serialized_write_buffer_size) { | |
27 if (message_pipe.is_valid()) { | |
28 channel_ = RawChannel::Create(std::move(message_pipe)); | |
29 channel_->SetSerializedData( | |
30 nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size, | |
31 nullptr, nullptr); | |
32 internal::g_io_thread_task_runner->PostTask( | |
33 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); | |
34 } else { | |
35 error_ = true; | |
36 } | |
37 } | |
38 | 29 |
39 void DataPipeProducerDispatcher::InitOnIO() { | 30 struct SerializedState { |
40 base::AutoLock locker(lock()); | 31 MojoCreateDataPipeOptions options; |
41 if (channel_) | 32 uint64_t pipe_id; |
42 channel_->Init(this); | 33 bool peer_closed; |
43 } | 34 uint32_t write_offset; |
| 35 uint32_t available_capacity; |
| 36 }; |
44 | 37 |
45 void DataPipeProducerDispatcher::CloseOnIO() { | 38 } // namespace |
46 base::AutoLock locker(lock()); | 39 |
47 if (channel_) { | 40 // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a |
48 channel_->Shutdown(); | 41 // reference to the dispatcher to ensure it lives as long as the observed port. |
49 channel_ = nullptr; | 42 class DataPipeProducerDispatcher::PortObserverThunk |
| 43 : public NodeController::PortObserver { |
| 44 public: |
| 45 explicit PortObserverThunk( |
| 46 scoped_refptr<DataPipeProducerDispatcher> dispatcher) |
| 47 : dispatcher_(dispatcher) {} |
| 48 |
| 49 private: |
| 50 ~PortObserverThunk() override {} |
| 51 |
| 52 // NodeController::PortObserver: |
| 53 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } |
| 54 |
| 55 scoped_refptr<DataPipeProducerDispatcher> dispatcher_; |
| 56 |
| 57 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
| 58 }; |
| 59 |
| 60 DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
| 61 NodeController* node_controller, |
| 62 const ports::PortRef& control_port, |
| 63 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, |
| 64 const MojoCreateDataPipeOptions& options, |
| 65 bool initialized, |
| 66 uint64_t pipe_id) |
| 67 : options_(options), |
| 68 node_controller_(node_controller), |
| 69 control_port_(control_port), |
| 70 pipe_id_(pipe_id), |
| 71 shared_ring_buffer_(shared_ring_buffer), |
| 72 available_capacity_(options_.capacity_num_bytes) { |
| 73 if (initialized) { |
| 74 base::AutoLock lock(lock_); |
| 75 InitializeNoLock(); |
50 } | 76 } |
51 } | 77 } |
52 | 78 |
53 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { | 79 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
54 return Type::DATA_PIPE_PRODUCER; | 80 return Type::DATA_PIPE_PRODUCER; |
55 } | 81 } |
56 | 82 |
57 scoped_refptr<DataPipeProducerDispatcher> | 83 MojoResult DataPipeProducerDispatcher::Close() { |
58 DataPipeProducerDispatcher::Deserialize( | 84 base::AutoLock lock(lock_); |
59 const void* source, | 85 DVLOG(1) << "Closing data pipe producer " << pipe_id_; |
60 size_t size, | 86 return CloseNoLock(); |
61 PlatformHandleVector* platform_handles) { | |
62 MojoCreateDataPipeOptions options; | |
63 ScopedPlatformHandle shared_memory_handle; | |
64 size_t shared_memory_size = 0; | |
65 ScopedPlatformHandle platform_handle = | |
66 DataPipe::Deserialize(source, size, platform_handles, &options, | |
67 &shared_memory_handle, &shared_memory_size); | |
68 | |
69 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); | |
70 | |
71 char* serialized_write_buffer = nullptr; | |
72 size_t serialized_write_buffer_size = 0; | |
73 scoped_refptr<PlatformSharedBuffer> shared_buffer; | |
74 scoped_ptr<PlatformSharedBufferMapping> mapping; | |
75 if (shared_memory_size) { | |
76 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( | |
77 shared_memory_size, std::move(shared_memory_handle)); | |
78 mapping = shared_buffer->Map(0, shared_memory_size); | |
79 serialized_write_buffer = static_cast<char*>(mapping->GetBase()); | |
80 serialized_write_buffer_size = shared_memory_size; | |
81 } | |
82 | |
83 rv->Init(std::move(platform_handle), serialized_write_buffer, | |
84 serialized_write_buffer_size); | |
85 return rv; | |
86 } | 87 } |
87 | 88 |
88 DataPipeProducerDispatcher::DataPipeProducerDispatcher( | 89 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
89 const MojoCreateDataPipeOptions& options) | 90 uint32_t* num_bytes, |
90 : options_(options), channel_(nullptr), error_(false), serialized_(false) { | 91 MojoWriteDataFlags flags) { |
91 } | 92 base::AutoLock lock(lock_); |
| 93 if (!shared_ring_buffer_ || in_transit_) |
| 94 return MOJO_RESULT_INVALID_ARGUMENT; |
92 | 95 |
93 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { | 96 if (in_two_phase_write_) |
94 // See comment in ~MessagePipeDispatcher. | 97 return MOJO_RESULT_BUSY; |
95 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) | |
96 channel_->Shutdown(); | |
97 else | |
98 DCHECK(!channel_); | |
99 } | |
100 | 98 |
101 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { | 99 if (peer_closed_) |
102 lock().AssertAcquired(); | 100 return MOJO_RESULT_FAILED_PRECONDITION; |
103 awakable_list_.CancelAll(); | |
104 } | |
105 | 101 |
106 void DataPipeProducerDispatcher::CloseImplNoLock() { | |
107 lock().AssertAcquired(); | |
108 internal::g_io_thread_task_runner->PostTask( | |
109 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); | |
110 } | |
111 | |
112 scoped_refptr<Dispatcher> | |
113 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | |
114 lock().AssertAcquired(); | |
115 | |
116 SerializeInternal(); | |
117 | |
118 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); | |
119 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | |
120 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); | |
121 rv->serialized_ = true; | |
122 return scoped_refptr<Dispatcher>(rv.get()); | |
123 } | |
124 | |
125 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( | |
126 const void* elements, | |
127 uint32_t* num_bytes, | |
128 MojoWriteDataFlags flags) { | |
129 lock().AssertAcquired(); | |
130 if (InTwoPhaseWrite()) | |
131 return MOJO_RESULT_BUSY; | |
132 if (error_) | |
133 return MOJO_RESULT_FAILED_PRECONDITION; | |
134 if (*num_bytes % options_.element_num_bytes != 0) | 102 if (*num_bytes % options_.element_num_bytes != 0) |
135 return MOJO_RESULT_INVALID_ARGUMENT; | 103 return MOJO_RESULT_INVALID_ARGUMENT; |
136 if (*num_bytes == 0) | 104 if (*num_bytes == 0) |
137 return MOJO_RESULT_OK; // Nothing to do. | 105 return MOJO_RESULT_OK; // Nothing to do. |
138 | 106 |
139 // For now, we ignore options.capacity_num_bytes as a total of all pending | |
140 // writes (and just treat it per message). We will implement that later if | |
141 // we need to. All current uses want all their data to be sent, and it's not | |
142 // clear that this backpressure should be done at the mojo layer or at a | |
143 // higher application layer. | |
144 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; | 107 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; |
145 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; | 108 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; |
146 if (min_num_bytes_to_write > options_.capacity_num_bytes) { | 109 if (min_num_bytes_to_write > options_.capacity_num_bytes) { |
147 // Don't return "should wait" since you can't wait for a specified amount of | 110 // Don't return "should wait" since you can't wait for a specified amount of |
148 // data. | 111 // data. |
149 return MOJO_RESULT_OUT_OF_RANGE; | 112 return MOJO_RESULT_OUT_OF_RANGE; |
150 } | 113 } |
151 | 114 |
152 uint32_t num_bytes_to_write = | 115 DCHECK_LE(available_capacity_, options_.capacity_num_bytes); |
153 std::min(*num_bytes, options_.capacity_num_bytes); | 116 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_); |
154 if (num_bytes_to_write == 0) | 117 if (num_bytes_to_write == 0) |
155 return MOJO_RESULT_SHOULD_WAIT; | 118 return MOJO_RESULT_SHOULD_WAIT; |
156 | 119 |
157 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); | 120 HandleSignalsState old_state = GetHandleSignalsStateNoLock(); |
158 | 121 |
159 *num_bytes = num_bytes_to_write; | 122 *num_bytes = num_bytes_to_write; |
160 WriteDataIntoMessages(elements, num_bytes_to_write); | |
161 | 123 |
162 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | 124 CHECK(ring_buffer_mapping_); |
| 125 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
| 126 CHECK(data); |
| 127 |
| 128 const uint8_t* source = static_cast<const uint8_t*>(elements); |
| 129 CHECK(source); |
| 130 |
| 131 DCHECK_LE(write_offset_, options_.capacity_num_bytes); |
| 132 uint32_t tail_bytes_to_write = |
| 133 std::min(options_.capacity_num_bytes - write_offset_, |
| 134 num_bytes_to_write); |
| 135 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write; |
| 136 |
| 137 DCHECK_GT(tail_bytes_to_write, 0u); |
| 138 memcpy(data + write_offset_, source, tail_bytes_to_write); |
| 139 if (head_bytes_to_write > 0) |
| 140 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write); |
| 141 |
| 142 DCHECK_LE(num_bytes_to_write, available_capacity_); |
| 143 available_capacity_ -= num_bytes_to_write; |
| 144 write_offset_ = (write_offset_ + num_bytes_to_write) % |
| 145 options_.capacity_num_bytes; |
| 146 |
| 147 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
163 if (!new_state.equals(old_state)) | 148 if (!new_state.equals(old_state)) |
164 awakable_list_.AwakeForStateChange(new_state); | 149 awakable_list_.AwakeForStateChange(new_state); |
165 return MOJO_RESULT_OK; | |
166 } | |
167 | 150 |
168 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( | 151 base::AutoUnlock unlock(lock_); |
169 void** buffer, | 152 NotifyWrite(num_bytes_to_write); |
170 uint32_t* buffer_num_bytes, | |
171 MojoWriteDataFlags flags) { | |
172 lock().AssertAcquired(); | |
173 if (InTwoPhaseWrite()) | |
174 return MOJO_RESULT_BUSY; | |
175 if (error_) | |
176 return MOJO_RESULT_FAILED_PRECONDITION; | |
177 | |
178 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. | |
179 if (*buffer_num_bytes == 0) | |
180 *buffer_num_bytes = options_.capacity_num_bytes; | |
181 | |
182 two_phase_data_.resize(*buffer_num_bytes); | |
183 *buffer = &two_phase_data_[0]; | |
184 | |
185 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes | |
186 // we can construct a MessageInTransit here. But then we need to make | |
187 // MessageInTransit support changing its data size later. | |
188 | 153 |
189 return MOJO_RESULT_OK; | 154 return MOJO_RESULT_OK; |
190 } | 155 } |
191 | 156 |
192 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( | 157 MojoResult DataPipeProducerDispatcher::BeginWriteData( |
| 158 void** buffer, |
| 159 uint32_t* buffer_num_bytes, |
| 160 MojoWriteDataFlags flags) { |
| 161 base::AutoLock lock(lock_); |
| 162 if (!shared_ring_buffer_ || in_transit_) |
| 163 return MOJO_RESULT_INVALID_ARGUMENT; |
| 164 if (in_two_phase_write_) |
| 165 return MOJO_RESULT_BUSY; |
| 166 if (peer_closed_) |
| 167 return MOJO_RESULT_FAILED_PRECONDITION; |
| 168 |
| 169 if (available_capacity_ == 0) { |
| 170 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| 171 : MOJO_RESULT_SHOULD_WAIT; |
| 172 } |
| 173 |
| 174 in_two_phase_write_ = true; |
| 175 *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_, |
| 176 available_capacity_); |
| 177 DCHECK_GT(*buffer_num_bytes, 0u); |
| 178 |
| 179 CHECK(ring_buffer_mapping_); |
| 180 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); |
| 181 *buffer = data + write_offset_; |
| 182 |
| 183 return MOJO_RESULT_OK; |
| 184 } |
| 185 |
| 186 MojoResult DataPipeProducerDispatcher::EndWriteData( |
193 uint32_t num_bytes_written) { | 187 uint32_t num_bytes_written) { |
194 lock().AssertAcquired(); | 188 base::AutoLock lock(lock_); |
195 if (!InTwoPhaseWrite()) | 189 if (is_closed_ || in_transit_) |
| 190 return MOJO_RESULT_INVALID_ARGUMENT; |
| 191 |
| 192 if (!in_two_phase_write_) |
196 return MOJO_RESULT_FAILED_PRECONDITION; | 193 return MOJO_RESULT_FAILED_PRECONDITION; |
197 | 194 |
| 195 DCHECK(shared_ring_buffer_); |
| 196 DCHECK(ring_buffer_mapping_); |
| 197 |
198 // Note: Allow successful completion of the two-phase write even if the other | 198 // Note: Allow successful completion of the two-phase write even if the other |
199 // side has been closed. | 199 // side has been closed. |
200 MojoResult rv = MOJO_RESULT_OK; | 200 MojoResult rv = MOJO_RESULT_OK; |
201 if (num_bytes_written > two_phase_data_.size() || | 201 if (num_bytes_written > available_capacity_ || |
202 num_bytes_written % options_.element_num_bytes != 0) { | 202 num_bytes_written % options_.element_num_bytes != 0 || |
| 203 write_offset_ + num_bytes_written > options_.capacity_num_bytes) { |
203 rv = MOJO_RESULT_INVALID_ARGUMENT; | 204 rv = MOJO_RESULT_INVALID_ARGUMENT; |
204 } else if (channel_) { | 205 } else { |
205 WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written); | 206 DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes); |
| 207 available_capacity_ -= num_bytes_written; |
| 208 write_offset_ = (write_offset_ + num_bytes_written) % |
| 209 options_.capacity_num_bytes; |
| 210 |
| 211 base::AutoUnlock unlock(lock_); |
| 212 NotifyWrite(num_bytes_written); |
206 } | 213 } |
207 | 214 |
208 // Two-phase write ended even on failure. | 215 in_two_phase_write_ = false; |
209 two_phase_data_.clear(); | 216 |
210 // If we're now writable, we *became* writable (since we weren't writable | 217 // If we're now writable, we *became* writable (since we weren't writable |
211 // during the two-phase write), so awake producer awakables. | 218 // during the two-phase write), so awake producer awakables. |
212 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | 219 HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
213 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | 220 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
214 awakable_list_.AwakeForStateChange(new_state); | 221 awakable_list_.AwakeForStateChange(new_state); |
215 | 222 |
216 return rv; | 223 return rv; |
217 } | 224 } |
218 | 225 |
219 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() | 226 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { |
220 const { | 227 base::AutoLock lock(lock_); |
221 lock().AssertAcquired(); | 228 return GetHandleSignalsStateNoLock(); |
222 | |
223 HandleSignalsState rv; | |
224 if (!error_) { | |
225 if (!InTwoPhaseWrite()) | |
226 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
227 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
228 } else { | |
229 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
230 } | |
231 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
232 return rv; | |
233 } | 229 } |
234 | 230 |
235 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( | 231 MojoResult DataPipeProducerDispatcher::AddAwakable( |
236 Awakable* awakable, | 232 Awakable* awakable, |
237 MojoHandleSignals signals, | 233 MojoHandleSignals signals, |
238 uintptr_t context, | 234 uintptr_t context, |
239 HandleSignalsState* signals_state) { | 235 HandleSignalsState* signals_state) { |
240 lock().AssertAcquired(); | 236 base::AutoLock lock(lock_); |
241 if (channel_) | 237 if (!shared_ring_buffer_ || in_transit_) { |
242 channel_->EnsureLazyInitialized(); | 238 if (signals_state) |
243 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 239 *signals_state = HandleSignalsState(); |
| 240 return MOJO_RESULT_INVALID_ARGUMENT; |
| 241 } |
| 242 UpdateSignalsStateNoLock(); |
| 243 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
244 if (state.satisfies(signals)) { | 244 if (state.satisfies(signals)) { |
245 if (signals_state) | 245 if (signals_state) |
246 *signals_state = state; | 246 *signals_state = state; |
247 return MOJO_RESULT_ALREADY_EXISTS; | 247 return MOJO_RESULT_ALREADY_EXISTS; |
248 } | 248 } |
249 if (!state.can_satisfy(signals)) { | 249 if (!state.can_satisfy(signals)) { |
250 if (signals_state) | 250 if (signals_state) |
251 *signals_state = state; | 251 *signals_state = state; |
252 return MOJO_RESULT_FAILED_PRECONDITION; | 252 return MOJO_RESULT_FAILED_PRECONDITION; |
253 } | 253 } |
254 | 254 |
255 awakable_list_.Add(awakable, signals, context); | 255 awakable_list_.Add(awakable, signals, context); |
256 return MOJO_RESULT_OK; | 256 return MOJO_RESULT_OK; |
257 } | 257 } |
258 | 258 |
259 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( | 259 void DataPipeProducerDispatcher::RemoveAwakable( |
260 Awakable* awakable, | 260 Awakable* awakable, |
261 HandleSignalsState* signals_state) { | 261 HandleSignalsState* signals_state) { |
262 lock().AssertAcquired(); | 262 base::AutoLock lock(lock_); |
| 263 if ((!shared_ring_buffer_ || in_transit_) && signals_state) |
| 264 *signals_state = HandleSignalsState(); |
| 265 else if (signals_state) |
| 266 *signals_state = GetHandleSignalsStateNoLock(); |
263 awakable_list_.Remove(awakable); | 267 awakable_list_.Remove(awakable); |
264 if (signals_state) | 268 } |
265 *signals_state = GetHandleSignalsStateImplNoLock(); | 269 |
266 } | 270 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes, |
267 | 271 uint32_t* num_ports, |
268 void DataPipeProducerDispatcher::StartSerializeImplNoLock( | 272 uint32_t* num_handles) { |
269 size_t* max_size, | 273 base::AutoLock lock(lock_); |
270 size_t* max_platform_handles) { | 274 DCHECK(in_transit_); |
271 if (!serialized_) | 275 *num_bytes = sizeof(SerializedState); |
272 SerializeInternal(); | 276 *num_ports = 1; |
273 | 277 *num_handles = 1; |
274 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), | 278 } |
275 !serialized_write_buffer_.empty(), max_size, | 279 |
276 max_platform_handles); | 280 bool DataPipeProducerDispatcher::EndSerialize( |
277 } | |
278 | |
279 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( | |
280 void* destination, | 281 void* destination, |
281 size_t* actual_size, | 282 ports::PortName* ports, |
282 PlatformHandleVector* platform_handles) { | 283 PlatformHandle* platform_handles) { |
283 ScopedPlatformHandle shared_memory_handle; | 284 SerializedState* state = static_cast<SerializedState*>(destination); |
284 size_t shared_memory_size = serialized_write_buffer_.size(); | 285 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); |
285 if (shared_memory_size) { | 286 |
286 scoped_refptr<PlatformSharedBuffer> shared_buffer( | 287 base::AutoLock lock(lock_); |
287 internal::g_platform_support->CreateSharedBuffer( | 288 DCHECK(in_transit_); |
288 shared_memory_size)); | 289 state->pipe_id = pipe_id_; |
289 scoped_ptr<PlatformSharedBufferMapping> mapping( | 290 state->peer_closed = peer_closed_; |
290 shared_buffer->Map(0, shared_memory_size)); | 291 state->write_offset = write_offset_; |
291 memcpy(mapping->GetBase(), &serialized_write_buffer_[0], | 292 state->available_capacity = available_capacity_; |
292 shared_memory_size); | 293 |
293 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); | 294 ports[0] = control_port_.name(); |
294 } | 295 |
295 | 296 buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle(); |
296 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), | 297 platform_handles[0] = buffer_handle_for_transit_.get(); |
297 std::move(shared_memory_handle), shared_memory_size, | 298 |
298 destination, actual_size, platform_handles); | |
299 CloseImplNoLock(); | |
300 return true; | 299 return true; |
301 } | 300 } |
302 | 301 |
303 void DataPipeProducerDispatcher::TransportStarted() { | 302 bool DataPipeProducerDispatcher::BeginTransit() { |
304 started_transport_.Acquire(); | 303 base::AutoLock lock(lock_); |
305 } | 304 if (in_transit_) |
306 | 305 return false; |
307 void DataPipeProducerDispatcher::TransportEnded() { | 306 in_transit_ = !in_two_phase_write_; |
308 started_transport_.Release(); | 307 return in_transit_; |
309 } | 308 } |
310 | 309 |
311 bool DataPipeProducerDispatcher::IsBusyNoLock() const { | 310 void DataPipeProducerDispatcher::CompleteTransitAndClose() { |
312 lock().AssertAcquired(); | 311 node_controller_->SetPortObserver(control_port_, nullptr); |
313 return InTwoPhaseWrite(); | 312 |
314 } | 313 base::AutoLock lock(lock_); |
315 | 314 DCHECK(in_transit_); |
316 void DataPipeProducerDispatcher::OnReadMessage( | 315 transferred_ = true; |
317 const MessageInTransit::View& message_view, | 316 in_transit_ = false; |
318 ScopedPlatformHandleVectorPtr platform_handles) { | 317 ignore_result(buffer_handle_for_transit_.release()); |
319 CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages."; | 318 CloseNoLock(); |
320 } | 319 } |
321 | 320 |
322 void DataPipeProducerDispatcher::OnError(Error error) { | 321 void DataPipeProducerDispatcher::CancelTransit() { |
323 switch (error) { | 322 base::AutoLock lock(lock_); |
324 case ERROR_READ_BROKEN: | 323 DCHECK(in_transit_); |
325 case ERROR_READ_BAD_MESSAGE: | 324 in_transit_ = false; |
326 case ERROR_READ_UNKNOWN: | 325 buffer_handle_for_transit_.reset(); |
327 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error."; | 326 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
328 break; | 327 } |
329 case ERROR_READ_SHUTDOWN: | 328 |
330 // The other side was cleanly closed, so this isn't actually an error. | 329 // static |
331 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; | 330 scoped_refptr<DataPipeProducerDispatcher> |
332 break; | 331 DataPipeProducerDispatcher::Deserialize(const void* data, |
333 case ERROR_WRITE: | 332 size_t num_bytes, |
334 // Write errors are slightly notable: they probably shouldn't happen under | 333 const ports::PortName* ports, |
335 // normal operation (but maybe the other side crashed). | 334 size_t num_ports, |
336 LOG(WARNING) << "DataPipeProducerDispatcher write error"; | 335 PlatformHandle* handles, |
337 break; | 336 size_t num_handles) { |
338 } | 337 if (num_ports != 1 || num_handles != 1 || |
339 | 338 num_bytes != sizeof(SerializedState)) { |
340 error_ = true; | 339 return nullptr; |
341 if (started_transport_.Try()) { | 340 } |
342 base::AutoLock locker(lock()); | 341 |
343 // We can get two OnError callbacks before the post task below completes. | 342 const SerializedState* state = static_cast<const SerializedState*>(data); |
344 // Although RawChannel still has a pointer to this object until Shutdown is | 343 |
345 // called, that is safe since this class always does a PostTask to the IO | 344 NodeController* node_controller = internal::g_core->GetNodeController(); |
346 // thread to self destruct. | 345 ports::PortRef port; |
347 if (channel_) { | 346 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) |
348 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 347 return nullptr; |
349 channel_->Shutdown(); | 348 |
350 channel_ = nullptr; | 349 PlatformHandle buffer_handle; |
| 350 std::swap(buffer_handle, handles[0]); |
| 351 scoped_refptr<PlatformSharedBuffer> ring_buffer = |
| 352 internal::g_platform_support->CreateSharedBufferFromHandle( |
| 353 state->options.capacity_num_bytes, |
| 354 ScopedPlatformHandle(buffer_handle)); |
| 355 if (!ring_buffer) { |
| 356 DLOG(ERROR) << "Failed to deserialize shared buffer handle."; |
| 357 return nullptr; |
| 358 } |
| 359 |
| 360 scoped_refptr<DataPipeProducerDispatcher> dispatcher = |
| 361 new DataPipeProducerDispatcher(node_controller, port, ring_buffer, |
| 362 state->options, false /* initialized */, |
| 363 state->pipe_id); |
| 364 |
| 365 { |
| 366 base::AutoLock lock(dispatcher->lock_); |
| 367 dispatcher->peer_closed_ = state->peer_closed; |
| 368 dispatcher->write_offset_ = state->write_offset; |
| 369 dispatcher->available_capacity_ = state->available_capacity; |
| 370 dispatcher->InitializeNoLock(); |
| 371 } |
| 372 |
| 373 return dispatcher; |
| 374 } |
| 375 |
| 376 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { |
| 377 DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ && |
| 378 !ring_buffer_mapping_); |
| 379 } |
| 380 |
| 381 void DataPipeProducerDispatcher::InitializeNoLock() { |
| 382 lock_.AssertAcquired(); |
| 383 |
| 384 if (shared_ring_buffer_) { |
| 385 ring_buffer_mapping_ = |
| 386 shared_ring_buffer_->Map(0, options_.capacity_num_bytes); |
| 387 if (!ring_buffer_mapping_) { |
| 388 DLOG(ERROR) << "Failed to map shared buffer."; |
| 389 shared_ring_buffer_ = nullptr; |
351 } | 390 } |
352 started_transport_.Release(); | 391 } |
| 392 |
| 393 base::AutoUnlock unlock(lock_); |
| 394 node_controller_->SetPortObserver( |
| 395 control_port_, |
| 396 make_scoped_refptr(new PortObserverThunk(this))); |
| 397 } |
| 398 |
| 399 MojoResult DataPipeProducerDispatcher::CloseNoLock() { |
| 400 lock_.AssertAcquired(); |
| 401 if (is_closed_ || in_transit_) |
| 402 return MOJO_RESULT_INVALID_ARGUMENT; |
| 403 is_closed_ = true; |
| 404 ring_buffer_mapping_.reset(); |
| 405 shared_ring_buffer_ = nullptr; |
| 406 |
| 407 awakable_list_.CancelAll(); |
| 408 if (!transferred_) { |
| 409 base::AutoUnlock unlock(lock_); |
| 410 node_controller_->ClosePort(control_port_); |
| 411 } |
| 412 |
| 413 return MOJO_RESULT_OK; |
| 414 } |
| 415 |
| 416 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() |
| 417 const { |
| 418 lock_.AssertAcquired(); |
| 419 HandleSignalsState rv; |
| 420 if (!peer_closed_) { |
| 421 if (!in_two_phase_write_ && shared_ring_buffer_ && |
| 422 available_capacity_ > 0) |
| 423 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 424 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
353 } else { | 425 } else { |
354 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 426 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
355 } | 427 } |
356 } | 428 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
357 | 429 return rv; |
358 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { | 430 } |
359 return !two_phase_data_.empty(); | 431 |
360 } | 432 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) { |
361 | 433 DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: " |
362 bool DataPipeProducerDispatcher::WriteDataIntoMessages( | 434 << num_bytes << " bytes written. [control_port=" |
363 const void* elements, | 435 << control_port_.name() << "]"; |
364 uint32_t num_bytes) { | 436 |
365 // The maximum amount of data to send per message (make it a multiple of the | 437 SendDataPipeControlMessage(node_controller_, control_port_, |
366 // element size. | 438 DataPipeCommand::DATA_WAS_WRITTEN, num_bytes); |
367 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | 439 } |
368 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; | 440 |
369 DCHECK_GT(max_message_num_bytes, 0u); | 441 void DataPipeProducerDispatcher::OnPortStatusChanged() { |
370 | 442 base::AutoLock lock(lock_); |
371 uint32_t offset = 0; | 443 |
372 while (offset < num_bytes) { | 444 // We stop observing the control port as soon it's transferred, but this can |
373 uint32_t message_num_bytes = | 445 // race with events which are raised right before that happens. This is fine |
374 std::min(static_cast<uint32_t>(max_message_num_bytes), | 446 // to ignore. |
375 num_bytes - offset); | 447 if (transferred_) |
376 scoped_ptr<MessageInTransit> message(new MessageInTransit( | 448 return; |
377 MessageInTransit::Type::MESSAGE, message_num_bytes, | 449 |
378 static_cast<const char*>(elements) + offset)); | 450 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; |
379 if (!channel_->WriteMessage(std::move(message))) { | 451 |
380 error_ = true; | 452 UpdateSignalsStateNoLock(); |
381 return false; | 453 } |
382 } | 454 |
383 | 455 void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() { |
384 offset += message_num_bytes; | 456 lock_.AssertAcquired(); |
385 } | 457 |
386 | 458 bool was_peer_closed = peer_closed_; |
387 return true; | 459 size_t previous_capacity = available_capacity_; |
388 } | 460 |
389 | 461 ports::PortStatus port_status; |
390 void DataPipeProducerDispatcher::SerializeInternal() { | 462 if (node_controller_->node()->GetStatus(control_port_, &port_status) != |
391 // We need to stop watching handle immediately, even though not on IO thread, | 463 ports::OK || |
392 // so that other messages aren't read after this. | 464 !port_status.receiving_messages) { |
393 if (channel_) { | 465 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure" |
394 std::vector<char> serialized_read_buffer; | 466 << " [control_port=" << control_port_.name() << "]"; |
395 std::vector<int> fds; | 467 |
396 bool write_error = false; | 468 peer_closed_ = true; |
397 serialized_platform_handle_ = channel_->ReleaseHandle( | 469 } |
398 &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds, | 470 |
399 &write_error); | 471 if (port_status.has_messages && !in_transit_) { |
400 CHECK(serialized_read_buffer.empty()); | 472 ports::ScopedMessage message; |
401 CHECK(fds.empty()); | 473 do { |
402 if (write_error) | 474 int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr, |
403 serialized_platform_handle_.reset(); | 475 &message); |
404 channel_ = nullptr; | 476 if (rv != ports::OK) |
405 } | 477 peer_closed_ = true; |
406 serialized_ = true; | 478 if (message) { |
| 479 PortsMessage* ports_message = static_cast<PortsMessage*>(message.get()); |
| 480 const DataPipeControlMessage* m = |
| 481 static_cast<const DataPipeControlMessage*>( |
| 482 ports_message->payload_bytes()); |
| 483 |
| 484 if (m->command != DataPipeCommand::DATA_WAS_READ) { |
| 485 DLOG(ERROR) << "Unexpected message from consumer."; |
| 486 peer_closed_ = true; |
| 487 break; |
| 488 } |
| 489 |
| 490 if (static_cast<size_t>(available_capacity_) + m->num_bytes > |
| 491 options_.capacity_num_bytes) { |
| 492 DLOG(ERROR) << "Consumer claims to have read too many bytes."; |
| 493 break; |
| 494 } |
| 495 |
| 496 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that " |
| 497 << m->num_bytes << " bytes were read. [control_port=" |
| 498 << control_port_.name() << "]"; |
| 499 |
| 500 available_capacity_ += m->num_bytes; |
| 501 } |
| 502 } while (message); |
| 503 } |
| 504 |
| 505 if (peer_closed_ != was_peer_closed || |
| 506 available_capacity_ != previous_capacity) { |
| 507 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 508 } |
407 } | 509 } |
408 | 510 |
409 } // namespace edk | 511 } // namespace edk |
410 } // namespace mojo | 512 } // namespace mojo |
OLD | NEW |