Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(189)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <algorithm> 10 #include <algorithm>
11 #include <limits>
11 #include <utility> 12 #include <utility>
12 13
13 #include "base/bind.h" 14 #include "base/bind.h"
14 #include "base/logging.h" 15 #include "base/logging.h"
16 #include "base/memory/ref_counted.h"
15 #include "base/message_loop/message_loop.h" 17 #include "base/message_loop/message_loop.h"
16 #include "mojo/edk/embedder/embedder_internal.h" 18 #include "mojo/edk/embedder/embedder_internal.h"
17 #include "mojo/edk/embedder/platform_shared_buffer.h" 19 #include "mojo/edk/embedder/platform_shared_buffer.h"
18 #include "mojo/edk/embedder/platform_support.h" 20 #include "mojo/edk/embedder/platform_support.h"
19 #include "mojo/edk/system/data_pipe.h" 21 #include "mojo/edk/system/core.h"
22 #include "mojo/edk/system/data_pipe_control_message.h"
23 #include "mojo/edk/system/node_controller.h"
24 #include "mojo/edk/system/ports_message.h"
25 #include "mojo/public/c/system/data_pipe.h"
20 26
21 namespace mojo { 27 namespace mojo {
22 namespace edk { 28 namespace edk {
23 29
24 struct SharedMemoryHeader { 30 namespace {
25 uint32_t data_size; 31
26 uint32_t read_buffer_size; 32 struct MOJO_ALIGNAS(8) SerializedState {
33 MojoCreateDataPipeOptions options;
34 uint64_t pipe_id;
35 bool peer_closed;
36 uint32_t read_offset;
37 uint32_t bytes_available;
27 }; 38 };
28 39
29 void DataPipeConsumerDispatcher::Init( 40 } // namespace
30 ScopedPlatformHandle message_pipe,
31 char* serialized_read_buffer, size_t serialized_read_buffer_size) {
32 if (message_pipe.is_valid()) {
33 channel_ = RawChannel::Create(std::move(message_pipe));
34 channel_->SetSerializedData(
35 serialized_read_buffer, serialized_read_buffer_size, nullptr, 0u,
36 nullptr, nullptr);
37 internal::g_io_thread_task_runner->PostTask(
38 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this));
39 } else {
40 // The data pipe consumer could have read all the data and the producer
41 // closed its end subsequently (before the consumer was sent). In that case
42 // when we deserialize the consumer we must make sure to set error_ or
43 // otherwise the peer-closed signal will never be satisfied.
44 error_ = true;
45 }
46 }
47 41
48 void DataPipeConsumerDispatcher::InitOnIO() { 42 // A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
49 base::AutoLock locker(lock()); 43 // reference to the dispatcher to ensure it lives as long as the observed port.
50 calling_init_ = true; 44 class DataPipeConsumerDispatcher::PortObserverThunk
51 if (channel_) 45 : public NodeController::PortObserver {
52 channel_->Init(this); 46 public:
53 calling_init_ = false; 47 explicit PortObserverThunk(
54 } 48 scoped_refptr<DataPipeConsumerDispatcher> dispatcher)
49 : dispatcher_(dispatcher) {}
55 50
56 void DataPipeConsumerDispatcher::CloseOnIO() { 51 private:
57 base::AutoLock locker(lock()); 52 ~PortObserverThunk() override {}
58 if (channel_) { 53
59 channel_->Shutdown(); 54 // NodeController::PortObserver:
60 channel_ = nullptr; 55 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
56
57 scoped_refptr<DataPipeConsumerDispatcher> dispatcher_;
58
59 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
60 };
61
62 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
63 NodeController* node_controller,
64 const ports::PortRef& control_port,
65 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
66 const MojoCreateDataPipeOptions& options,
67 bool initialized,
68 uint64_t pipe_id)
69 : options_(options),
70 node_controller_(node_controller),
71 control_port_(control_port),
72 pipe_id_(pipe_id),
73 shared_ring_buffer_(shared_ring_buffer) {
74 if (initialized) {
75 base::AutoLock lock(lock_);
76 InitializeNoLock();
61 } 77 }
62 } 78 }
63 79
64 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { 80 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
65 return Type::DATA_PIPE_CONSUMER; 81 return Type::DATA_PIPE_CONSUMER;
66 } 82 }
67 83
68 scoped_refptr<DataPipeConsumerDispatcher> 84 MojoResult DataPipeConsumerDispatcher::Close() {
69 DataPipeConsumerDispatcher::Deserialize( 85 base::AutoLock lock(lock_);
70 const void* source, 86 DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
71 size_t size, 87 return CloseNoLock();
72 PlatformHandleVector* platform_handles) {
73 MojoCreateDataPipeOptions options;
74 ScopedPlatformHandle shared_memory_handle;
75 size_t shared_memory_size = 0;
76
77 ScopedPlatformHandle platform_handle =
78 DataPipe::Deserialize(source, size, platform_handles, &options,
79 &shared_memory_handle, &shared_memory_size);
80
81 scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options));
82
83 char* serialized_read_buffer = nullptr;
84 size_t serialized_read_buffer_size = 0;
85 scoped_refptr<PlatformSharedBuffer> shared_buffer;
86 scoped_ptr<PlatformSharedBufferMapping> mapping;
87 if (shared_memory_size) {
88 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
89 shared_memory_size, std::move(shared_memory_handle));
90 mapping = shared_buffer->Map(0, shared_memory_size);
91 char* buffer = static_cast<char*>(mapping->GetBase());
92 SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer);
93 buffer += sizeof(SharedMemoryHeader);
94 if (header->data_size) {
95 rv->data_.assign(buffer, buffer + header->data_size);
96 buffer += header->data_size;
97 }
98
99 if (header->read_buffer_size) {
100 serialized_read_buffer = buffer;
101 serialized_read_buffer_size = header->read_buffer_size;
102 buffer += header->read_buffer_size;
103 }
104 }
105
106 rv->Init(std::move(platform_handle), serialized_read_buffer,
107 serialized_read_buffer_size);
108 return rv;
109 } 88 }
110 89
111 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( 90 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
112 const MojoCreateDataPipeOptions& options) 91 uint32_t* num_bytes,
113 : options_(options), 92 MojoReadDataFlags flags) {
114 channel_(nullptr), 93 base::AutoLock lock(lock_);
115 calling_init_(false), 94 if (!shared_ring_buffer_ || in_transit_)
116 in_two_phase_read_(false), 95 return MOJO_RESULT_INVALID_ARGUMENT;
117 two_phase_max_bytes_read_(0),
118 error_(false),
119 serialized_(false) {
120 }
121 96
122 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
123 // See comment in ~MessagePipeDispatcher.
124 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
125 channel_->Shutdown();
126 else
127 DCHECK(!channel_);
128 }
129
130 void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() {
131 lock().AssertAcquired();
132 awakable_list_.CancelAll();
133 }
134
135 void DataPipeConsumerDispatcher::CloseImplNoLock() {
136 lock().AssertAcquired();
137 internal::g_io_thread_task_runner->PostTask(
138 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this));
139 }
140
141 scoped_refptr<Dispatcher>
142 DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
143 lock().AssertAcquired();
144
145 SerializeInternal();
146
147 scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_);
148 data_.swap(rv->data_);
149 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
150 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
151 rv->serialized_ = true;
152
153 return scoped_refptr<Dispatcher>(rv.get());
154 }
155
156 MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
157 void* elements,
158 uint32_t* num_bytes,
159 MojoReadDataFlags flags) {
160 lock().AssertAcquired();
161 if (channel_)
162 channel_->EnsureLazyInitialized();
163 if (in_two_phase_read_) 97 if (in_two_phase_read_)
164 return MOJO_RESULT_BUSY; 98 return MOJO_RESULT_BUSY;
165 99
166 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { 100 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
167 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || 101 if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
168 (flags & MOJO_READ_DATA_FLAG_DISCARD)) 102 (flags & MOJO_READ_DATA_FLAG_DISCARD))
169 return MOJO_RESULT_INVALID_ARGUMENT; 103 return MOJO_RESULT_INVALID_ARGUMENT;
170 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. 104 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
171 DVLOG_IF(2, elements) 105 DVLOG_IF(2, elements)
172 << "Query mode: ignoring non-null |elements|"; 106 << "Query mode: ignoring non-null |elements|";
173 *num_bytes = static_cast<uint32_t>(data_.size()); 107 *num_bytes = static_cast<uint32_t>(bytes_available_);
174 return MOJO_RESULT_OK; 108 return MOJO_RESULT_OK;
175 } 109 }
176 110
177 bool discard = false; 111 bool discard = false;
178 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { 112 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
179 // These flags are mutally exclusive. 113 // These flags are mutally exclusive.
180 if (flags & MOJO_READ_DATA_FLAG_PEEK) 114 if (flags & MOJO_READ_DATA_FLAG_PEEK)
181 return MOJO_RESULT_INVALID_ARGUMENT; 115 return MOJO_RESULT_INVALID_ARGUMENT;
182 DVLOG_IF(2, elements) 116 DVLOG_IF(2, elements)
183 << "Discard mode: ignoring non-null |elements|"; 117 << "Discard mode: ignoring non-null |elements|";
184 discard = true; 118 discard = true;
185 } 119 }
186 120
187 uint32_t max_num_bytes_to_read = *num_bytes; 121 uint32_t max_num_bytes_to_read = *num_bytes;
188 if (max_num_bytes_to_read % options_.element_num_bytes != 0) 122 if (max_num_bytes_to_read % options_.element_num_bytes != 0)
189 return MOJO_RESULT_INVALID_ARGUMENT; 123 return MOJO_RESULT_INVALID_ARGUMENT;
190 124
191 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; 125 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
192 uint32_t min_num_bytes_to_read = 126 uint32_t min_num_bytes_to_read =
193 all_or_none ? max_num_bytes_to_read : 0; 127 all_or_none ? max_num_bytes_to_read : 0;
194 128
195 if (min_num_bytes_to_read > data_.size()) 129 if (min_num_bytes_to_read > bytes_available_) {
196 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE; 130 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
131 : MOJO_RESULT_OUT_OF_RANGE;
132 }
197 133
198 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, 134 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
199 static_cast<uint32_t>(data_.size())); 135 if (bytes_to_read == 0) {
200 if (bytes_to_read == 0) 136 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
201 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; 137 : MOJO_RESULT_SHOULD_WAIT;
138 }
202 139
203 if (!discard) 140 if (!discard) {
204 memcpy(elements, &data_[0], bytes_to_read); 141 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
142 CHECK(data);
143
144 uint8_t* destination = static_cast<uint8_t*>(elements);
145 CHECK(destination);
146
147 DCHECK_LE(read_offset_, options_.capacity_num_bytes);
148 uint32_t tail_bytes_to_copy =
149 std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read);
150 uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy;
151 if (tail_bytes_to_copy > 0)
152 memcpy(destination, data + read_offset_, tail_bytes_to_copy);
153 if (head_bytes_to_copy > 0)
154 memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy);
155 }
205 *num_bytes = bytes_to_read; 156 *num_bytes = bytes_to_read;
206 157
207 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); 158 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
208 if (discard || !peek) 159 if (discard || !peek) {
209 data_.erase(data_.begin(), data_.begin() + bytes_to_read); 160 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
161 bytes_available_ -= bytes_to_read;
162
163 base::AutoUnlock unlock(lock_);
164 NotifyRead(bytes_to_read);
165 }
210 166
211 return MOJO_RESULT_OK; 167 return MOJO_RESULT_OK;
212 } 168 }
213 169
214 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock( 170 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
215 const void** buffer, 171 uint32_t* buffer_num_bytes,
216 uint32_t* buffer_num_bytes, 172 MojoReadDataFlags flags) {
217 MojoReadDataFlags flags) { 173 base::AutoLock lock(lock_);
218 lock().AssertAcquired(); 174 if (!shared_ring_buffer_ || in_transit_)
219 if (channel_) 175 return MOJO_RESULT_INVALID_ARGUMENT;
220 channel_->EnsureLazyInitialized(); 176
221 if (in_two_phase_read_) 177 if (in_two_phase_read_)
222 return MOJO_RESULT_BUSY; 178 return MOJO_RESULT_BUSY;
223 179
224 // These flags may not be used in two-phase mode. 180 // These flags may not be used in two-phase mode.
225 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || 181 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
226 (flags & MOJO_READ_DATA_FLAG_QUERY) || 182 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
227 (flags & MOJO_READ_DATA_FLAG_PEEK)) 183 (flags & MOJO_READ_DATA_FLAG_PEEK))
228 return MOJO_RESULT_INVALID_ARGUMENT; 184 return MOJO_RESULT_INVALID_ARGUMENT;
229 185
230 uint32_t max_num_bytes_to_read = static_cast<uint32_t>(data_.size()); 186 if (bytes_available_ == 0) {
231 if (max_num_bytes_to_read == 0) 187 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
232 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; 188 : MOJO_RESULT_SHOULD_WAIT;
189 }
190
191 DCHECK_LT(read_offset_, options_.capacity_num_bytes);
192 uint32_t bytes_to_read = std::min(bytes_available_,
193 options_.capacity_num_bytes - read_offset_);
194
195 CHECK(ring_buffer_mapping_);
196 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
197 CHECK(data);
233 198
234 in_two_phase_read_ = true; 199 in_two_phase_read_ = true;
235 *buffer = &data_[0]; 200 *buffer = data + read_offset_;
236 *buffer_num_bytes = max_num_bytes_to_read; 201 *buffer_num_bytes = bytes_to_read;
237 two_phase_max_bytes_read_ = max_num_bytes_to_read; 202 two_phase_max_bytes_read_ = bytes_to_read;
238 203
239 return MOJO_RESULT_OK; 204 return MOJO_RESULT_OK;
240 } 205 }
241 206
242 MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock( 207 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
243 uint32_t num_bytes_read) { 208 base::AutoLock lock(lock_);
244 lock().AssertAcquired();
245 if (!in_two_phase_read_) 209 if (!in_two_phase_read_)
246 return MOJO_RESULT_FAILED_PRECONDITION; 210 return MOJO_RESULT_FAILED_PRECONDITION;
247 211
248 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); 212 if (in_transit_)
213 return MOJO_RESULT_INVALID_ARGUMENT;
214
215 CHECK(shared_ring_buffer_);
216
217 HandleSignalsState old_state = GetHandleSignalsStateNoLock();
249 MojoResult rv; 218 MojoResult rv;
250 if (num_bytes_read > two_phase_max_bytes_read_ || 219 if (num_bytes_read > two_phase_max_bytes_read_ ||
251 num_bytes_read % options_.element_num_bytes != 0) { 220 num_bytes_read % options_.element_num_bytes != 0) {
252 rv = MOJO_RESULT_INVALID_ARGUMENT; 221 rv = MOJO_RESULT_INVALID_ARGUMENT;
253 } else { 222 } else {
254 rv = MOJO_RESULT_OK; 223 rv = MOJO_RESULT_OK;
255 data_.erase(data_.begin(), data_.begin() + num_bytes_read); 224 read_offset_ =
225 (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
226
227 DCHECK_GE(bytes_available_, num_bytes_read);
228 bytes_available_ -= num_bytes_read;
229
230 base::AutoUnlock unlock(lock_);
231 NotifyRead(num_bytes_read);
256 } 232 }
257 233
258 in_two_phase_read_ = false; 234 in_two_phase_read_ = false;
259 two_phase_max_bytes_read_ = 0; 235 two_phase_max_bytes_read_ = 0;
260 if (!data_received_during_two_phase_read_.empty()) {
261 if (data_.empty()) {
262 data_received_during_two_phase_read_.swap(data_);
263 } else {
264 data_.insert(data_.end(), data_received_during_two_phase_read_.begin(),
265 data_received_during_two_phase_read_.end());
266 data_received_during_two_phase_read_.clear();
267 }
268 }
269 236
270 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); 237 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
271 if (!new_state.equals(old_state)) 238 if (!new_state.equals(old_state))
272 awakable_list_.AwakeForStateChange(new_state); 239 awakable_list_.AwakeForStateChange(new_state);
273 240
274 return rv; 241 return rv;
275 } 242 }
276 243
277 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock() 244 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
278 const { 245 base::AutoLock lock(lock_);
279 lock().AssertAcquired(); 246 return GetHandleSignalsStateNoLock();
280
281 HandleSignalsState rv;
282 if (!data_.empty()) {
283 if (!in_two_phase_read_)
284 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
285 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
286 } else if (!error_) {
287 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
288 }
289
290 if (error_)
291 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
292 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
293 return rv;
294 } 247 }
295 248
296 MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock( 249 MojoResult DataPipeConsumerDispatcher::AddAwakable(
297 Awakable* awakable, 250 Awakable* awakable,
298 MojoHandleSignals signals, 251 MojoHandleSignals signals,
299 uintptr_t context, 252 uintptr_t context,
300 HandleSignalsState* signals_state) { 253 HandleSignalsState* signals_state) {
301 lock().AssertAcquired(); 254 base::AutoLock lock(lock_);
302 if (channel_) 255 if (!shared_ring_buffer_ || in_transit_) {
303 channel_->EnsureLazyInitialized(); 256 if (signals_state)
304 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); 257 *signals_state = HandleSignalsState();
258 return MOJO_RESULT_INVALID_ARGUMENT;
259 }
260 UpdateSignalsStateNoLock();
261 HandleSignalsState state = GetHandleSignalsStateNoLock();
305 if (state.satisfies(signals)) { 262 if (state.satisfies(signals)) {
306 if (signals_state) 263 if (signals_state)
307 *signals_state = state; 264 *signals_state = state;
308 return MOJO_RESULT_ALREADY_EXISTS; 265 return MOJO_RESULT_ALREADY_EXISTS;
309 } 266 }
310 if (!state.can_satisfy(signals)) { 267 if (!state.can_satisfy(signals)) {
311 if (signals_state) 268 if (signals_state)
312 *signals_state = state; 269 *signals_state = state;
313 return MOJO_RESULT_FAILED_PRECONDITION; 270 return MOJO_RESULT_FAILED_PRECONDITION;
314 } 271 }
315 272
316 awakable_list_.Add(awakable, signals, context); 273 awakable_list_.Add(awakable, signals, context);
317 return MOJO_RESULT_OK; 274 return MOJO_RESULT_OK;
318 } 275 }
319 276
320 void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock( 277 void DataPipeConsumerDispatcher::RemoveAwakable(
321 Awakable* awakable, 278 Awakable* awakable,
322 HandleSignalsState* signals_state) { 279 HandleSignalsState* signals_state) {
323 lock().AssertAcquired(); 280 base::AutoLock lock(lock_);
281 if ((!shared_ring_buffer_ || in_transit_) && signals_state)
282 *signals_state = HandleSignalsState();
283 else if (signals_state)
284 *signals_state = GetHandleSignalsStateNoLock();
324 awakable_list_.Remove(awakable); 285 awakable_list_.Remove(awakable);
325 if (signals_state) 286 }
326 *signals_state = GetHandleSignalsStateImplNoLock(); 287
327 } 288 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
328 289 uint32_t* num_ports,
329 void DataPipeConsumerDispatcher::StartSerializeImplNoLock( 290 uint32_t* num_handles) {
330 size_t* max_size, 291 base::AutoLock lock(lock_);
331 size_t* max_platform_handles) { 292 DCHECK(in_transit_);
332 if (!serialized_) { 293 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
333 // Handles the case where we have messages read off RawChannel but not ready 294 *num_ports = 1;
334 // by MojoReadMessage. 295 *num_handles = 1;
335 SerializeInternal(); 296 }
336 } 297
337 298 bool DataPipeConsumerDispatcher::EndSerialize(
338 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
339 !data_.empty() || !serialized_read_buffer_.empty(),
340 max_size, max_platform_handles);
341 }
342
343 bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock(
344 void* destination, 299 void* destination,
345 size_t* actual_size, 300 ports::PortName* ports,
346 PlatformHandleVector* platform_handles) { 301 PlatformHandle* platform_handles) {
347 ScopedPlatformHandle shared_memory_handle; 302 SerializedState* state = static_cast<SerializedState*>(destination);
348 size_t shared_memory_size = data_.size() + serialized_read_buffer_.size(); 303 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
349 if (shared_memory_size) { 304
350 shared_memory_size += sizeof(SharedMemoryHeader); 305 base::AutoLock lock(lock_);
351 SharedMemoryHeader header; 306 DCHECK(in_transit_);
352 header.data_size = static_cast<uint32_t>(data_.size()); 307 state->pipe_id = pipe_id_;
353 header.read_buffer_size = 308 state->peer_closed = peer_closed_;
354 static_cast<uint32_t>(serialized_read_buffer_.size()); 309 state->read_offset = read_offset_;
355 310 state->bytes_available = bytes_available_;
356 scoped_refptr<PlatformSharedBuffer> shared_buffer( 311
357 internal::g_platform_support->CreateSharedBuffer( 312 ports[0] = control_port_.name();
358 shared_memory_size)); 313
359 scoped_ptr<PlatformSharedBufferMapping> mapping( 314 buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
360 shared_buffer->Map(0, shared_memory_size)); 315 platform_handles[0] = buffer_handle_for_transit_.get();
361 316
362 char* start = static_cast<char*>(mapping->GetBase()); 317 return true;
363 memcpy(start, &header, sizeof(SharedMemoryHeader)); 318 }
364 start += sizeof(SharedMemoryHeader); 319
365 320 bool DataPipeConsumerDispatcher::BeginTransit() {
366 if (!data_.empty()) { 321 base::AutoLock lock(lock_);
367 memcpy(start, &data_[0], data_.size()); 322 if (in_transit_)
368 start += data_.size(); 323 return false;
324 in_transit_ = !in_two_phase_read_;
325 return in_transit_;
326 }
327
328 void DataPipeConsumerDispatcher::CompleteTransitAndClose() {
329 node_controller_->SetPortObserver(control_port_, nullptr);
330
331 base::AutoLock lock(lock_);
332 DCHECK(in_transit_);
333 in_transit_ = false;
334 transferred_ = true;
335 ignore_result(buffer_handle_for_transit_.release());
336 CloseNoLock();
337 }
338
339 void DataPipeConsumerDispatcher::CancelTransit() {
340 base::AutoLock lock(lock_);
341 DCHECK(in_transit_);
342 in_transit_ = false;
343 buffer_handle_for_transit_.reset();
344 UpdateSignalsStateNoLock();
345 }
346
347 // static
348 scoped_refptr<DataPipeConsumerDispatcher>
349 DataPipeConsumerDispatcher::Deserialize(const void* data,
350 size_t num_bytes,
351 const ports::PortName* ports,
352 size_t num_ports,
353 PlatformHandle* handles,
354 size_t num_handles) {
355 if (num_ports != 1 || num_handles != 1 ||
356 num_bytes != sizeof(SerializedState)) {
357 return nullptr;
358 }
359
360 const SerializedState* state = static_cast<const SerializedState*>(data);
361
362 NodeController* node_controller = internal::g_core->GetNodeController();
363 ports::PortRef port;
364 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
365 return nullptr;
366
367 PlatformHandle buffer_handle;
368 std::swap(buffer_handle, handles[0]);
369 scoped_refptr<PlatformSharedBuffer> ring_buffer =
370 internal::g_platform_support->CreateSharedBufferFromHandle(
371 state->options.capacity_num_bytes,
372 ScopedPlatformHandle(buffer_handle));
373 if (!ring_buffer) {
374 DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
375 return nullptr;
376 }
377
378 scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
379 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer,
380 state->options, false /* initialized */,
381 state->pipe_id);
382
383 {
384 base::AutoLock lock(dispatcher->lock_);
385 dispatcher->peer_closed_ = state->peer_closed;
386 dispatcher->read_offset_ = state->read_offset;
387 dispatcher->bytes_available_ = state->bytes_available;
388 dispatcher->InitializeNoLock();
389 }
390
391 return dispatcher;
392 }
393
394 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
395 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ &&
396 !in_transit_);
397 }
398
399 void DataPipeConsumerDispatcher::InitializeNoLock() {
400 lock_.AssertAcquired();
401
402 if (shared_ring_buffer_) {
403 DCHECK(!ring_buffer_mapping_);
404 ring_buffer_mapping_ =
405 shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
406 if (!ring_buffer_mapping_) {
407 DLOG(ERROR) << "Failed to map shared buffer.";
408 shared_ring_buffer_ = nullptr;
369 } 409 }
370 410 }
371 if (!serialized_read_buffer_.empty()) { 411
372 memcpy(start, &serialized_read_buffer_[0], 412 base::AutoUnlock unlock(lock_);
373 serialized_read_buffer_.size()); 413 node_controller_->SetPortObserver(
374 start += serialized_read_buffer_.size(); 414 control_port_,
375 } 415 make_scoped_refptr(new PortObserverThunk(this)));
376 416 }
377 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); 417
378 } 418 MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
379 419 lock_.AssertAcquired();
380 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), 420 if (is_closed_ || in_transit_)
381 std::move(shared_memory_handle), shared_memory_size, 421 return MOJO_RESULT_INVALID_ARGUMENT;
382 destination, actual_size, platform_handles); 422 is_closed_ = true;
383 CloseImplNoLock(); 423 ring_buffer_mapping_.reset();
384 return true; 424 shared_ring_buffer_ = nullptr;
385 } 425
386 426 awakable_list_.CancelAll();
387 void DataPipeConsumerDispatcher::TransportStarted() { 427 if (!transferred_) {
388 started_transport_.Acquire(); 428 base::AutoUnlock unlock(lock_);
389 } 429 node_controller_->ClosePort(control_port_);
390 430 }
391 void DataPipeConsumerDispatcher::TransportEnded() { 431
392 started_transport_.Release(); 432 return MOJO_RESULT_OK;
393 433 }
394 base::AutoLock locker(lock()); 434
395 435 HandleSignalsState
396 // If transporting of DP failed, we might have got more data and didn't awake 436 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
397 // for. 437 lock_.AssertAcquired();
398 // TODO(jam): should we care about only alerting if it was empty before 438
399 // TransportStarted? 439 HandleSignalsState rv;
400 if (!data_.empty()) 440 if (shared_ring_buffer_ && bytes_available_) {
401 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 441 if (!in_two_phase_read_)
402 } 442 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
403 443 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
404 bool DataPipeConsumerDispatcher::IsBusyNoLock() const { 444 } else if (!peer_closed_ && shared_ring_buffer_) {
405 lock().AssertAcquired(); 445 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
406 return in_two_phase_read_; 446 }
407 } 447
408 448 if (peer_closed_)
409 void DataPipeConsumerDispatcher::OnReadMessage( 449 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
410 const MessageInTransit::View& message_view, 450 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
411 ScopedPlatformHandleVectorPtr platform_handles) { 451 return rv;
412 const char* bytes_start = static_cast<const char*>(message_view.bytes()); 452 }
413 const char* bytes_end = bytes_start + message_view.num_bytes(); 453
414 if (started_transport_.Try()) { 454 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
415 // We're not in the middle of being sent. 455 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: "
416 456 << num_bytes << " bytes read. [control_port="
417 // Can get synchronously called back in Init if there was initial data. 457 << control_port_.name() << "]";
418 scoped_ptr<base::AutoLock> locker; 458
419 if (!calling_init_) { 459 SendDataPipeControlMessage(node_controller_, control_port_,
420 locker.reset(new base::AutoLock(lock())); 460 DataPipeCommand::DATA_WAS_READ, num_bytes);
421 } 461 }
422 462
423 if (in_two_phase_read_) { 463 void DataPipeConsumerDispatcher::OnPortStatusChanged() {
424 data_received_during_two_phase_read_.insert( 464 base::AutoLock lock(lock_);
425 data_received_during_two_phase_read_.end(), bytes_start, bytes_end); 465
426 } else { 466 // We stop observing the control port as soon it's transferred, but this can
427 bool was_empty = data_.empty(); 467 // race with events which are raised right before that happens. This is fine
428 data_.insert(data_.end(), bytes_start, bytes_end); 468 // to ignore.
429 if (was_empty) 469 if (transferred_)
430 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 470 return;
431 } 471
432 started_transport_.Release(); 472 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
433 } else { 473
434 // See comment in MessagePipeDispatcher about why we can't and don't need 474 UpdateSignalsStateNoLock();
435 // to lock here. 475 }
436 data_.insert(data_.end(), bytes_start, bytes_end); 476
437 } 477 void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
438 } 478 lock_.AssertAcquired();
439 479
440 void DataPipeConsumerDispatcher::OnError(Error error) { 480 bool was_peer_closed = peer_closed_;
441 switch (error) { 481 size_t previous_bytes_available = bytes_available_;
442 case ERROR_READ_SHUTDOWN: 482
443 // The other side was cleanly closed, so this isn't actually an error. 483 ports::PortStatus port_status;
444 DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)"; 484 if (node_controller_->node()->GetStatus(control_port_, &port_status) !=
445 break; 485 ports::OK ||
446 case ERROR_READ_BROKEN: 486 !port_status.receiving_messages) {
447 LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)"; 487 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure"
448 break; 488 << " [control_port=" << control_port_.name() << "]";
449 case ERROR_READ_BAD_MESSAGE: 489
450 // Receiving a bad message means either a bug, data corruption, or 490 peer_closed_ = true;
451 // malicious attack (probably due to some other bug). 491 }
452 LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad " 492
453 << "message)"; 493 if (port_status.has_messages && !in_transit_) {
454 break; 494 ports::ScopedMessage message;
455 case ERROR_READ_UNKNOWN: 495 do {
456 LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)"; 496 int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
457 break; 497 &message);
458 case ERROR_WRITE: 498 if (rv != ports::OK)
459 LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages"; 499 peer_closed_ = true;
460 break; 500 if (message) {
461 } 501 const DataPipeControlMessage* m =
462 502 static_cast<const DataPipeControlMessage*>(
463 error_ = true; 503 message->payload_bytes());
464 if (started_transport_.Try()) { 504
465 base::AutoLock locker(lock()); 505 if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
466 // We can get two OnError callbacks before the post task below completes. 506 DLOG(ERROR) << "Unexpected control message from producer.";
467 // Although RawChannel still has a pointer to this object until Shutdown is 507 peer_closed_ = true;
468 // called, that is safe since this class always does a PostTask to the IO 508 break;
469 // thread to self destruct. 509 }
470 if (channel_) { 510
471 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 511 if (static_cast<size_t>(bytes_available_) + m->num_bytes >
472 channel_->Shutdown(); 512 options_.capacity_num_bytes) {
473 channel_ = nullptr; 513 DLOG(ERROR) << "Producer claims to have written too many bytes.";
474 } 514 peer_closed_ = true;
475 started_transport_.Release(); 515 break;
476 } else { 516 }
477 // We must be waiting to call ReleaseHandle. It will call Shutdown. 517
478 } 518 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
479 } 519 << m->num_bytes << " bytes were written. [control_port="
480 520 << control_port_.name() << "]";
481 void DataPipeConsumerDispatcher::SerializeInternal() { 521
482 DCHECK(!in_two_phase_read_); 522 bytes_available_ += m->num_bytes;
483 // We need to stop watching handle immediately, even though not on IO thread, 523 }
484 // so that other messages aren't read after this. 524 } while (message);
485 if (channel_) { 525 }
486 std::vector<char> serialized_write_buffer; 526
487 std::vector<int> fds; 527 if (peer_closed_ != was_peer_closed ||
488 bool write_error = false; 528 bytes_available_ != previous_bytes_available) {
489 serialized_platform_handle_ = channel_->ReleaseHandle( 529 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
490 &serialized_read_buffer_, &serialized_write_buffer, &fds, &fds, 530 }
491 &write_error);
492 CHECK(serialized_write_buffer.empty());
493 CHECK(fds.empty());
494 CHECK(!write_error) << "DataPipeConsumerDispatcher doesn't write.";
495
496 channel_ = nullptr;
497 }
498
499 serialized_ = true;
500 } 531 }
501 532
502 } // namespace edk 533 } // namespace edk
503 } // namespace mojo 534 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698