Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(203)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/bind.h" 12 #include "base/bind.h"
13 #include "base/logging.h" 13 #include "base/logging.h"
14 #include "base/memory/ref_counted.h"
14 #include "base/message_loop/message_loop.h" 15 #include "base/message_loop/message_loop.h"
15 #include "mojo/edk/embedder/embedder_internal.h" 16 #include "mojo/edk/embedder/embedder_internal.h"
16 #include "mojo/edk/embedder/platform_shared_buffer.h" 17 #include "mojo/edk/embedder/platform_shared_buffer.h"
17 #include "mojo/edk/embedder/platform_support.h" 18 #include "mojo/edk/embedder/platform_support.h"
18 #include "mojo/edk/system/configuration.h" 19 #include "mojo/edk/system/configuration.h"
19 #include "mojo/edk/system/data_pipe.h" 20 #include "mojo/edk/system/core.h"
21 #include "mojo/edk/system/data_pipe_control_message.h"
22 #include "mojo/edk/system/node_controller.h"
23 #include "mojo/edk/system/ports_message.h"
20 24
21 namespace mojo { 25 namespace mojo {
22 namespace edk { 26 namespace edk {
23 27
24 void DataPipeProducerDispatcher::Init( 28 namespace {
25 ScopedPlatformHandle message_pipe,
26 char* serialized_write_buffer, size_t serialized_write_buffer_size) {
27 if (message_pipe.is_valid()) {
28 channel_ = RawChannel::Create(std::move(message_pipe));
29 channel_->SetSerializedData(
30 nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size,
31 nullptr, nullptr);
32 internal::g_io_thread_task_runner->PostTask(
33 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
34 } else {
35 error_ = true;
36 }
37 }
38 29
39 void DataPipeProducerDispatcher::InitOnIO() { 30 struct SerializedState {
40 base::AutoLock locker(lock()); 31 MojoCreateDataPipeOptions options;
41 if (channel_) 32 uint64_t pipe_id;
42 channel_->Init(this); 33 bool peer_closed;
43 } 34 uint32_t write_offset;
35 uint32_t available_capacity;
36 };
44 37
45 void DataPipeProducerDispatcher::CloseOnIO() { 38 } // namespace
46 base::AutoLock locker(lock()); 39
47 if (channel_) { 40 // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a
48 channel_->Shutdown(); 41 // reference to the dispatcher to ensure it lives as long as the observed port.
49 channel_ = nullptr; 42 class DataPipeProducerDispatcher::PortObserverThunk
43 : public NodeController::PortObserver {
44 public:
45 explicit PortObserverThunk(
46 scoped_refptr<DataPipeProducerDispatcher> dispatcher)
47 : dispatcher_(dispatcher) {}
48
49 private:
50 ~PortObserverThunk() override {}
51
52 // NodeController::PortObserver:
53 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
54
55 scoped_refptr<DataPipeProducerDispatcher> dispatcher_;
56
57 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
58 };
59
60 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
61 NodeController* node_controller,
62 const ports::PortRef& control_port,
63 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
64 const MojoCreateDataPipeOptions& options,
65 bool initialized,
66 uint64_t pipe_id)
67 : options_(options),
68 node_controller_(node_controller),
69 control_port_(control_port),
70 pipe_id_(pipe_id),
71 shared_ring_buffer_(shared_ring_buffer),
72 available_capacity_(options_.capacity_num_bytes) {
73 if (initialized) {
74 base::AutoLock lock(lock_);
75 InitializeNoLock();
50 } 76 }
51 } 77 }
52 78
53 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { 79 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
54 return Type::DATA_PIPE_PRODUCER; 80 return Type::DATA_PIPE_PRODUCER;
55 } 81 }
56 82
57 scoped_refptr<DataPipeProducerDispatcher> 83 MojoResult DataPipeProducerDispatcher::Close() {
58 DataPipeProducerDispatcher::Deserialize( 84 base::AutoLock lock(lock_);
59 const void* source, 85 DVLOG(1) << "Closing data pipe producer " << pipe_id_;
60 size_t size, 86 return CloseNoLock();
61 PlatformHandleVector* platform_handles) {
62 MojoCreateDataPipeOptions options;
63 ScopedPlatformHandle shared_memory_handle;
64 size_t shared_memory_size = 0;
65 ScopedPlatformHandle platform_handle =
66 DataPipe::Deserialize(source, size, platform_handles, &options,
67 &shared_memory_handle, &shared_memory_size);
68
69 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
70
71 char* serialized_write_buffer = nullptr;
72 size_t serialized_write_buffer_size = 0;
73 scoped_refptr<PlatformSharedBuffer> shared_buffer;
74 scoped_ptr<PlatformSharedBufferMapping> mapping;
75 if (shared_memory_size) {
76 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
77 shared_memory_size, std::move(shared_memory_handle));
78 mapping = shared_buffer->Map(0, shared_memory_size);
79 serialized_write_buffer = static_cast<char*>(mapping->GetBase());
80 serialized_write_buffer_size = shared_memory_size;
81 }
82
83 rv->Init(std::move(platform_handle), serialized_write_buffer,
84 serialized_write_buffer_size);
85 return rv;
86 } 87 }
87 88
88 DataPipeProducerDispatcher::DataPipeProducerDispatcher( 89 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
89 const MojoCreateDataPipeOptions& options) 90 uint32_t* num_bytes,
90 : options_(options), channel_(nullptr), error_(false), serialized_(false) { 91 MojoWriteDataFlags flags) {
91 } 92 base::AutoLock lock(lock_);
93 if (!shared_ring_buffer_ || in_transit_)
94 return MOJO_RESULT_INVALID_ARGUMENT;
92 95
93 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { 96 if (in_two_phase_write_)
94 // See comment in ~MessagePipeDispatcher. 97 return MOJO_RESULT_BUSY;
95 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
96 channel_->Shutdown();
97 else
98 DCHECK(!channel_);
99 }
100 98
101 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { 99 if (peer_closed_)
102 lock().AssertAcquired(); 100 return MOJO_RESULT_FAILED_PRECONDITION;
103 awakable_list_.CancelAll();
104 }
105 101
106 void DataPipeProducerDispatcher::CloseImplNoLock() {
107 lock().AssertAcquired();
108 internal::g_io_thread_task_runner->PostTask(
109 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this));
110 }
111
112 scoped_refptr<Dispatcher>
113 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
114 lock().AssertAcquired();
115
116 SerializeInternal();
117
118 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_);
119 serialized_write_buffer_.swap(rv->serialized_write_buffer_);
120 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
121 rv->serialized_ = true;
122 return scoped_refptr<Dispatcher>(rv.get());
123 }
124
125 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
126 const void* elements,
127 uint32_t* num_bytes,
128 MojoWriteDataFlags flags) {
129 lock().AssertAcquired();
130 if (InTwoPhaseWrite())
131 return MOJO_RESULT_BUSY;
132 if (error_)
133 return MOJO_RESULT_FAILED_PRECONDITION;
134 if (*num_bytes % options_.element_num_bytes != 0) 102 if (*num_bytes % options_.element_num_bytes != 0)
135 return MOJO_RESULT_INVALID_ARGUMENT; 103 return MOJO_RESULT_INVALID_ARGUMENT;
136 if (*num_bytes == 0) 104 if (*num_bytes == 0)
137 return MOJO_RESULT_OK; // Nothing to do. 105 return MOJO_RESULT_OK; // Nothing to do.
138 106
139 // For now, we ignore options.capacity_num_bytes as a total of all pending
140 // writes (and just treat it per message). We will implement that later if
141 // we need to. All current uses want all their data to be sent, and it's not
142 // clear that this backpressure should be done at the mojo layer or at a
143 // higher application layer.
144 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; 107 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
145 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; 108 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
146 if (min_num_bytes_to_write > options_.capacity_num_bytes) { 109 if (min_num_bytes_to_write > options_.capacity_num_bytes) {
147 // Don't return "should wait" since you can't wait for a specified amount of 110 // Don't return "should wait" since you can't wait for a specified amount of
148 // data. 111 // data.
149 return MOJO_RESULT_OUT_OF_RANGE; 112 return MOJO_RESULT_OUT_OF_RANGE;
150 } 113 }
151 114
152 uint32_t num_bytes_to_write = 115 DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
153 std::min(*num_bytes, options_.capacity_num_bytes); 116 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
154 if (num_bytes_to_write == 0) 117 if (num_bytes_to_write == 0)
155 return MOJO_RESULT_SHOULD_WAIT; 118 return MOJO_RESULT_SHOULD_WAIT;
156 119
157 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); 120 HandleSignalsState old_state = GetHandleSignalsStateNoLock();
158 121
159 *num_bytes = num_bytes_to_write; 122 *num_bytes = num_bytes_to_write;
160 WriteDataIntoMessages(elements, num_bytes_to_write);
161 123
162 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); 124 CHECK(ring_buffer_mapping_);
125 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
126 CHECK(data);
127
128 const uint8_t* source = static_cast<const uint8_t*>(elements);
129 CHECK(source);
130
131 DCHECK_LE(write_offset_, options_.capacity_num_bytes);
132 uint32_t tail_bytes_to_write =
133 std::min(options_.capacity_num_bytes - write_offset_,
134 num_bytes_to_write);
135 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write;
136
137 DCHECK_GT(tail_bytes_to_write, 0u);
138 memcpy(data + write_offset_, source, tail_bytes_to_write);
139 if (head_bytes_to_write > 0)
140 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
141
142 DCHECK_LE(num_bytes_to_write, available_capacity_);
143 available_capacity_ -= num_bytes_to_write;
144 write_offset_ = (write_offset_ + num_bytes_to_write) %
145 options_.capacity_num_bytes;
146
147 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
163 if (!new_state.equals(old_state)) 148 if (!new_state.equals(old_state))
164 awakable_list_.AwakeForStateChange(new_state); 149 awakable_list_.AwakeForStateChange(new_state);
165 return MOJO_RESULT_OK;
166 }
167 150
168 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( 151 base::AutoUnlock unlock(lock_);
169 void** buffer, 152 NotifyWrite(num_bytes_to_write);
170 uint32_t* buffer_num_bytes,
171 MojoWriteDataFlags flags) {
172 lock().AssertAcquired();
173 if (InTwoPhaseWrite())
174 return MOJO_RESULT_BUSY;
175 if (error_)
176 return MOJO_RESULT_FAILED_PRECONDITION;
177
178 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes.
179 if (*buffer_num_bytes == 0)
180 *buffer_num_bytes = options_.capacity_num_bytes;
181
182 two_phase_data_.resize(*buffer_num_bytes);
183 *buffer = &two_phase_data_[0];
184
185 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes
186 // we can construct a MessageInTransit here. But then we need to make
187 // MessageInTransit support changing its data size later.
188 153
189 return MOJO_RESULT_OK; 154 return MOJO_RESULT_OK;
190 } 155 }
191 156
192 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( 157 MojoResult DataPipeProducerDispatcher::BeginWriteData(
158 void** buffer,
159 uint32_t* buffer_num_bytes,
160 MojoWriteDataFlags flags) {
161 base::AutoLock lock(lock_);
162 if (!shared_ring_buffer_ || in_transit_)
163 return MOJO_RESULT_INVALID_ARGUMENT;
164 if (in_two_phase_write_)
165 return MOJO_RESULT_BUSY;
166 if (peer_closed_)
167 return MOJO_RESULT_FAILED_PRECONDITION;
168
169 if (available_capacity_ == 0) {
170 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
171 : MOJO_RESULT_SHOULD_WAIT;
172 }
173
174 in_two_phase_write_ = true;
175 *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_,
176 available_capacity_);
177 DCHECK_GT(*buffer_num_bytes, 0u);
178
179 CHECK(ring_buffer_mapping_);
180 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
181 *buffer = data + write_offset_;
182
183 return MOJO_RESULT_OK;
184 }
185
186 MojoResult DataPipeProducerDispatcher::EndWriteData(
193 uint32_t num_bytes_written) { 187 uint32_t num_bytes_written) {
194 lock().AssertAcquired(); 188 base::AutoLock lock(lock_);
195 if (!InTwoPhaseWrite()) 189 if (is_closed_ || in_transit_)
190 return MOJO_RESULT_INVALID_ARGUMENT;
191
192 if (!in_two_phase_write_)
196 return MOJO_RESULT_FAILED_PRECONDITION; 193 return MOJO_RESULT_FAILED_PRECONDITION;
197 194
195 DCHECK(shared_ring_buffer_);
196 DCHECK(ring_buffer_mapping_);
197
198 // Note: Allow successful completion of the two-phase write even if the other 198 // Note: Allow successful completion of the two-phase write even if the other
199 // side has been closed. 199 // side has been closed.
200 MojoResult rv = MOJO_RESULT_OK; 200 MojoResult rv = MOJO_RESULT_OK;
201 if (num_bytes_written > two_phase_data_.size() || 201 if (num_bytes_written > available_capacity_ ||
202 num_bytes_written % options_.element_num_bytes != 0) { 202 num_bytes_written % options_.element_num_bytes != 0 ||
203 write_offset_ + num_bytes_written > options_.capacity_num_bytes) {
203 rv = MOJO_RESULT_INVALID_ARGUMENT; 204 rv = MOJO_RESULT_INVALID_ARGUMENT;
204 } else if (channel_) { 205 } else {
205 WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written); 206 DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes);
207 available_capacity_ -= num_bytes_written;
208 write_offset_ = (write_offset_ + num_bytes_written) %
209 options_.capacity_num_bytes;
210
211 base::AutoUnlock unlock(lock_);
212 NotifyWrite(num_bytes_written);
206 } 213 }
207 214
208 // Two-phase write ended even on failure. 215 in_two_phase_write_ = false;
209 two_phase_data_.clear(); 216
210 // If we're now writable, we *became* writable (since we weren't writable 217 // If we're now writable, we *became* writable (since we weren't writable
211 // during the two-phase write), so awake producer awakables. 218 // during the two-phase write), so awake producer awakables.
212 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); 219 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
213 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) 220 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
214 awakable_list_.AwakeForStateChange(new_state); 221 awakable_list_.AwakeForStateChange(new_state);
215 222
216 return rv; 223 return rv;
217 } 224 }
218 225
219 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() 226 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
220 const { 227 base::AutoLock lock(lock_);
221 lock().AssertAcquired(); 228 return GetHandleSignalsStateNoLock();
222
223 HandleSignalsState rv;
224 if (!error_) {
225 if (!InTwoPhaseWrite())
226 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
227 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
228 } else {
229 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
230 }
231 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
232 return rv;
233 } 229 }
234 230
235 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( 231 MojoResult DataPipeProducerDispatcher::AddAwakable(
236 Awakable* awakable, 232 Awakable* awakable,
237 MojoHandleSignals signals, 233 MojoHandleSignals signals,
238 uintptr_t context, 234 uintptr_t context,
239 HandleSignalsState* signals_state) { 235 HandleSignalsState* signals_state) {
240 lock().AssertAcquired(); 236 base::AutoLock lock(lock_);
241 if (channel_) 237 if (!shared_ring_buffer_ || in_transit_) {
242 channel_->EnsureLazyInitialized(); 238 if (signals_state)
243 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); 239 *signals_state = HandleSignalsState();
240 return MOJO_RESULT_INVALID_ARGUMENT;
241 }
242 UpdateSignalsStateNoLock();
243 HandleSignalsState state = GetHandleSignalsStateNoLock();
244 if (state.satisfies(signals)) { 244 if (state.satisfies(signals)) {
245 if (signals_state) 245 if (signals_state)
246 *signals_state = state; 246 *signals_state = state;
247 return MOJO_RESULT_ALREADY_EXISTS; 247 return MOJO_RESULT_ALREADY_EXISTS;
248 } 248 }
249 if (!state.can_satisfy(signals)) { 249 if (!state.can_satisfy(signals)) {
250 if (signals_state) 250 if (signals_state)
251 *signals_state = state; 251 *signals_state = state;
252 return MOJO_RESULT_FAILED_PRECONDITION; 252 return MOJO_RESULT_FAILED_PRECONDITION;
253 } 253 }
254 254
255 awakable_list_.Add(awakable, signals, context); 255 awakable_list_.Add(awakable, signals, context);
256 return MOJO_RESULT_OK; 256 return MOJO_RESULT_OK;
257 } 257 }
258 258
259 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( 259 void DataPipeProducerDispatcher::RemoveAwakable(
260 Awakable* awakable, 260 Awakable* awakable,
261 HandleSignalsState* signals_state) { 261 HandleSignalsState* signals_state) {
262 lock().AssertAcquired(); 262 base::AutoLock lock(lock_);
263 if ((!shared_ring_buffer_ || in_transit_) && signals_state)
264 *signals_state = HandleSignalsState();
265 else if (signals_state)
266 *signals_state = GetHandleSignalsStateNoLock();
263 awakable_list_.Remove(awakable); 267 awakable_list_.Remove(awakable);
264 if (signals_state) 268 }
265 *signals_state = GetHandleSignalsStateImplNoLock(); 269
266 } 270 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes,
267 271 uint32_t* num_ports,
268 void DataPipeProducerDispatcher::StartSerializeImplNoLock( 272 uint32_t* num_handles) {
269 size_t* max_size, 273 base::AutoLock lock(lock_);
270 size_t* max_platform_handles) { 274 DCHECK(in_transit_);
271 if (!serialized_) 275 *num_bytes = sizeof(SerializedState);
272 SerializeInternal(); 276 *num_ports = 1;
273 277 *num_handles = 1;
274 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), 278 }
275 !serialized_write_buffer_.empty(), max_size, 279
276 max_platform_handles); 280 bool DataPipeProducerDispatcher::EndSerialize(
277 }
278
279 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
280 void* destination, 281 void* destination,
281 size_t* actual_size, 282 ports::PortName* ports,
282 PlatformHandleVector* platform_handles) { 283 PlatformHandle* platform_handles) {
283 ScopedPlatformHandle shared_memory_handle; 284 SerializedState* state = static_cast<SerializedState*>(destination);
284 size_t shared_memory_size = serialized_write_buffer_.size(); 285 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
285 if (shared_memory_size) { 286
286 scoped_refptr<PlatformSharedBuffer> shared_buffer( 287 base::AutoLock lock(lock_);
287 internal::g_platform_support->CreateSharedBuffer( 288 DCHECK(in_transit_);
288 shared_memory_size)); 289 state->pipe_id = pipe_id_;
289 scoped_ptr<PlatformSharedBufferMapping> mapping( 290 state->peer_closed = peer_closed_;
290 shared_buffer->Map(0, shared_memory_size)); 291 state->write_offset = write_offset_;
291 memcpy(mapping->GetBase(), &serialized_write_buffer_[0], 292 state->available_capacity = available_capacity_;
292 shared_memory_size); 293
293 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); 294 ports[0] = control_port_.name();
294 } 295
295 296 buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
296 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), 297 platform_handles[0] = buffer_handle_for_transit_.get();
297 std::move(shared_memory_handle), shared_memory_size, 298
298 destination, actual_size, platform_handles);
299 CloseImplNoLock();
300 return true; 299 return true;
301 } 300 }
302 301
303 void DataPipeProducerDispatcher::TransportStarted() { 302 bool DataPipeProducerDispatcher::BeginTransit() {
304 started_transport_.Acquire(); 303 base::AutoLock lock(lock_);
305 } 304 if (in_transit_)
306 305 return false;
307 void DataPipeProducerDispatcher::TransportEnded() { 306 in_transit_ = !in_two_phase_write_;
308 started_transport_.Release(); 307 return in_transit_;
309 } 308 }
310 309
311 bool DataPipeProducerDispatcher::IsBusyNoLock() const { 310 void DataPipeProducerDispatcher::CompleteTransitAndClose() {
312 lock().AssertAcquired(); 311 node_controller_->SetPortObserver(control_port_, nullptr);
313 return InTwoPhaseWrite(); 312
314 } 313 base::AutoLock lock(lock_);
315 314 DCHECK(in_transit_);
316 void DataPipeProducerDispatcher::OnReadMessage( 315 transferred_ = true;
317 const MessageInTransit::View& message_view, 316 in_transit_ = false;
318 ScopedPlatformHandleVectorPtr platform_handles) { 317 ignore_result(buffer_handle_for_transit_.release());
319 CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages."; 318 CloseNoLock();
320 } 319 }
321 320
322 void DataPipeProducerDispatcher::OnError(Error error) { 321 void DataPipeProducerDispatcher::CancelTransit() {
323 switch (error) { 322 base::AutoLock lock(lock_);
324 case ERROR_READ_BROKEN: 323 DCHECK(in_transit_);
325 case ERROR_READ_BAD_MESSAGE: 324 in_transit_ = false;
326 case ERROR_READ_UNKNOWN: 325 buffer_handle_for_transit_.reset();
327 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error."; 326 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
328 break; 327 }
329 case ERROR_READ_SHUTDOWN: 328
330 // The other side was cleanly closed, so this isn't actually an error. 329 // static
331 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; 330 scoped_refptr<DataPipeProducerDispatcher>
332 break; 331 DataPipeProducerDispatcher::Deserialize(const void* data,
333 case ERROR_WRITE: 332 size_t num_bytes,
334 // Write errors are slightly notable: they probably shouldn't happen under 333 const ports::PortName* ports,
335 // normal operation (but maybe the other side crashed). 334 size_t num_ports,
336 LOG(WARNING) << "DataPipeProducerDispatcher write error"; 335 PlatformHandle* handles,
337 break; 336 size_t num_handles) {
338 } 337 if (num_ports != 1 || num_handles != 1 ||
339 338 num_bytes != sizeof(SerializedState)) {
340 error_ = true; 339 return nullptr;
341 if (started_transport_.Try()) { 340 }
342 base::AutoLock locker(lock()); 341
343 // We can get two OnError callbacks before the post task below completes. 342 const SerializedState* state = static_cast<const SerializedState*>(data);
344 // Although RawChannel still has a pointer to this object until Shutdown is 343
345 // called, that is safe since this class always does a PostTask to the IO 344 NodeController* node_controller = internal::g_core->GetNodeController();
346 // thread to self destruct. 345 ports::PortRef port;
347 if (channel_) { 346 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
348 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 347 return nullptr;
349 channel_->Shutdown(); 348
350 channel_ = nullptr; 349 PlatformHandle buffer_handle;
350 std::swap(buffer_handle, handles[0]);
351 scoped_refptr<PlatformSharedBuffer> ring_buffer =
352 internal::g_platform_support->CreateSharedBufferFromHandle(
353 state->options.capacity_num_bytes,
354 ScopedPlatformHandle(buffer_handle));
355 if (!ring_buffer) {
356 DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
357 return nullptr;
358 }
359
360 scoped_refptr<DataPipeProducerDispatcher> dispatcher =
361 new DataPipeProducerDispatcher(node_controller, port, ring_buffer,
362 state->options, false /* initialized */,
363 state->pipe_id);
364
365 {
366 base::AutoLock lock(dispatcher->lock_);
367 dispatcher->peer_closed_ = state->peer_closed;
368 dispatcher->write_offset_ = state->write_offset;
369 dispatcher->available_capacity_ = state->available_capacity;
370 dispatcher->InitializeNoLock();
371 }
372
373 return dispatcher;
374 }
375
376 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
377 DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ &&
378 !ring_buffer_mapping_);
379 }
380
381 void DataPipeProducerDispatcher::InitializeNoLock() {
382 lock_.AssertAcquired();
383
384 if (shared_ring_buffer_) {
385 ring_buffer_mapping_ =
386 shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
387 if (!ring_buffer_mapping_) {
388 DLOG(ERROR) << "Failed to map shared buffer.";
389 shared_ring_buffer_ = nullptr;
351 } 390 }
352 started_transport_.Release(); 391 }
392
393 base::AutoUnlock unlock(lock_);
394 node_controller_->SetPortObserver(
395 control_port_,
396 make_scoped_refptr(new PortObserverThunk(this)));
397 }
398
399 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
400 lock_.AssertAcquired();
401 if (is_closed_ || in_transit_)
402 return MOJO_RESULT_INVALID_ARGUMENT;
403 is_closed_ = true;
404 ring_buffer_mapping_.reset();
405 shared_ring_buffer_ = nullptr;
406
407 awakable_list_.CancelAll();
408 if (!transferred_) {
409 base::AutoUnlock unlock(lock_);
410 node_controller_->ClosePort(control_port_);
411 }
412
413 return MOJO_RESULT_OK;
414 }
415
416 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
417 const {
418 lock_.AssertAcquired();
419 HandleSignalsState rv;
420 if (!peer_closed_) {
421 if (!in_two_phase_write_ && shared_ring_buffer_ &&
422 available_capacity_ > 0)
423 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
424 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
353 } else { 425 } else {
354 // We must be waiting to call ReleaseHandle. It will call Shutdown. 426 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
355 } 427 }
356 } 428 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
357 429 return rv;
358 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { 430 }
359 return !two_phase_data_.empty(); 431
360 } 432 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
361 433 DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: "
362 bool DataPipeProducerDispatcher::WriteDataIntoMessages( 434 << num_bytes << " bytes written. [control_port="
363 const void* elements, 435 << control_port_.name() << "]";
364 uint32_t num_bytes) { 436
365 // The maximum amount of data to send per message (make it a multiple of the 437 SendDataPipeControlMessage(node_controller_, control_port_,
366 // element size. 438 DataPipeCommand::DATA_WAS_WRITTEN, num_bytes);
367 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; 439 }
368 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; 440
369 DCHECK_GT(max_message_num_bytes, 0u); 441 void DataPipeProducerDispatcher::OnPortStatusChanged() {
370 442 base::AutoLock lock(lock_);
371 uint32_t offset = 0; 443
372 while (offset < num_bytes) { 444 // We stop observing the control port as soon it's transferred, but this can
373 uint32_t message_num_bytes = 445 // race with events which are raised right before that happens. This is fine
374 std::min(static_cast<uint32_t>(max_message_num_bytes), 446 // to ignore.
375 num_bytes - offset); 447 if (transferred_)
376 scoped_ptr<MessageInTransit> message(new MessageInTransit( 448 return;
377 MessageInTransit::Type::MESSAGE, message_num_bytes, 449
378 static_cast<const char*>(elements) + offset)); 450 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
379 if (!channel_->WriteMessage(std::move(message))) { 451
380 error_ = true; 452 UpdateSignalsStateNoLock();
381 return false; 453 }
382 } 454
383 455 void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
384 offset += message_num_bytes; 456 lock_.AssertAcquired();
385 } 457
386 458 bool was_peer_closed = peer_closed_;
387 return true; 459 size_t previous_capacity = available_capacity_;
388 } 460
389 461 ports::PortStatus port_status;
390 void DataPipeProducerDispatcher::SerializeInternal() { 462 if (node_controller_->node()->GetStatus(control_port_, &port_status) !=
391 // We need to stop watching handle immediately, even though not on IO thread, 463 ports::OK ||
392 // so that other messages aren't read after this. 464 !port_status.receiving_messages) {
393 if (channel_) { 465 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure"
394 std::vector<char> serialized_read_buffer; 466 << " [control_port=" << control_port_.name() << "]";
395 std::vector<int> fds; 467
396 bool write_error = false; 468 peer_closed_ = true;
397 serialized_platform_handle_ = channel_->ReleaseHandle( 469 }
398 &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds, 470
399 &write_error); 471 if (port_status.has_messages && !in_transit_) {
400 CHECK(serialized_read_buffer.empty()); 472 ports::ScopedMessage message;
401 CHECK(fds.empty()); 473 do {
402 if (write_error) 474 int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
403 serialized_platform_handle_.reset(); 475 &message);
404 channel_ = nullptr; 476 if (rv != ports::OK)
405 } 477 peer_closed_ = true;
406 serialized_ = true; 478 if (message) {
479 PortsMessage* ports_message = static_cast<PortsMessage*>(message.get());
480 const DataPipeControlMessage* m =
481 static_cast<const DataPipeControlMessage*>(
482 ports_message->payload_bytes());
483
484 if (m->command != DataPipeCommand::DATA_WAS_READ) {
485 DLOG(ERROR) << "Unexpected message from consumer.";
486 peer_closed_ = true;
487 break;
488 }
489
490 if (static_cast<size_t>(available_capacity_) + m->num_bytes >
491 options_.capacity_num_bytes) {
492 DLOG(ERROR) << "Consumer claims to have read too many bytes.";
493 break;
494 }
495
496 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that "
497 << m->num_bytes << " bytes were read. [control_port="
498 << control_port_.name() << "]";
499
500 available_capacity_ += m->num_bytes;
501 }
502 } while (message);
503 }
504
505 if (peer_closed_ != was_peer_closed ||
506 available_capacity_ != previous_capacity) {
507 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
508 }
407 } 509 }
408 510
409 } // namespace edk 511 } // namespace edk
410 } // namespace mojo 512 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698