Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(885)

Side by Side Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 1526923006: [mojo] Implement data pipe using a shared buffer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <algorithm> 10 #include <algorithm>
11 #include <utility> 11 #include <utility>
12 12
13 #include "base/bind.h" 13 #include "base/bind.h"
14 #include "base/logging.h" 14 #include "base/logging.h"
15 #include "base/message_loop/message_loop.h" 15 #include "base/message_loop/message_loop.h"
16 #include "mojo/edk/embedder/embedder_internal.h" 16 #include "mojo/edk/embedder/embedder_internal.h"
17 #include "mojo/edk/embedder/platform_shared_buffer.h" 17 #include "mojo/edk/embedder/platform_shared_buffer.h"
18 #include "mojo/edk/embedder/platform_support.h" 18 #include "mojo/edk/embedder/platform_support.h"
19 #include "mojo/edk/system/data_pipe.h"
20 19
21 namespace mojo { 20 namespace mojo {
22 namespace edk { 21 namespace edk {
23 22
24 struct SharedMemoryHeader { 23 void DataPipeConsumerDispatcher::Init(ScopedPlatformHandle message_pipe,
25 uint32_t data_size; 24 char* serialized_write_buffer,
26 uint32_t read_buffer_size; 25 uint32_t serialized_write_buffer_size,
27 }; 26 char* serialized_read_buffer,
27 uint32_t serialized_read_buffer_size,
28 ScopedPlatformHandle shared_buffer_handle,
29 uint32_t ring_buffer_start,
30 uint32_t ring_buffer_size) {
31 if (!message_pipe.is_valid()) {
32 peer_closed_ = true;
33 }
28 34
29 void DataPipeConsumerDispatcher::Init( 35 data_pipe_->Init(std::move(message_pipe), serialized_write_buffer,
30 ScopedPlatformHandle message_pipe, 36 serialized_write_buffer_size, serialized_read_buffer,
31 char* serialized_read_buffer, size_t serialized_read_buffer_size) { 37 serialized_read_buffer_size, std::move(shared_buffer_handle),
32 if (message_pipe.is_valid()) { 38 ring_buffer_start, ring_buffer_size, false /* is_producer */,
33 channel_ = RawChannel::Create(std::move(message_pipe)); 39 base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this));
34 channel_->SetSerializedData(
35 serialized_read_buffer, serialized_read_buffer_size, nullptr, 0u,
36 nullptr, nullptr);
37 internal::g_io_thread_task_runner->PostTask(
38 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this));
39 } else {
40 // The data pipe consumer could have read all the data and the producer
41 // closed its end subsequently (before the consumer was sent). In that case
42 // when we deserialize the consumer we must make sure to set error_ or
43 // otherwise the peer-closed signal will never be satisfied.
44 error_ = true;
45 }
46 } 40 }
47 41
48 void DataPipeConsumerDispatcher::InitOnIO() { 42 void DataPipeConsumerDispatcher::InitOnIO() {
49 base::AutoLock locker(lock()); 43 base::AutoLock locker(lock());
50 calling_init_ = true; 44 calling_init_ = true;
51 if (channel_) 45 RawChannel* channel = data_pipe_->GetChannel();
52 channel_->Init(this); 46 if (channel)
47 channel->Init(this);
53 calling_init_ = false; 48 calling_init_ = false;
54 } 49 }
55 50
56 void DataPipeConsumerDispatcher::CloseOnIO() { 51 void DataPipeConsumerDispatcher::CloseOnIO() {
57 base::AutoLock locker(lock()); 52 base::AutoLock locker(lock());
58 if (channel_) { 53 data_pipe_->Shutdown();
59 channel_->Shutdown();
60 channel_ = nullptr;
61 }
62 } 54 }
63 55
64 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { 56 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
65 return Type::DATA_PIPE_CONSUMER; 57 return Type::DATA_PIPE_CONSUMER;
66 } 58 }
67 59
68 scoped_refptr<DataPipeConsumerDispatcher> 60 scoped_refptr<DataPipeConsumerDispatcher>
69 DataPipeConsumerDispatcher::Deserialize( 61 DataPipeConsumerDispatcher::Deserialize(
70 const void* source, 62 const void* source,
71 size_t size, 63 size_t size,
72 PlatformHandleVector* platform_handles) { 64 PlatformHandleVector* platform_handles) {
73 MojoCreateDataPipeOptions options; 65 MojoCreateDataPipeOptions options;
74 ScopedPlatformHandle shared_memory_handle; 66 ScopedPlatformHandle channel_handle, channel_shared_handle,
Anand Mistry (off Chromium) 2016/01/11 06:19:34 It feels like most of this code should be encapsul
Eliot Courtney 2016/01/13 00:00:10 Done.
75 size_t shared_memory_size = 0; 67 shared_buffer_handle;
68 uint32_t serialized_read_buffer_size, serialized_write_buffer_size;
69 uint32_t ring_buffer_start, ring_buffer_size;
76 70
77 ScopedPlatformHandle platform_handle = 71 ScopedPlatformHandle platform_handle = DataPipe::Deserialize(
78 DataPipe::Deserialize(source, size, platform_handles, &options, 72 source, size, platform_handles, &options, &channel_shared_handle,
79 &shared_memory_handle, &shared_memory_size); 73 &serialized_read_buffer_size, &serialized_write_buffer_size,
74 &shared_buffer_handle, &ring_buffer_start, &ring_buffer_size);
80 75
81 scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options)); 76 scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options));
82 77
78 uint32_t buffer_size =
79 serialized_write_buffer_size + serialized_read_buffer_size;
83 char* serialized_read_buffer = nullptr; 80 char* serialized_read_buffer = nullptr;
84 size_t serialized_read_buffer_size = 0; 81 char* serialized_write_buffer = nullptr;
85 scoped_refptr<PlatformSharedBuffer> shared_buffer; 82 scoped_refptr<PlatformSharedBuffer> channel_shared_buffer;
86 scoped_ptr<PlatformSharedBufferMapping> mapping; 83 scoped_ptr<PlatformSharedBufferMapping> mapping;
87 if (shared_memory_size) { 84 if (channel_shared_handle.is_valid()) {
88 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( 85 channel_shared_buffer =
89 shared_memory_size, std::move(shared_memory_handle)); 86 internal::g_platform_support->CreateSharedBufferFromHandle(
90 mapping = shared_buffer->Map(0, shared_memory_size); 87 buffer_size, std::move(channel_shared_handle));
91 char* buffer = static_cast<char*>(mapping->GetBase()); 88 mapping = channel_shared_buffer->Map(0, buffer_size);
92 SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer);
93 buffer += sizeof(SharedMemoryHeader);
94 if (header->data_size) {
95 rv->data_.assign(buffer, buffer + header->data_size);
96 buffer += header->data_size;
97 }
98 89
99 if (header->read_buffer_size) { 90 serialized_read_buffer = static_cast<char*>(mapping->GetBase());
100 serialized_read_buffer = buffer; 91 serialized_write_buffer =
101 serialized_read_buffer_size = header->read_buffer_size; 92 static_cast<char*>(mapping->GetBase()) + serialized_read_buffer_size;
102 buffer += header->read_buffer_size;
103 }
104 } 93 }
105 94
106 rv->Init(std::move(platform_handle), serialized_read_buffer, 95 rv->Init(std::move(platform_handle), serialized_read_buffer,
107 serialized_read_buffer_size); 96 serialized_read_buffer_size, serialized_write_buffer,
97 serialized_write_buffer_size, std::move(shared_buffer_handle),
98 ring_buffer_start, ring_buffer_size);
108 return rv; 99 return rv;
109 } 100 }
110 101
111 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( 102 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
112 const MojoCreateDataPipeOptions& options) 103 const MojoCreateDataPipeOptions& options)
113 : options_(options), 104 : data_pipe_(new DataPipe(options)),
114 channel_(nullptr),
115 calling_init_(false), 105 calling_init_(false),
106 peer_closed_(false),
116 in_two_phase_read_(false), 107 in_two_phase_read_(false),
117 two_phase_max_bytes_read_(0), 108 two_phase_max_bytes_read_(0u) {}
118 error_(false),
119 serialized_(false) {
120 }
121 109
122 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { 110 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
123 // See comment in ~MessagePipeDispatcher. 111 // See comment in ~MessagePipeDispatcher.
124 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) 112 if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
125 channel_->Shutdown(); 113 data_pipe_->Shutdown();
126 else 114 else
127 DCHECK(!channel_); 115 DCHECK(!data_pipe_->GetChannel());
128 } 116 }
129 117
130 void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() { 118 void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() {
131 lock().AssertAcquired(); 119 lock().AssertAcquired();
132 awakable_list_.CancelAll(); 120 awakable_list_.CancelAll();
133 } 121 }
134 122
135 void DataPipeConsumerDispatcher::CloseImplNoLock() { 123 void DataPipeConsumerDispatcher::CloseImplNoLock() {
136 lock().AssertAcquired(); 124 lock().AssertAcquired();
137 internal::g_io_thread_task_runner->PostTask( 125 internal::g_io_thread_task_runner->PostTask(
138 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this)); 126 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this));
139 } 127 }
140 128
141 scoped_refptr<Dispatcher> 129 scoped_refptr<Dispatcher>
142 DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { 130 DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
143 lock().AssertAcquired(); 131 lock().AssertAcquired();
144 132
145 SerializeInternal(); 133 scoped_refptr<DataPipeConsumerDispatcher> rv =
134 Create(data_pipe_->GetOptions());
135 data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get());
146 136
147 scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_); 137 DCHECK(!in_two_phase_read_);
148 data_.swap(rv->data_);
149 serialized_read_buffer_.swap(rv->serialized_read_buffer_);
150 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
151 rv->serialized_ = true;
152 138
153 return scoped_refptr<Dispatcher>(rv.get()); 139 return scoped_refptr<Dispatcher>(rv.get());
154 } 140 }
155 141
156 MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock( 142 MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
157 void* elements, 143 void* elements,
158 uint32_t* num_bytes, 144 uint32_t* num_bytes,
159 MojoReadDataFlags flags) { 145 MojoReadDataFlags flags) {
160 lock().AssertAcquired(); 146 lock().AssertAcquired();
161 if (channel_)
162 channel_->EnsureLazyInitialized();
163 if (in_two_phase_read_) 147 if (in_two_phase_read_)
164 return MOJO_RESULT_BUSY; 148 return MOJO_RESULT_BUSY;
165 149
166 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { 150 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
167 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || 151 if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
168 (flags & MOJO_READ_DATA_FLAG_DISCARD)) 152 (flags & MOJO_READ_DATA_FLAG_DISCARD))
169 return MOJO_RESULT_INVALID_ARGUMENT; 153 return MOJO_RESULT_INVALID_ARGUMENT;
170 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. 154 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
171 DVLOG_IF(2, elements) 155 DVLOG_IF(2, elements) << "Query mode: ignoring non-null |elements|";
172 << "Query mode: ignoring non-null |elements|"; 156 *num_bytes = data_pipe_->GetReadableBytes();
173 *num_bytes = static_cast<uint32_t>(data_.size());
174 return MOJO_RESULT_OK; 157 return MOJO_RESULT_OK;
175 } 158 }
176 159
177 bool discard = false; 160 bool discard = false;
178 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { 161 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
179 // These flags are mutally exclusive. 162 // These flags are mutually exclusive.
180 if (flags & MOJO_READ_DATA_FLAG_PEEK) 163 if (flags & MOJO_READ_DATA_FLAG_PEEK)
181 return MOJO_RESULT_INVALID_ARGUMENT; 164 return MOJO_RESULT_INVALID_ARGUMENT;
182 DVLOG_IF(2, elements) 165 DVLOG_IF(2, elements) << "Discard mode: ignoring non-null |elements|";
183 << "Discard mode: ignoring non-null |elements|";
184 discard = true; 166 discard = true;
185 } 167 }
186 168
187 uint32_t max_num_bytes_to_read = *num_bytes; 169 uint32_t max_num_bytes_to_read = *num_bytes;
188 if (max_num_bytes_to_read % options_.element_num_bytes != 0) 170 if (max_num_bytes_to_read % data_pipe_->GetOptions().element_num_bytes != 0)
189 return MOJO_RESULT_INVALID_ARGUMENT; 171 return MOJO_RESULT_INVALID_ARGUMENT;
190 172
191 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; 173 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
192 uint32_t min_num_bytes_to_read = 174 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
193 all_or_none ? max_num_bytes_to_read : 0;
194 175
195 if (min_num_bytes_to_read > data_.size()) 176 uint32_t readable_bytes = data_pipe_->GetReadableBytes();
196 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE; 177 if (min_num_bytes_to_read > readable_bytes)
178 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
179 : MOJO_RESULT_OUT_OF_RANGE;
197 180
198 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, 181 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, readable_bytes);
199 static_cast<uint32_t>(data_.size()));
200 if (bytes_to_read == 0) 182 if (bytes_to_read == 0)
201 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; 183 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
184 : MOJO_RESULT_SHOULD_WAIT;
202 185
203 if (!discard) 186 // |ReadDataFromSharedBuffer| failing means we haven't got the shared buffer
204 memcpy(elements, &data_[0], bytes_to_read); 187 // yet, so we should wait.
188 if (!discard &&
189 !data_pipe_->ReadDataFromSharedBuffer(elements, bytes_to_read)) {
190 return MOJO_RESULT_SHOULD_WAIT;
191 }
192
205 *num_bytes = bytes_to_read; 193 *num_bytes = bytes_to_read;
206 194
207 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); 195 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
208 if (discard || !peek) 196 bool should_update = !(flags & MOJO_READ_DATA_FLAG_PEEK) || discard;
209 data_.erase(data_.begin(), data_.begin() + bytes_to_read); 197 if (should_update)
198 data_pipe_->UpdateFromRead(bytes_to_read);
199 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
200 if (!new_state.equals(old_state))
201 awakable_list_.AwakeForStateChange(new_state);
202
203 // Deal with state changes due to peer being closed in OnError.
204 if (should_update && !data_pipe_->NotifyRead(bytes_to_read))
205 peer_closed_ = true;
210 206
211 return MOJO_RESULT_OK; 207 return MOJO_RESULT_OK;
212 } 208 }
213 209
214 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock( 210 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
215 const void** buffer, 211 const void** buffer,
216 uint32_t* buffer_num_bytes, 212 uint32_t* buffer_num_bytes,
217 MojoReadDataFlags flags) { 213 MojoReadDataFlags flags) {
218 lock().AssertAcquired(); 214 lock().AssertAcquired();
219 if (channel_)
220 channel_->EnsureLazyInitialized();
221 if (in_two_phase_read_) 215 if (in_two_phase_read_)
222 return MOJO_RESULT_BUSY; 216 return MOJO_RESULT_BUSY;
223 217
224 // These flags may not be used in two-phase mode. 218 // These flags may not be used in two-phase mode.
225 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || 219 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
226 (flags & MOJO_READ_DATA_FLAG_QUERY) || 220 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
227 (flags & MOJO_READ_DATA_FLAG_PEEK)) 221 (flags & MOJO_READ_DATA_FLAG_PEEK))
228 return MOJO_RESULT_INVALID_ARGUMENT; 222 return MOJO_RESULT_INVALID_ARGUMENT;
229 223
230 uint32_t max_num_bytes_to_read = static_cast<uint32_t>(data_.size()); 224 uint32_t readable_bytes;
225 const void* temp_buf = data_pipe_->GetReadBuffer(&readable_bytes);
226
227 uint32_t max_num_bytes_to_read = readable_bytes;
231 if (max_num_bytes_to_read == 0) 228 if (max_num_bytes_to_read == 0)
232 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; 229 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
230 : MOJO_RESULT_SHOULD_WAIT;
233 231
234 in_two_phase_read_ = true; 232 in_two_phase_read_ = true;
235 *buffer = &data_[0]; 233 *buffer = temp_buf;
236 *buffer_num_bytes = max_num_bytes_to_read; 234 *buffer_num_bytes = max_num_bytes_to_read;
237 two_phase_max_bytes_read_ = max_num_bytes_to_read; 235 two_phase_max_bytes_read_ = max_num_bytes_to_read;
238 236
239 return MOJO_RESULT_OK; 237 return MOJO_RESULT_OK;
240 } 238 }
241 239
242 MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock( 240 MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock(
243 uint32_t num_bytes_read) { 241 uint32_t num_bytes_read) {
244 lock().AssertAcquired(); 242 lock().AssertAcquired();
245 if (!in_two_phase_read_) 243 if (!in_two_phase_read_)
246 return MOJO_RESULT_FAILED_PRECONDITION; 244 return MOJO_RESULT_FAILED_PRECONDITION;
247 245
248 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); 246 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
249 MojoResult rv; 247 in_two_phase_read_ = false;
248
250 if (num_bytes_read > two_phase_max_bytes_read_ || 249 if (num_bytes_read > two_phase_max_bytes_read_ ||
251 num_bytes_read % options_.element_num_bytes != 0) { 250 num_bytes_read % data_pipe_->GetOptions().element_num_bytes != 0) {
252 rv = MOJO_RESULT_INVALID_ARGUMENT; 251 return MOJO_RESULT_INVALID_ARGUMENT;
253 } else {
254 rv = MOJO_RESULT_OK;
255 data_.erase(data_.begin(), data_.begin() + num_bytes_read);
256 } 252 }
257 253
258 in_two_phase_read_ = false; 254 data_pipe_->UpdateFromRead(num_bytes_read);
259 two_phase_max_bytes_read_ = 0;
260 if (!data_received_during_two_phase_read_.empty()) {
261 if (data_.empty()) {
262 data_received_during_two_phase_read_.swap(data_);
263 } else {
264 data_.insert(data_.end(), data_received_during_two_phase_read_.begin(),
265 data_received_during_two_phase_read_.end());
266 data_received_during_two_phase_read_.clear();
267 }
268 }
269 255
270 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); 256 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
271 if (!new_state.equals(old_state)) 257 if (!new_state.equals(old_state))
272 awakable_list_.AwakeForStateChange(new_state); 258 awakable_list_.AwakeForStateChange(new_state);
273 259
274 return rv; 260 // Deal with state changes due to peer being closed in OnError.
261 if (!data_pipe_->NotifyRead(num_bytes_read))
262 peer_closed_ = true;
263
264 return MOJO_RESULT_OK;
275 } 265 }
276 266
277 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock() 267 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock()
278 const { 268 const {
279 lock().AssertAcquired(); 269 lock().AssertAcquired();
280 270
281 HandleSignalsState rv; 271 HandleSignalsState rv;
282 if (!data_.empty()) { 272 if (data_pipe_->GetReadableBytes()) {
283 if (!in_two_phase_read_) 273 if (!in_two_phase_read_)
284 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; 274 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
285 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 275 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
286 } else if (!error_) { 276 }
277
278 if (peer_closed_) {
279 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
280 } else {
281 // We could become readable in the future.
287 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 282 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
288 } 283 }
289 284
290 if (error_)
291 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
292 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 285 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
293 return rv; 286 return rv;
294 } 287 }
295 288
296 MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock( 289 MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock(
297 Awakable* awakable, 290 Awakable* awakable,
298 MojoHandleSignals signals, 291 MojoHandleSignals signals,
299 uintptr_t context, 292 uintptr_t context,
300 HandleSignalsState* signals_state) { 293 HandleSignalsState* signals_state) {
301 lock().AssertAcquired(); 294 lock().AssertAcquired();
302 if (channel_)
303 channel_->EnsureLazyInitialized();
304 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); 295 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
305 if (state.satisfies(signals)) { 296 if (state.satisfies(signals)) {
306 if (signals_state) 297 if (signals_state)
307 *signals_state = state; 298 *signals_state = state;
308 return MOJO_RESULT_ALREADY_EXISTS; 299 return MOJO_RESULT_ALREADY_EXISTS;
309 } 300 }
310 if (!state.can_satisfy(signals)) { 301 if (!state.can_satisfy(signals)) {
311 if (signals_state) 302 if (signals_state)
312 *signals_state = state; 303 *signals_state = state;
313 return MOJO_RESULT_FAILED_PRECONDITION; 304 return MOJO_RESULT_FAILED_PRECONDITION;
314 } 305 }
315 306
316 awakable_list_.Add(awakable, signals, context); 307 awakable_list_.Add(awakable, signals, context);
317 return MOJO_RESULT_OK; 308 return MOJO_RESULT_OK;
318 } 309 }
319 310
320 void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock( 311 void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock(
321 Awakable* awakable, 312 Awakable* awakable,
322 HandleSignalsState* signals_state) { 313 HandleSignalsState* signals_state) {
323 lock().AssertAcquired(); 314 lock().AssertAcquired();
324 awakable_list_.Remove(awakable); 315 awakable_list_.Remove(awakable);
325 if (signals_state) 316 if (signals_state)
326 *signals_state = GetHandleSignalsStateImplNoLock(); 317 *signals_state = GetHandleSignalsStateImplNoLock();
327 } 318 }
328 319
329 void DataPipeConsumerDispatcher::StartSerializeImplNoLock( 320 void DataPipeConsumerDispatcher::StartSerializeImplNoLock(
330 size_t* max_size, 321 size_t* max_size,
331 size_t* max_platform_handles) { 322 size_t* max_platform_handles) {
332 if (!serialized_) { 323 data_pipe_->StartSerialize(max_size, max_platform_handles);
333 // Handles the case where we have messages read off RawChannel but not ready
334 // by MojoReadMessage.
335 SerializeInternal();
336 }
337
338 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
339 !data_.empty() || !serialized_read_buffer_.empty(),
340 max_size, max_platform_handles);
341 } 324 }
342 325
343 bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock( 326 bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock(
344 void* destination, 327 void* destination,
345 size_t* actual_size, 328 size_t* actual_size,
346 PlatformHandleVector* platform_handles) { 329 PlatformHandleVector* platform_handles) {
347 ScopedPlatformHandle shared_memory_handle; 330 data_pipe_->EndSerialize(destination, actual_size, platform_handles);
348 size_t shared_memory_size = data_.size() + serialized_read_buffer_.size();
349 if (shared_memory_size) {
350 shared_memory_size += sizeof(SharedMemoryHeader);
351 SharedMemoryHeader header;
352 header.data_size = static_cast<uint32_t>(data_.size());
353 header.read_buffer_size =
354 static_cast<uint32_t>(serialized_read_buffer_.size());
355
356 scoped_refptr<PlatformSharedBuffer> shared_buffer(
357 internal::g_platform_support->CreateSharedBuffer(
358 shared_memory_size));
359 scoped_ptr<PlatformSharedBufferMapping> mapping(
360 shared_buffer->Map(0, shared_memory_size));
361
362 char* start = static_cast<char*>(mapping->GetBase());
363 memcpy(start, &header, sizeof(SharedMemoryHeader));
364 start += sizeof(SharedMemoryHeader);
365
366 if (!data_.empty()) {
367 memcpy(start, &data_[0], data_.size());
368 start += data_.size();
369 }
370
371 if (!serialized_read_buffer_.empty()) {
372 memcpy(start, &serialized_read_buffer_[0],
373 serialized_read_buffer_.size());
374 start += serialized_read_buffer_.size();
375 }
376
377 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
378 }
379
380 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
381 std::move(shared_memory_handle), shared_memory_size,
382 destination, actual_size, platform_handles);
383 CloseImplNoLock(); 331 CloseImplNoLock();
384 return true; 332 return true;
385 } 333 }
386 334
387 void DataPipeConsumerDispatcher::TransportStarted() { 335 void DataPipeConsumerDispatcher::TransportStarted() {
388 started_transport_.Acquire(); 336 started_transport_.Acquire();
389 } 337 }
390 338
391 void DataPipeConsumerDispatcher::TransportEnded() { 339 void DataPipeConsumerDispatcher::TransportEnded() {
392 started_transport_.Release(); 340 started_transport_.Release();
393 341
394 base::AutoLock locker(lock()); 342 base::AutoLock locker(lock());
395 343
396 // If transporting of DP failed, we might have got more data and didn't awake 344 // If transporting of DP failed, we might have got more data and didn't awake
397 // for. 345 // for.
398 // TODO(jam): should we care about only alerting if it was empty before 346 // TODO(jam): should we care about only alerting if it was empty before
399 // TransportStarted? 347 // TransportStarted?
400 if (!data_.empty()) 348 if (data_pipe_->GetReadableBytes())
401 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 349 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
402 } 350 }
403 351
404 bool DataPipeConsumerDispatcher::IsBusyNoLock() const { 352 bool DataPipeConsumerDispatcher::IsBusyNoLock() const {
405 lock().AssertAcquired(); 353 lock().AssertAcquired();
406 return in_two_phase_read_; 354 return in_two_phase_read_;
407 } 355 }
408 356
357 bool DataPipeConsumerDispatcher::ProcessCommand(
358 const DataPipeCommandHeader& command,
359 ScopedPlatformHandleVectorPtr platform_handles) {
360 // Handles write/read case and shared buffer becoming available case.
361 return data_pipe_->ProcessCommand(command, std::move(platform_handles));
362 }
363
409 void DataPipeConsumerDispatcher::OnReadMessage( 364 void DataPipeConsumerDispatcher::OnReadMessage(
410 const MessageInTransit::View& message_view, 365 const MessageInTransit::View& message_view,
411 ScopedPlatformHandleVectorPtr platform_handles) { 366 ScopedPlatformHandleVectorPtr platform_handles) {
412 const char* bytes_start = static_cast<const char*>(message_view.bytes()); 367 const DataPipeCommandHeader* command =
413 const char* bytes_end = bytes_start + message_view.num_bytes(); 368 static_cast<const DataPipeCommandHeader*>(message_view.bytes());
369 DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader));
370
414 if (started_transport_.Try()) { 371 if (started_transport_.Try()) {
415 // We're not in the middle of being sent. 372 // We're not in the middle of being sent.
416 373
417 // Can get synchronously called back in Init if there was initial data. 374 // Can get synchronously called back from RawChannel::Init in InitOnIO if
375 // there was initial data. InitOnIO locks, so don't lock twice.
418 scoped_ptr<base::AutoLock> locker; 376 scoped_ptr<base::AutoLock> locker;
419 if (!calling_init_) { 377 if (!calling_init_) {
420 locker.reset(new base::AutoLock(lock())); 378 locker.reset(new base::AutoLock(lock()));
421 } 379 }
422 380
423 if (in_two_phase_read_) { 381 if (ProcessCommand(*command, std::move(platform_handles))) {
424 data_received_during_two_phase_read_.insert( 382 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
425 data_received_during_two_phase_read_.end(), bytes_start, bytes_end);
426 } else {
427 bool was_empty = data_.empty();
428 data_.insert(data_.end(), bytes_start, bytes_end);
429 if (was_empty)
430 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
431 } 383 }
432 started_transport_.Release(); 384 started_transport_.Release();
433 } else { 385 } else {
434 // See comment in MessagePipeDispatcher about why we can't and don't need 386 // DataPipe::Serialize calls ReleaseHandle on the channel, which
435 // to lock here. 387 // acquires RawChannel's read_lock_. The function OnReadMessage is only
436 data_.insert(data_.end(), bytes_start, bytes_end); 388 // called while read_lock_ is acquired, and not after ReleaseHandle has been
389 // called. This means this function will only be called before Serialize
390 // calls ReleaseHandle, meaning the serialisation will not have started yet.
391 // We only notify awakables if we're not in the process of being
392 // transported.
393 ProcessCommand(*command, std::move(platform_handles));
437 } 394 }
438 } 395 }
439 396
440 void DataPipeConsumerDispatcher::OnError(Error error) { 397 void DataPipeConsumerDispatcher::OnError(Error error) {
441 switch (error) { 398 switch (error) {
442 case ERROR_READ_SHUTDOWN: 399 case ERROR_READ_SHUTDOWN:
443 // The other side was cleanly closed, so this isn't actually an error. 400 // The other side was cleanly closed, so this isn't actually an error.
444 DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)"; 401 DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)";
445 break; 402 break;
446 case ERROR_READ_BROKEN: 403 case ERROR_READ_BROKEN:
447 LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)"; 404 // It's okay for the other side to close the connection without reading
405 // our updates about how much we've read.
406 DLOG(ERROR)
407 << "DataPipeConsumerDispatcher read error (connection broken)";
448 break; 408 break;
449 case ERROR_READ_BAD_MESSAGE: 409 case ERROR_READ_BAD_MESSAGE:
450 // Receiving a bad message means either a bug, data corruption, or 410 // Receiving a bad message means either a bug, data corruption, or
451 // malicious attack (probably due to some other bug). 411 // malicious attack (probably due to some other bug).
452 LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad " 412 LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad "
453 << "message)"; 413 << "message)";
454 break; 414 break;
455 case ERROR_READ_UNKNOWN: 415 case ERROR_READ_UNKNOWN:
456 LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)"; 416 LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)";
457 break; 417 break;
458 case ERROR_WRITE: 418 case ERROR_WRITE:
459 LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages"; 419 LOG(ERROR) << "DataPipeConsumerDispatcher write error";
460 break; 420 break;
461 } 421 }
462 422
463 error_ = true; 423 peer_closed_ = true;
464 if (started_transport_.Try()) { 424 if (started_transport_.Try()) {
465 base::AutoLock locker(lock()); 425 base::AutoLock locker(lock());
466 // We can get two OnError callbacks before the post task below completes. 426 // We can get two OnError callbacks before the post task below completes.
467 // Although RawChannel still has a pointer to this object until Shutdown is 427 // Although RawChannel still has a pointer to this object until Shutdown is
468 // called, that is safe since this class always does a PostTask to the IO 428 // called, that is safe since this class always does a PostTask to the IO
469 // thread to self destruct. 429 // thread to self destruct.
470 if (channel_) { 430 if (data_pipe_->GetChannel()) {
471 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 431 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
472 channel_->Shutdown(); 432 data_pipe_->Shutdown();
473 channel_ = nullptr;
474 } 433 }
475 started_transport_.Release(); 434 started_transport_.Release();
476 } else { 435 } else {
477 // We must be waiting to call ReleaseHandle. It will call Shutdown. 436 // We must be waiting to call ReleaseHandle. It will call Shutdown.
478 } 437 }
479 } 438 }
480 439
481 void DataPipeConsumerDispatcher::SerializeInternal() {
482 DCHECK(!in_two_phase_read_);
483 // We need to stop watching handle immediately, even though not on IO thread,
484 // so that other messages aren't read after this.
485 if (channel_) {
486 std::vector<char> serialized_write_buffer;
487 std::vector<int> fds;
488 bool write_error = false;
489 serialized_platform_handle_ = channel_->ReleaseHandle(
490 &serialized_read_buffer_, &serialized_write_buffer, &fds, &fds,
491 &write_error);
492 CHECK(serialized_write_buffer.empty());
493 CHECK(fds.empty());
494 CHECK(!write_error) << "DataPipeConsumerDispatcher doesn't write.";
495
496 channel_ = nullptr;
497 }
498
499 serialized_ = true;
500 }
501
502 } // namespace edk 440 } // namespace edk
503 } // namespace mojo 441 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698