Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(275)

Side by Side Diff: mojo/edk/system/data_pipe.cc

Issue 1526923006: [mojo] Implement data pipe using a shared buffer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe.h" 5 #include "mojo/edk/system/data_pipe.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 #include <string.h> 9 #include <string.h>
10 10
11 #include <algorithm> 11 #include <algorithm>
12 #include <limits> 12 #include <limits>
13 13
14 #include "base/bind.h"
15 #include "mojo/edk/embedder/embedder_internal.h"
16 #include "mojo/edk/embedder/platform_support.h"
17 #include "mojo/edk/system/broker.h"
18
14 #include "mojo/edk/system/configuration.h" 19 #include "mojo/edk/system/configuration.h"
15 #include "mojo/edk/system/options_validation.h" 20 #include "mojo/edk/system/options_validation.h"
16 #include "mojo/edk/system/raw_channel.h" 21 #include "mojo/edk/system/raw_channel.h"
17 #include "mojo/edk/system/transport_data.h" 22 #include "mojo/edk/system/transport_data.h"
18 23
19 namespace mojo { 24 namespace mojo {
20 namespace edk { 25 namespace edk {
21 26
22 namespace { 27 namespace {
23 28
24 const uint32_t kInvalidDataPipeHandleIndex = static_cast<uint32_t>(-1); 29 const uint32_t kInvalidDataPipeHandleIndex = static_cast<uint32_t>(-1);
25 30
26 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { 31 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher {
27 uint32_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) 32 uint32_t channel_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
28 33
29 // These are from MojoCreateDataPipeOptions 34 // These are from MojoCreateDataPipeOptions
30 MojoCreateDataPipeOptionsFlags flags; 35 MojoCreateDataPipeOptionsFlags flags;
31 uint32_t element_num_bytes; 36 uint32_t element_num_bytes;
32 uint32_t capacity_num_bytes; 37 uint32_t capacity_num_bytes;
33 38
34 uint32_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) 39 uint32_t channel_shared_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
35 uint32_t shared_memory_size; 40 uint32_t channel_pending_read_size;
41 uint32_t channel_pending_write_size;
42
43 uint32_t shared_buffer_handle_index;
44 uint32_t ring_buffer_start;
45 uint32_t ring_buffer_size;
36 }; 46 };
37 47
38 } // namespace 48 } // namespace
39 49
40 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { 50 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() {
41 MojoCreateDataPipeOptions result = { 51 MojoCreateDataPipeOptions result = {
42 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), 52 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
43 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 53 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
44 1u, 54 1u,
45 static_cast<uint32_t>( 55 static_cast<uint32_t>(
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
93 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) 103 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0)
94 return MOJO_RESULT_INVALID_ARGUMENT; 104 return MOJO_RESULT_INVALID_ARGUMENT;
95 if (reader.options().capacity_num_bytes > 105 if (reader.options().capacity_num_bytes >
96 GetConfiguration().max_data_pipe_capacity_bytes) 106 GetConfiguration().max_data_pipe_capacity_bytes)
97 return MOJO_RESULT_RESOURCE_EXHAUSTED; 107 return MOJO_RESULT_RESOURCE_EXHAUSTED;
98 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; 108 out_options->capacity_num_bytes = reader.options().capacity_num_bytes;
99 109
100 return MOJO_RESULT_OK; 110 return MOJO_RESULT_OK;
101 } 111 }
102 112
103 void DataPipe::StartSerialize(bool have_channel_handle, 113 DataPipe::DataPipe(const MojoCreateDataPipeOptions& options)
104 bool have_shared_memory, 114 : channel_(nullptr),
105 size_t* max_size, 115 options_(options),
106 size_t* max_platform_handles) { 116 channel_released_(false),
117 shared_buffer_(nullptr),
118 mapping_(nullptr),
119 ring_buffer_start_(0u),
120 ring_buffer_size_(0u) {}
121
122 DataPipe::~DataPipe() {}
123
124 void DataPipe::Init(ScopedPlatformHandle message_pipe,
Anand Mistry (off Chromium) 2016/01/11 06:19:34 s/message_pipe/channel where appropriate.
Eliot Courtney 2016/01/13 00:00:09 Done.
125 char* serialized_write_buffer,
126 uint32_t serialized_write_buffer_size,
127 char* serialized_read_buffer,
128 uint32_t serialized_read_buffer_size,
129 ScopedPlatformHandle shared_buffer_handle,
130 uint32_t ring_buffer_start,
131 uint32_t ring_buffer_size,
132 bool is_producer,
133 const base::Closure& init_callback) {
134 is_producer_ = is_producer;
135
136 ring_buffer_start_ = ring_buffer_start;
137 ring_buffer_size_ = ring_buffer_size;
138
139 if (shared_buffer_handle.is_valid()) {
140 shared_buffer_ = internal::g_platform_support->CreateSharedBufferFromHandle(
141 options_.capacity_num_bytes, std::move(shared_buffer_handle));
142 DCHECK(shared_buffer_);
143 } else if (is_producer_) {
144 #if defined(OS_WIN)
145 shared_buffer_ = internal::g_platform_support->CreateSharedBuffer(
146 options_.capacity_num_bytes);
147 #else
148 shared_buffer_ =
149 internal::g_broker->CreateSharedBuffer(options_.capacity_num_bytes);
150 #endif
151 CHECK(shared_buffer_);
152 }
153
154 if (message_pipe.is_valid()) {
155 channel_ = RawChannel::Create(std::move(message_pipe));
156 channel_->SetSerializedData(serialized_read_buffer,
157 serialized_read_buffer_size,
158 serialized_write_buffer,
159 serialized_write_buffer_size, nullptr, nullptr);
160 internal::g_io_thread_task_runner->PostTask(FROM_HERE, init_callback);
161 channel_->EnsureLazyInitialized();
162
163 if (is_producer_) {
164 internal::g_io_thread_task_runner->PostTask(
165 FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this));
166 }
167 }
168 }
169
170 void DataPipe::StartSerialize(size_t* max_size, size_t* max_platform_handles) {
171 // We need to release the channel and get its pending reads/writes.
172 if (!channel_released_) {
173 Serialize();
174 }
175
107 *max_size = sizeof(SerializedDataPipeHandleDispatcher); 176 *max_size = sizeof(SerializedDataPipeHandleDispatcher);
108 *max_platform_handles = 0; 177 *max_platform_handles = 0;
109 if (have_channel_handle) 178 if (serialized_channel_handle_.is_valid())
110 (*max_platform_handles)++; 179 (*max_platform_handles)++;
111 if (have_shared_memory) 180 // For shared buffer to transfer pending reads / writes.
181 if (!serialized_write_buffer_.empty() || !serialized_read_buffer_.empty())
182 (*max_platform_handles)++;
183 if (shared_buffer_)
112 (*max_platform_handles)++; 184 (*max_platform_handles)++;
113 } 185 }
114 186
115 void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options, 187 void DataPipe::Serialize() {
116 ScopedPlatformHandle channel_handle, 188 // We need to stop watching handle immediately, even though not on IO thread,
117 ScopedPlatformHandle shared_memory_handle, 189 // so that other messages aren't read after this.
118 size_t shared_memory_size, 190 if (channel_) {
119 void* destination, 191 std::vector<int> fds;
192 bool write_error = false;
193 serialized_channel_handle_ = channel_->ReleaseHandle(
Anand Mistry (off Chromium) 2016/01/11 06:19:34 From what I can tell, this is racy. Basically, thi
Eliot Courtney 2016/01/14 02:17:57 Discussed offline -- this is fairly impossible to
194 &serialized_read_buffer_, &serialized_write_buffer_, &fds, &fds,
195 &write_error);
196
197 CHECK(fds.empty());
198 if (write_error) {
199 serialized_channel_handle_.reset();
200 }
201 channel_ = nullptr;
202 }
203 channel_released_ = true;
204 }
205
206 void DataPipe::EndSerialize(void* destination,
120 size_t* actual_size, 207 size_t* actual_size,
121 PlatformHandleVector* platform_handles) { 208 PlatformHandleVector* platform_handles) {
209 ScopedPlatformHandle channel_shared_handle;
210 size_t channel_shared_size =
211 serialized_read_buffer_.size() + serialized_write_buffer_.size();
212 if (channel_shared_size) {
213 #if defined(OS_WIN)
214 scoped_refptr<PlatformSharedBuffer> shared_buffer(
215 internal::g_platform_support->CreateSharedBuffer(channel_shared_size));
216 #else
217 scoped_refptr<PlatformSharedBuffer> shared_buffer(
218 internal::g_broker->CreateSharedBuffer(channel_shared_size));
219 #endif
220
221 scoped_ptr<PlatformSharedBufferMapping> mapping(
222 shared_buffer->Map(0, channel_shared_size));
223
224 void* base = mapping->GetBase();
225 if (!serialized_read_buffer_.empty()) {
226 memcpy(base, &serialized_read_buffer_[0], serialized_read_buffer_.size());
227 }
228 if (!serialized_write_buffer_.empty()) {
229 memcpy(static_cast<uint8_t*>(base) + serialized_read_buffer_.size(),
230 &serialized_write_buffer_[0], serialized_write_buffer_.size());
231 }
232
233 channel_shared_handle.reset(shared_buffer->PassPlatformHandle().release());
234 }
235
122 SerializedDataPipeHandleDispatcher* serialization = 236 SerializedDataPipeHandleDispatcher* serialization =
123 static_cast<SerializedDataPipeHandleDispatcher*>(destination); 237 static_cast<SerializedDataPipeHandleDispatcher*>(destination);
124 if (channel_handle.is_valid()) { 238 if (serialized_channel_handle_.is_valid()) {
125 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); 239 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
126 serialization->platform_handle_index = 240 serialization->channel_handle_index =
127 static_cast<uint32_t>(platform_handles->size()); 241 static_cast<uint32_t>(platform_handles->size());
128 platform_handles->push_back(channel_handle.release()); 242 platform_handles->push_back(serialized_channel_handle_.release());
129 } else { 243 } else {
130 serialization->platform_handle_index = kInvalidDataPipeHandleIndex; 244 serialization->channel_handle_index = kInvalidDataPipeHandleIndex;
131 } 245 }
132 246
133 serialization->flags = options.flags; 247 serialization->flags = options_.flags;
134 serialization->element_num_bytes = options.element_num_bytes; 248 serialization->element_num_bytes = options_.element_num_bytes;
135 serialization->capacity_num_bytes = options.capacity_num_bytes; 249 serialization->capacity_num_bytes = options_.capacity_num_bytes;
136 250
137 serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size); 251 serialization->channel_pending_read_size =
138 if (serialization->shared_memory_size) { 252 static_cast<uint32_t>(serialized_read_buffer_.size());
253 serialization->channel_pending_write_size =
254 static_cast<uint32_t>(serialized_write_buffer_.size());
255
256 if (channel_shared_handle.is_valid()) {
139 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); 257 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
140 serialization->shared_memory_handle_index = 258 serialization->channel_shared_handle_index =
141 static_cast<uint32_t>(platform_handles->size()); 259 static_cast<uint32_t>(platform_handles->size());
142 platform_handles->push_back(shared_memory_handle.release()); 260 platform_handles->push_back(channel_shared_handle.release());
143 } else { 261 } else {
144 serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex; 262 serialization->channel_shared_handle_index = kInvalidDataPipeHandleIndex;
145 } 263 }
146 264
265 ScopedPlatformHandle shared_buffer_handle;
266 if (shared_buffer_) {
267 shared_buffer_handle.reset(shared_buffer_->PassPlatformHandle().release());
268 }
269
270 if (shared_buffer_handle.is_valid()) {
271 DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max());
272 serialization->shared_buffer_handle_index =
273 static_cast<uint32_t>(platform_handles->size());
274 platform_handles->push_back(shared_buffer_handle.release());
275 } else {
276 serialization->shared_buffer_handle_index = kInvalidDataPipeHandleIndex;
277 }
278
279 serialization->ring_buffer_start = ring_buffer_start_;
280 serialization->ring_buffer_size = ring_buffer_size_;
281
147 *actual_size = sizeof(SerializedDataPipeHandleDispatcher); 282 *actual_size = sizeof(SerializedDataPipeHandleDispatcher);
148 } 283 }
149 284
150 ScopedPlatformHandle DataPipe::Deserialize( 285 ScopedPlatformHandle DataPipe::Deserialize(
151 const void* source, 286 const void* source,
152 size_t size, 287 size_t size,
153 PlatformHandleVector* platform_handles, 288 PlatformHandleVector* platform_handles,
154 MojoCreateDataPipeOptions* options, 289 MojoCreateDataPipeOptions* options,
155 ScopedPlatformHandle* shared_memory_handle, 290 ScopedPlatformHandle* channel_shared_handle,
156 size_t* shared_memory_size) { 291 uint32_t* serialized_read_buffer_size,
292 uint32_t* serialized_write_buffer_size,
293 ScopedPlatformHandle* shared_buffer_handle,
294 uint32_t* ring_buffer_start,
295 uint32_t* ring_buffer_size) {
157 if (size != sizeof(SerializedDataPipeHandleDispatcher)) { 296 if (size != sizeof(SerializedDataPipeHandleDispatcher)) {
158 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; 297 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)";
159 return ScopedPlatformHandle(); 298 return ScopedPlatformHandle();
160 } 299 }
161 300
162 const SerializedDataPipeHandleDispatcher* serialization = 301 const SerializedDataPipeHandleDispatcher* serialization =
163 static_cast<const SerializedDataPipeHandleDispatcher*>(source); 302 static_cast<const SerializedDataPipeHandleDispatcher*>(source);
164 size_t platform_handle_index = serialization->platform_handle_index;
165 303
166 // Starts off invalid, which is what we want. 304 // Starts off invalid, which is what we want.
167 PlatformHandle platform_handle; 305 PlatformHandle channel_handle;
168 if (platform_handle_index != kInvalidDataPipeHandleIndex) { 306 uint32_t channel_handle_index = serialization->channel_handle_index;
169 if (!platform_handles || 307 if (channel_handle_index != kInvalidDataPipeHandleIndex) {
170 platform_handle_index >= platform_handles->size()) { 308 if (!platform_handles || channel_handle_index >= platform_handles->size()) {
171 LOG(ERROR) 309 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
172 << "Invalid serialized data pipe dispatcher (missing handles)";
173 return ScopedPlatformHandle(); 310 return ScopedPlatformHandle();
174 } 311 }
175 312
176 // We take ownership of the handle, so we have to invalidate the one in 313 // We take ownership of the handle, so we have to invalidate the one in
177 // |platform_handles|. 314 // |platform_handles|.
178 std::swap(platform_handle, (*platform_handles)[platform_handle_index]); 315 std::swap(channel_handle, (*platform_handles)[channel_handle_index]);
179 } 316 }
180 317
181 options->struct_size = sizeof(MojoCreateDataPipeOptions); 318 options->struct_size = sizeof(MojoCreateDataPipeOptions);
182 options->flags = serialization->flags; 319 options->flags = serialization->flags;
183 options->element_num_bytes = serialization->element_num_bytes; 320 options->element_num_bytes = serialization->element_num_bytes;
184 options->capacity_num_bytes = serialization->capacity_num_bytes; 321 options->capacity_num_bytes = serialization->capacity_num_bytes;
185 322 *serialized_read_buffer_size = serialization->channel_pending_read_size;
186 if (shared_memory_size) { 323 *serialized_write_buffer_size = serialization->channel_pending_write_size;
187 *shared_memory_size = serialization->shared_memory_size; 324 *ring_buffer_start = serialization->ring_buffer_start;
188 if (*shared_memory_size) { 325 *ring_buffer_size = serialization->ring_buffer_size;
189 DCHECK(serialization->shared_memory_handle_index != 326
190 kInvalidDataPipeHandleIndex); 327 uint32_t channel_shared_handle_index =
191 if (!platform_handles || 328 serialization->channel_shared_handle_index;
192 serialization->shared_memory_handle_index >= 329 if (*serialized_read_buffer_size || *serialized_write_buffer_size) {
193 platform_handles->size()) { 330 DCHECK_NE(channel_shared_handle_index, kInvalidDataPipeHandleIndex);
194 LOG(ERROR) << "Invalid serialized data pipe dispatcher " 331 if (!platform_handles ||
195 << "(missing handles)"; 332 channel_shared_handle_index >= platform_handles->size()) {
196 return ScopedPlatformHandle(); 333 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
334 return ScopedPlatformHandle();
335 }
336
337 // Take ownership.
338 PlatformHandle temp_channel_shared_handle;
339 std::swap(temp_channel_shared_handle,
340 (*platform_handles)[channel_shared_handle_index]);
341 *channel_shared_handle = ScopedPlatformHandle(temp_channel_shared_handle);
342 }
343
344 uint32_t shared_buffer_handle_index =
345 serialization->shared_buffer_handle_index;
346 if (shared_buffer_handle_index != kInvalidDataPipeHandleIndex) {
347 if (!platform_handles ||
348 shared_buffer_handle_index >= platform_handles->size()) {
349 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
350 return ScopedPlatformHandle();
351 }
352
353 // Take ownership.
354 PlatformHandle temp_shared_buffer_handle;
355 std::swap(temp_shared_buffer_handle,
356 (*platform_handles)[shared_buffer_handle_index]);
357 *shared_buffer_handle = ScopedPlatformHandle(temp_shared_buffer_handle);
358 }
359
360 return ScopedPlatformHandle(channel_handle);
361 }
362
363 uint8_t* DataPipe::GetSharedBufferBase() {
364 if (!mapping_) {
365 DCHECK(shared_buffer_);
366 mapping_ = shared_buffer_->Map(0, options_.capacity_num_bytes);
367 }
368 if (!mapping_)
369 return nullptr;
370 return static_cast<uint8_t*>(mapping_->GetBase());
371 }
372
373 bool DataPipe::WriteDataIntoSharedBuffer(const void* elements,
374 uint32_t num_bytes) {
375 uint32_t buffer_size = options_.capacity_num_bytes;
376 uint8_t* base = GetSharedBufferBase();
377 if (!base)
378 return false;
379
380 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u);
381 DCHECK_LE(num_bytes, buffer_size - ring_buffer_size_);
382
383 uint32_t ring_buffer_end =
384 (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
385 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
386 uint32_t bytes_until_end = buffer_size - ring_buffer_end;
387 memcpy(base + ring_buffer_end, elements,
388 std::min(num_bytes, bytes_until_end));
389
390 if (bytes_until_end < num_bytes) {
391 memcpy(base, static_cast<const uint8_t*>(elements) + bytes_until_end,
392 num_bytes - bytes_until_end);
393 }
394 } else {
395 memcpy(base + ring_buffer_end, elements, num_bytes);
396 }
397
398 return true;
399 }
400
401 bool DataPipe::ReadDataFromSharedBuffer(void* data, uint32_t num_bytes) {
402 DCHECK_LE(num_bytes, ring_buffer_size_);
403 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u);
404
405 uint32_t buffer_size = options_.capacity_num_bytes;
406 uint8_t* base = GetSharedBufferBase();
407 if (!base)
408 return false;
409
410 uint32_t ring_buffer_end =
411 (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
412 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
413 memcpy(data, base + ring_buffer_start_, num_bytes);
414 } else {
415 uint32_t bytes_until_end = buffer_size - ring_buffer_start_;
416 memcpy(data, base + ring_buffer_start_,
417 std::min(num_bytes, bytes_until_end));
418
419 if (bytes_until_end < num_bytes) {
420 memcpy(static_cast<uint8_t*>(data) + bytes_until_end, base,
421 num_bytes - bytes_until_end);
422 }
423 }
424
425 return true;
426 }
427
428 uint32_t DataPipe::GetReadableBytes() const {
429 return shared_buffer_ ? ring_buffer_size_ : 0;
430 }
431
432 uint32_t DataPipe::GetWritableBytes() const {
433 return shared_buffer_ ? options_.capacity_num_bytes - ring_buffer_size_ : 0;
434 }
435
436 void DataPipe::UpdateFromRead(uint32_t num_bytes) {
437 ring_buffer_start_ =
438 (ring_buffer_start_ + num_bytes) % options_.capacity_num_bytes;
439 ring_buffer_size_ -= num_bytes;
440 DCHECK_GE(ring_buffer_size_, 0u);
441 }
442
443 void DataPipe::UpdateFromWrite(uint32_t num_bytes) {
444 ring_buffer_size_ += num_bytes;
445 DCHECK_LE(ring_buffer_size_, options_.capacity_num_bytes);
446 }
447
448 bool DataPipe::NotifyWrite(uint32_t num_bytes) {
449 DCHECK(is_producer_);
450
451 if (!channel_)
452 return false;
453
454 DataPipeCommandHeader command = {DATA_WRITTEN, num_bytes};
455 scoped_ptr<MessageInTransit> message(new MessageInTransit(
456 MessageInTransit::Type::MESSAGE, sizeof(command), &command));
457 if (!channel_->WriteMessage(std::move(message))) {
458 return false;
459 }
460 return true;
461 }
462
463 bool DataPipe::NotifyRead(uint32_t num_bytes) {
464 DCHECK(!is_producer_);
465
466 if (!channel_)
467 return false;
468 DataPipeCommandHeader command = {DATA_READ, num_bytes};
469 scoped_ptr<MessageInTransit> message(new MessageInTransit(
470 MessageInTransit::Type::MESSAGE, sizeof(command), &command));
471 if (!channel_->WriteMessage(std::move(message))) {
472 return false;
473 }
474 return true;
475 }
476
477 void DataPipe::NotifySharedBuffer() {
478 if (!channel_) {
479 DLOG(ERROR) << "NotifySharedBuffer: No channel";
480 return;
481 }
482 DataPipeCommandHeader command = {NOTIFY_SHARED_BUFFER,
483 options_.capacity_num_bytes};
484 ScopedPlatformHandle handle = shared_buffer_->DuplicatePlatformHandle();
485 DCHECK(handle.is_valid());
486 ScopedPlatformHandleVectorPtr fds(new PlatformHandleVector());
487 fds->push_back(handle.release());
488
489 scoped_ptr<MessageInTransit> message(new MessageInTransit(
490 MessageInTransit::Type::MESSAGE, sizeof(command), &command));
491 message->SetTransportData(make_scoped_ptr(new TransportData(
492 std::move(fds), channel_->GetSerializedPlatformHandleSize())));
493 if (!channel_->WriteMessage(std::move(message))) {
494 DLOG(ERROR) << "NotifySharedBuffer: Can't write";
495 }
496 }
497
498 void DataPipe::UpdateSharedBuffer(
499 scoped_refptr<PlatformSharedBuffer> shared_buffer) {
500 DCHECK(shared_buffer);
501 shared_buffer_ = shared_buffer;
502 mapping_ = nullptr;
503 }
504
505 void DataPipe::Shutdown() {
506 if (channel_) {
507 channel_->Shutdown();
508 channel_ = nullptr;
509 }
510 }
511
512 void DataPipe::CreateEquivalentAndClose(DataPipe* out) {
513 // Serialize, releasing the handle, otherwise we will get callbacks to the
514 // wrong delegate for RawChannel.
515 Serialize();
516
517 std::swap(options_, out->options_);
518 std::swap(channel_released_, out->channel_released_);
519 serialized_read_buffer_.swap(out->serialized_read_buffer_);
520 serialized_write_buffer_.swap(out->serialized_write_buffer_);
521 serialized_channel_handle_.swap(out->serialized_channel_handle_);
522 shared_buffer_.swap(out->shared_buffer_);
523 mapping_.swap(out->mapping_);
524 std::swap(ring_buffer_start_, out->ring_buffer_start_);
525 std::swap(ring_buffer_size_, out->ring_buffer_size_);
526 std::swap(is_producer_, out->is_producer_);
527 }
528
529 void* DataPipe::GetWriteBuffer(uint32_t* num_bytes) {
530 if (!shared_buffer_) {
531 *num_bytes = 0;
532 return nullptr;
533 }
534 // Must provide sequential memory.
535 uint32_t buffer_size = options_.capacity_num_bytes;
536 uint32_t ring_buffer_end =
537 (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
538 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
539 *num_bytes = buffer_size - ring_buffer_end;
540 } else {
541 *num_bytes = ring_buffer_start_ - ring_buffer_end;
542 }
543
544 return GetSharedBufferBase() + ring_buffer_end;
545 }
546
547 const void* DataPipe::GetReadBuffer(uint32_t* num_bytes) {
548 if (!shared_buffer_) {
549 *num_bytes = 0;
550 return nullptr;
551 }
552 // Must provide sequential memory.
553 uint32_t buffer_size = options_.capacity_num_bytes;
554 uint32_t ring_buffer_end =
555 (ring_buffer_start_ + ring_buffer_size_) % buffer_size;
556 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
557 *num_bytes = ring_buffer_size_;
558 } else {
559 *num_bytes = buffer_size - ring_buffer_start_;
560 }
561
562 return GetSharedBufferBase() + ring_buffer_start_;
563 }
564
565 bool DataPipe::ProcessCommand(const DataPipeCommandHeader& command,
566 ScopedPlatformHandleVectorPtr platform_handles) {
567 PlatformHandle handle;
568 ScopedPlatformHandle scoped_handle;
569 scoped_refptr<PlatformSharedBuffer> shared_buffer;
570 bool was_unreadable = GetReadableBytes() == 0;
571 bool was_unwritable = GetWritableBytes() == 0;
572 switch (command.command) {
573 case NOTIFY_SHARED_BUFFER:
574 CHECK_EQ(platform_handles->size(), 1u);
575 std::swap(handle, (*platform_handles.get())[0]);
576 scoped_handle.reset(handle);
577 UpdateSharedBuffer(
578 internal::g_platform_support->CreateSharedBufferFromHandle(
579 command.num_bytes, std::move(scoped_handle)));
580 break;
581 case DATA_WRITTEN:
582 if (!is_producer_) {
583 UpdateFromWrite(command.num_bytes);
584 } else {
585 LOG(ERROR) << "Producer was told of a write, which shouldn't happen.";
586 DCHECK(0);
Anand Mistry (off Chromium) 2016/01/11 06:19:34 NOTREACHED();
Eliot Courtney 2016/01/13 00:00:09 Done.
197 } 587 }
198 588 break;
199 PlatformHandle temp_shared_memory_handle; 589 case DATA_READ:
200 std::swap(temp_shared_memory_handle, 590 if (is_producer_) {
201 (*platform_handles)[serialization->shared_memory_handle_index]); 591 UpdateFromRead(command.num_bytes);
202 *shared_memory_handle = 592 } else {
203 ScopedPlatformHandle(temp_shared_memory_handle); 593 LOG(ERROR) << "Consumer was told of a read, which shouldn't happen.";
204 } 594 DCHECK(0);
205 } 595 }
206 596 break;
207 size -= sizeof(SerializedDataPipeHandleDispatcher); 597 default:
208 598 LOG(ERROR) << "Shouldn't happen";
209 return ScopedPlatformHandle(platform_handle); 599 DCHECK(0);
600 }
601
602 // Handles write/read case and shared buffer becoming available case.
603 return (was_unreadable && GetReadableBytes()) ||
604 (was_unwritable && GetWritableBytes());
210 } 605 }
211 606
212 } // namespace edk 607 } // namespace edk
213 } // namespace mojo 608 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698