Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(42)

Side by Side Diff: mojo/edk/system/data_pipe.cc

Issue 1526923006: [mojo] Implement data pipe using a shared buffer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe.h" 5 #include "mojo/edk/system/data_pipe.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 #include <string.h> 9 #include <string.h>
10 10
11 #include <algorithm>
12
13 #include "base/bind.h"
14 #include "mojo/edk/embedder/embedder_internal.h"
15 #include "mojo/edk/embedder/platform_support.h"
11 #include "mojo/edk/system/configuration.h" 16 #include "mojo/edk/system/configuration.h"
12 #include "mojo/edk/system/options_validation.h" 17 #include "mojo/edk/system/options_validation.h"
13 #include "mojo/edk/system/raw_channel.h" 18 #include "mojo/edk/system/raw_channel.h"
14 #include "mojo/edk/system/transport_data.h" 19 #include "mojo/edk/system/transport_data.h"
15 20
16 namespace mojo { 21 namespace mojo {
17 namespace edk { 22 namespace edk {
18 23
19 namespace { 24 namespace {
20 25
21 const size_t kInvalidDataPipeHandleIndex = static_cast<size_t>(-1); 26 const size_t kInvalidDataPipeHandleIndex = static_cast<size_t>(-1);
22 27
23 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { 28 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher {
24 size_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) 29 size_t channel_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
25 30
26 // These are from MojoCreateDataPipeOptions 31 // These are from MojoCreateDataPipeOptions
27 MojoCreateDataPipeOptionsFlags flags; 32 MojoCreateDataPipeOptionsFlags flags;
28 uint32_t element_num_bytes; 33 uint32_t element_num_bytes;
29 uint32_t capacity_num_bytes; 34 uint32_t capacity_num_bytes;
30 35
31 size_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) 36 size_t channel_shared_handle_index; // (Or |kInvalidDataPipeHandleIndex|.)
32 uint32_t shared_memory_size; 37 uint32_t channel_pending_read_size;
38 uint32_t channel_pending_write_size;
39
40 size_t shared_buffer_handle_index;
41 size_t ring_buffer_start;
42 size_t ring_buffer_size;
33 }; 43 };
34 44
35 } // namespace 45 } // namespace
36 46
37 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { 47 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() {
38 MojoCreateDataPipeOptions result = { 48 MojoCreateDataPipeOptions result = {
39 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), 49 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
40 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 50 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
41 1u, 51 1u,
42 static_cast<uint32_t>( 52 static_cast<uint32_t>(
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
90 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) 100 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0)
91 return MOJO_RESULT_INVALID_ARGUMENT; 101 return MOJO_RESULT_INVALID_ARGUMENT;
92 if (reader.options().capacity_num_bytes > 102 if (reader.options().capacity_num_bytes >
93 GetConfiguration().max_data_pipe_capacity_bytes) 103 GetConfiguration().max_data_pipe_capacity_bytes)
94 return MOJO_RESULT_RESOURCE_EXHAUSTED; 104 return MOJO_RESULT_RESOURCE_EXHAUSTED;
95 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; 105 out_options->capacity_num_bytes = reader.options().capacity_num_bytes;
96 106
97 return MOJO_RESULT_OK; 107 return MOJO_RESULT_OK;
98 } 108 }
99 109
100 void DataPipe::StartSerialize(bool have_channel_handle, 110 DataPipe::DataPipe(const MojoCreateDataPipeOptions& options)
101 bool have_shared_memory, 111 : channel_(nullptr),
102 size_t* max_size, 112 options_(options),
103 size_t* max_platform_handles) { 113 channel_released_(false),
114 shared_buffer_(nullptr),
115 mapping_(nullptr),
116 ring_buffer_start_(0),
117 ring_buffer_size_(0) {}
118
119 DataPipe::~DataPipe() {}
120
121 void DataPipe::Init(ScopedPlatformHandle message_pipe,
122 char* serialized_write_buffer,
123 size_t serialized_write_buffer_size,
124 char* serialized_read_buffer,
125 size_t serialized_read_buffer_size,
126 ScopedPlatformHandle shared_buffer_handle,
127 size_t ring_buffer_start,
128 size_t ring_buffer_size,
129 bool is_producer,
130 const base::Closure& init_callback) {
131 is_producer_ = is_producer;
132
133 ring_buffer_start_ = ring_buffer_start;
134 ring_buffer_size_ = ring_buffer_size;
135
136 if (shared_buffer_handle.is_valid()) {
137 shared_buffer_ = internal::g_platform_support->CreateSharedBufferFromHandle(
138 options_.capacity_num_bytes, std::move(shared_buffer_handle));
139 DCHECK(shared_buffer_);
140 } else if (is_producer_) {
141 shared_buffer_ = internal::g_platform_support->CreateSharedBuffer(
142 options_.capacity_num_bytes);
143 }
144
145 if (message_pipe.is_valid()) {
146 channel_ = RawChannel::Create(std::move(message_pipe));
147 channel_->SetSerializedData(serialized_read_buffer,
148 serialized_read_buffer_size,
149 serialized_write_buffer,
150 serialized_write_buffer_size, nullptr, nullptr);
151 internal::g_io_thread_task_runner->PostTask(FROM_HERE, init_callback);
152 channel_->EnsureLazyInitialized();
153
154 if (is_producer_) {
155 if (shared_buffer_) {
156 internal::g_io_thread_task_runner->PostTask(
157 FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this));
158 } else {
159 internal::g_io_thread_task_runner->PostTask(
160 FROM_HERE, base::Bind(&DataPipe::RequestSharedBuffer, this));
161 }
162 }
163 }
164 }
165
166 void DataPipe::StartSerialize(size_t* max_size, size_t* max_platform_handles) {
167 // We need to release the channel and get its pending reads/writes.
168 if (!channel_released_) {
169 Serialize();
170 }
171
104 *max_size = sizeof(SerializedDataPipeHandleDispatcher); 172 *max_size = sizeof(SerializedDataPipeHandleDispatcher);
105 *max_platform_handles = 0; 173 *max_platform_handles = 0;
106 if (have_channel_handle) 174 if (serialized_channel_handle_.is_valid())
107 (*max_platform_handles)++; 175 (*max_platform_handles)++;
108 if (have_shared_memory) 176 // For shared buffer to transfer pending reads / writes.
177 if (!serialized_write_buffer_.empty() || !serialized_read_buffer_.empty())
178 (*max_platform_handles)++;
179 if (shared_buffer_)
109 (*max_platform_handles)++; 180 (*max_platform_handles)++;
110 } 181 }
111 182
112 void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options, 183 void DataPipe::Serialize() {
113 ScopedPlatformHandle channel_handle, 184 // We need to stop watching handle immediately, even though not on IO thread,
114 ScopedPlatformHandle shared_memory_handle, 185 // so that other messages aren't read after this.
115 size_t shared_memory_size, 186 if (channel_) {
116 void* destination, 187 std::vector<int> fds;
188 bool write_error = false;
189 serialized_channel_handle_ = channel_->ReleaseHandle(
190 &serialized_read_buffer_, &serialized_write_buffer_, &fds, &fds,
191 &write_error);
192
193 CHECK(fds.empty());
194 if (write_error) {
195 serialized_channel_handle_.reset();
196 }
197 channel_ = nullptr;
198 }
199 channel_released_ = true;
200 }
201
202 void DataPipe::EndSerialize(void* destination,
117 size_t* actual_size, 203 size_t* actual_size,
118 PlatformHandleVector* platform_handles) { 204 PlatformHandleVector* platform_handles) {
205 ScopedPlatformHandle channel_shared_handle;
206 size_t channel_shared_size =
207 serialized_read_buffer_.size() + serialized_write_buffer_.size();
208 if (channel_shared_size) {
209 scoped_refptr<PlatformSharedBuffer> shared_buffer(
210 internal::g_platform_support->CreateSharedBuffer(channel_shared_size));
211 scoped_ptr<PlatformSharedBufferMapping> mapping(
212 shared_buffer->Map(0, channel_shared_size));
213
214 void* base = mapping->GetBase();
215 if (!serialized_read_buffer_.empty()) {
216 memcpy(base, &serialized_read_buffer_[0], serialized_read_buffer_.size());
217 }
218 if (!serialized_write_buffer_.empty()) {
219 memcpy(static_cast<uint8_t*>(base) + serialized_read_buffer_.size(),
220 &serialized_write_buffer_[0], serialized_write_buffer_.size());
221 }
222
223 channel_shared_handle.reset(shared_buffer->PassPlatformHandle().release());
224 }
225
119 SerializedDataPipeHandleDispatcher* serialization = 226 SerializedDataPipeHandleDispatcher* serialization =
120 static_cast<SerializedDataPipeHandleDispatcher*>(destination); 227 static_cast<SerializedDataPipeHandleDispatcher*>(destination);
121 if (channel_handle.is_valid()) { 228 if (serialized_channel_handle_.is_valid()) {
122 serialization->platform_handle_index = platform_handles->size(); 229 serialization->channel_handle_index = platform_handles->size();
123 platform_handles->push_back(channel_handle.release()); 230 platform_handles->push_back(serialized_channel_handle_.release());
124 } else { 231 } else {
125 serialization->platform_handle_index = kInvalidDataPipeHandleIndex; 232 serialization->channel_handle_index = kInvalidDataPipeHandleIndex;
126 } 233 }
127 234
128 serialization->flags = options.flags; 235 serialization->flags = options_.flags;
129 serialization->element_num_bytes = options.element_num_bytes; 236 serialization->element_num_bytes = options_.element_num_bytes;
130 serialization->capacity_num_bytes = options.capacity_num_bytes; 237 serialization->capacity_num_bytes = options_.capacity_num_bytes;
131 238
132 serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size); 239 serialization->channel_pending_read_size =
133 if (serialization->shared_memory_size) { 240 static_cast<uint32_t>(serialized_read_buffer_.size());
134 serialization->shared_memory_handle_index = platform_handles->size(); 241 serialization->channel_pending_write_size =
135 platform_handles->push_back(shared_memory_handle.release()); 242 static_cast<uint32_t>(serialized_write_buffer_.size());
243
244 if (channel_shared_handle.is_valid()) {
245 serialization->channel_shared_handle_index = platform_handles->size();
246 platform_handles->push_back(channel_shared_handle.release());
136 } else { 247 } else {
137 serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex; 248 serialization->channel_shared_handle_index = kInvalidDataPipeHandleIndex;
138 } 249 }
139 250
251 ScopedPlatformHandle shared_buffer_handle;
252 if (shared_buffer_) {
253 shared_buffer_handle.reset(shared_buffer_->PassPlatformHandle().release());
254 }
255
256 if (shared_buffer_handle.is_valid()) {
257 serialization->shared_buffer_handle_index = platform_handles->size();
258 platform_handles->push_back(shared_buffer_handle.release());
259 } else {
260 serialization->shared_buffer_handle_index = kInvalidDataPipeHandleIndex;
261 }
262
263 serialization->ring_buffer_start = ring_buffer_start_;
264 serialization->ring_buffer_size = ring_buffer_size_;
265
140 *actual_size = sizeof(SerializedDataPipeHandleDispatcher); 266 *actual_size = sizeof(SerializedDataPipeHandleDispatcher);
141 } 267 }
142 268
143 ScopedPlatformHandle DataPipe::Deserialize( 269 ScopedPlatformHandle DataPipe::Deserialize(
144 const void* source, 270 const void* source,
145 size_t size, 271 size_t size,
146 PlatformHandleVector* platform_handles, 272 PlatformHandleVector* platform_handles,
147 MojoCreateDataPipeOptions* options, 273 MojoCreateDataPipeOptions* options,
148 ScopedPlatformHandle* shared_memory_handle, 274 ScopedPlatformHandle* channel_shared_handle,
149 size_t* shared_memory_size) { 275 size_t* serialized_read_buffer_size,
276 size_t* serialized_write_buffer_size,
277 ScopedPlatformHandle* shared_buffer_handle,
278 size_t* ring_buffer_start,
279 size_t* ring_buffer_size) {
150 if (size != sizeof(SerializedDataPipeHandleDispatcher)) { 280 if (size != sizeof(SerializedDataPipeHandleDispatcher)) {
151 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; 281 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)";
152 return ScopedPlatformHandle(); 282 return ScopedPlatformHandle();
153 } 283 }
154 284
155 const SerializedDataPipeHandleDispatcher* serialization = 285 const SerializedDataPipeHandleDispatcher* serialization =
156 static_cast<const SerializedDataPipeHandleDispatcher*>(source); 286 static_cast<const SerializedDataPipeHandleDispatcher*>(source);
157 size_t platform_handle_index = serialization->platform_handle_index;
158 287
159 // Starts off invalid, which is what we want. 288 // Starts off invalid, which is what we want.
160 PlatformHandle platform_handle; 289 PlatformHandle channel_handle;
161 if (platform_handle_index != kInvalidDataPipeHandleIndex) { 290 size_t channel_handle_index = serialization->channel_handle_index;
162 if (!platform_handles || 291 if (channel_handle_index != kInvalidDataPipeHandleIndex) {
163 platform_handle_index >= platform_handles->size()) { 292 if (!platform_handles || channel_handle_index >= platform_handles->size()) {
164 LOG(ERROR) 293 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
165 << "Invalid serialized data pipe dispatcher (missing handles)";
166 return ScopedPlatformHandle(); 294 return ScopedPlatformHandle();
167 } 295 }
168 296
169 // We take ownership of the handle, so we have to invalidate the one in 297 // We take ownership of the handle, so we have to invalidate the one in
170 // |platform_handles|. 298 // |platform_handles|.
171 std::swap(platform_handle, (*platform_handles)[platform_handle_index]); 299 std::swap(channel_handle, (*platform_handles)[channel_handle_index]);
172 } 300 }
173 301
174 options->struct_size = sizeof(MojoCreateDataPipeOptions); 302 options->struct_size = sizeof(MojoCreateDataPipeOptions);
175 options->flags = serialization->flags; 303 options->flags = serialization->flags;
176 options->element_num_bytes = serialization->element_num_bytes; 304 options->element_num_bytes = serialization->element_num_bytes;
177 options->capacity_num_bytes = serialization->capacity_num_bytes; 305 options->capacity_num_bytes = serialization->capacity_num_bytes;
178 306 *serialized_read_buffer_size = serialization->channel_pending_read_size;
179 if (shared_memory_size) { 307 *serialized_write_buffer_size = serialization->channel_pending_write_size;
180 *shared_memory_size = serialization->shared_memory_size; 308 *ring_buffer_start = serialization->ring_buffer_start;
181 if (*shared_memory_size) { 309 *ring_buffer_size = serialization->ring_buffer_size;
182 DCHECK(serialization->shared_memory_handle_index != 310
183 kInvalidDataPipeHandleIndex); 311 size_t channel_shared_handle_index =
184 if (!platform_handles || 312 serialization->channel_shared_handle_index;
185 serialization->shared_memory_handle_index >= 313 if (*serialized_read_buffer_size || *serialized_write_buffer_size) {
186 platform_handles->size()) { 314 DCHECK_NE(channel_shared_handle_index, kInvalidDataPipeHandleIndex);
187 LOG(ERROR) << "Invalid serialized data pipe dispatcher " 315 if (!platform_handles ||
188 << "(missing handles)"; 316 channel_shared_handle_index >= platform_handles->size()) {
189 return ScopedPlatformHandle(); 317 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
318 return ScopedPlatformHandle();
319 }
320
321 // Take ownership.
322 PlatformHandle temp_channel_shared_handle;
323 std::swap(temp_channel_shared_handle,
324 (*platform_handles)[channel_shared_handle_index]);
325 *channel_shared_handle = ScopedPlatformHandle(temp_channel_shared_handle);
326 }
327
328 size_t shared_buffer_handle_index = serialization->shared_buffer_handle_index;
329 if (shared_buffer_handle_index != kInvalidDataPipeHandleIndex) {
330 if (!platform_handles ||
331 shared_buffer_handle_index >= platform_handles->size()) {
332 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)";
333 return ScopedPlatformHandle();
334 }
335
336 // Take ownership.
337 PlatformHandle temp_shared_buffer_handle;
338 std::swap(temp_shared_buffer_handle,
339 (*platform_handles)[shared_buffer_handle_index]);
340 *shared_buffer_handle = ScopedPlatformHandle(temp_shared_buffer_handle);
341 }
342
343 return ScopedPlatformHandle(channel_handle);
344 }
345
346 void* DataPipe::GetSharedBufferBase() {
347 if (!mapping_) {
348 DCHECK(shared_buffer_);
349 mapping_ = shared_buffer_->Map(0, options_.capacity_num_bytes);
350 }
351 if (!mapping_) {
352 return nullptr;
353 }
354 return mapping_->GetBase();
355 }
356
357 bool DataPipe::WriteDataIntoSharedBuffer(const void* elements,
358 size_t num_bytes) {
359 size_t bufsz = options_.capacity_num_bytes;
Anand Mistry (off Chromium) 2016/01/08 03:28:02 This isn't Go. Don't abbreviate unnecessarily.
Eliot Courtney 2016/01/08 04:44:39 Done.
360 uint8_t* base = static_cast<uint8_t*>(GetSharedBufferBase());
361 if (base == nullptr) {
Anand Mistry (off Chromium) 2016/01/08 03:28:02 generally prefer: if (!base)
Eliot Courtney 2016/01/08 04:44:39 Done.
362 return false;
363 }
364
365 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u);
366 DCHECK_LE(num_bytes, bufsz - ring_buffer_size_);
367
368 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz;
369 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
370 size_t bytes_until_end = bufsz - ring_buffer_end;
371 memcpy(base + ring_buffer_end, elements,
372 std::min(num_bytes, bytes_until_end));
373
374 if (bytes_until_end < num_bytes) {
375 memcpy(base, static_cast<const uint8_t*>(elements) + bytes_until_end,
376 num_bytes - bytes_until_end);
377 }
378 } else {
379 memcpy(base + ring_buffer_end, elements, num_bytes);
380 }
381
382 return true;
383 }
384
385 bool DataPipe::ReadDataFromSharedBuffer(void* data, size_t num_bytes) {
386 DCHECK_LE(num_bytes, ring_buffer_size_);
387 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u);
388
389 size_t bufsz = options_.capacity_num_bytes;
390 uint8_t* base = static_cast<uint8_t*>(GetSharedBufferBase());
391 if (base == nullptr) {
392 return false;
393 }
394
395 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz;
396 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
397 memcpy(data, base + ring_buffer_start_, num_bytes);
398 } else {
399 size_t bytes_until_end = bufsz - ring_buffer_start_;
400 memcpy(data, base + ring_buffer_start_,
401 std::min(num_bytes, bytes_until_end));
402
403 if (bytes_until_end < num_bytes) {
404 memcpy(static_cast<uint8_t*>(data) + bytes_until_end, base,
405 num_bytes - bytes_until_end);
406 }
407 }
408
409 return true;
410 }
411
412 size_t DataPipe::GetReadableBytes() const {
413 return shared_buffer_ ? ring_buffer_size_ : 0;
414 }
415
416 size_t DataPipe::GetWritableBytes() const {
417 return shared_buffer_ ? options_.capacity_num_bytes - ring_buffer_size_ : 0;
418 }
419
420 void DataPipe::UpdateFromRead(size_t num_bytes) {
421 ring_buffer_start_ =
422 (ring_buffer_start_ + num_bytes) % options_.capacity_num_bytes;
423 ring_buffer_size_ -= num_bytes;
424 DCHECK_GE(ring_buffer_size_, 0u);
425 }
426
427 void DataPipe::UpdateFromWrite(size_t num_bytes) {
428 ring_buffer_size_ += num_bytes;
429 DCHECK_LE(ring_buffer_size_, options_.capacity_num_bytes);
430 }
431
432 bool DataPipe::NotifyWrite(size_t num_bytes) {
433 DCHECK(is_producer_);
434
435 if (!channel_)
436 return false;
437
438 DataPipeCommandHeader command = {DATA_WRITTEN, num_bytes};
439 scoped_ptr<MessageInTransit> message(new MessageInTransit(
440 MessageInTransit::Type::MESSAGE, sizeof(command), &command));
441 if (!channel_->WriteMessage(std::move(message))) {
442 return false;
443 }
444 return true;
445 }
446
447 bool DataPipe::NotifyRead(size_t num_bytes) {
448 DCHECK(!is_producer_);
449
450 if (!channel_)
451 return false;
452 DataPipeCommandHeader command = {DATA_READ, num_bytes};
453 scoped_ptr<MessageInTransit> message(new MessageInTransit(
454 MessageInTransit::Type::MESSAGE, sizeof(command), &command));
455 if (!channel_->WriteMessage(std::move(message))) {
456 return false;
457 }
458 return true;
459 }
460
461 void DataPipe::NotifySharedBuffer() {
462 if (!channel_) {
463 DLOG(ERROR) << "NotifySharedBuffer: No channel";
464 return;
465 }
466 DataPipeCommandHeader command = {NOTIFY_SHARED_BUFFER,
467 options_.capacity_num_bytes};
468 ScopedPlatformHandle handle = shared_buffer_->DuplicatePlatformHandle();
469 DCHECK(handle.is_valid());
470 ScopedPlatformHandleVectorPtr fds(new PlatformHandleVector());
471 fds->push_back(handle.release());
472
473 scoped_ptr<MessageInTransit> message(new MessageInTransit(
474 MessageInTransit::Type::MESSAGE, sizeof(command), &command));
475 message->SetTransportData(make_scoped_ptr(new TransportData(
476 std::move(fds), channel_->GetSerializedPlatformHandleSize())));
477 if (!channel_->WriteMessage(std::move(message))) {
478 DLOG(ERROR) << "NotifySharedBuffer: Can't write";
479 }
480 }
481
482 void DataPipe::RequestSharedBuffer() {
483 if (!channel_) {
484 DLOG(ERROR) << "NotifySharedBuffer: No channel";
485 return;
486 }
487
488 DataPipeCommandHeader command = {PLEASE_CREATE_SHARED_BUFFER,
489 options_.capacity_num_bytes};
490
491 scoped_ptr<MessageInTransit> message(new MessageInTransit(
492 MessageInTransit::Type::MESSAGE, sizeof(command), &command));
493 if (!channel_->WriteMessage(std::move(message))) {
494 DLOG(ERROR) << "RequestSharedBuffer: Can't write";
495 }
496 }
497
498 void DataPipe::UpdateSharedBuffer(
499 scoped_refptr<PlatformSharedBuffer> shared_buffer) {
500 shared_buffer_ = shared_buffer;
501 mapping_ = nullptr;
502 }
503
504 void DataPipe::Shutdown() {
505 if (channel_) {
506 channel_->Shutdown();
507 channel_ = nullptr;
508 }
509 }
510
511 void DataPipe::CreateEquivalentAndClose(DataPipe* out) {
512 // Serialize, releasing the handle, otherwise we will get callbacks to the
513 // wrong delegate for RawChannel.
514 Serialize();
515
516 std::swap(options_, out->options_);
517 std::swap(channel_released_, out->channel_released_);
518 serialized_read_buffer_.swap(out->serialized_read_buffer_);
519 serialized_write_buffer_.swap(out->serialized_write_buffer_);
520 serialized_channel_handle_.swap(out->serialized_channel_handle_);
521 shared_buffer_.swap(out->shared_buffer_);
522 mapping_.swap(out->mapping_);
523 std::swap(ring_buffer_start_, out->ring_buffer_start_);
524 std::swap(ring_buffer_size_, out->ring_buffer_size_);
525 std::swap(is_producer_, out->is_producer_);
526 }
527
528 void* DataPipe::GetWriteBuffer(size_t* num_bytes) {
529 if (!shared_buffer_) {
530 *num_bytes = 0;
531 return nullptr;
532 }
533 // Must provide sequential memory.
534 size_t bufsz = options_.capacity_num_bytes;
535 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz;
536 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
537 *num_bytes = bufsz - ring_buffer_end;
538 } else {
539 *num_bytes = ring_buffer_start_ - ring_buffer_end;
540 }
541
542 return static_cast<uint8_t*>(GetSharedBufferBase()) + ring_buffer_end;
543 }
544
545 const void* DataPipe::GetReadBuffer(size_t* num_bytes) {
546 if (!shared_buffer_) {
547 *num_bytes = 0;
548 return nullptr;
549 }
550 // Must provide sequential memory.
551 size_t bufsz = options_.capacity_num_bytes;
552 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz;
553 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) {
554 *num_bytes = ring_buffer_size_;
555 } else {
556 *num_bytes = bufsz - ring_buffer_start_;
557 }
558
559 return static_cast<uint8_t*>(GetSharedBufferBase()) + ring_buffer_start_;
560 }
561
562 bool DataPipe::ProcessCommand(const DataPipeCommandHeader& command,
563 ScopedPlatformHandleVectorPtr platform_handles) {
564 PlatformHandle handle;
565 ScopedPlatformHandle scoped_handle;
566 scoped_refptr<PlatformSharedBuffer> shared_buffer;
567 bool was_unreadable = GetReadableBytes() == 0;
568 bool was_unwritable = GetWritableBytes() == 0;
569 switch (command.command) {
570 case PLEASE_CREATE_SHARED_BUFFER:
571 // Other end wasn't able to create the shared buffer.
572 shared_buffer =
573 internal::g_platform_support->CreateSharedBuffer(command.num_bytes);
574 CHECK(shared_buffer);
575 UpdateSharedBuffer(shared_buffer);
576
577 // Can't call back into RawChannel because of locking, so do this later.
578 internal::g_io_thread_task_runner->PostTask(
579 FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this));
580 break;
581 case NOTIFY_SHARED_BUFFER:
582 CHECK_EQ(platform_handles->size(), 1u);
583 std::swap(handle, (*platform_handles.get())[0]);
584 scoped_handle.reset(handle);
585 UpdateSharedBuffer(
586 internal::g_platform_support->CreateSharedBufferFromHandle(
587 command.num_bytes, std::move(scoped_handle)));
588 break;
589 case DATA_WRITTEN:
590 if (!is_producer_) {
591 UpdateFromWrite(command.num_bytes);
592 } else {
593 LOG(ERROR) << "Producer was told of a write, which shouldn't happen.";
594 DCHECK(0);
190 } 595 }
191 596 break;
192 PlatformHandle temp_shared_memory_handle; 597 case DATA_READ:
193 std::swap(temp_shared_memory_handle, 598 if (is_producer_) {
194 (*platform_handles)[serialization->shared_memory_handle_index]); 599 UpdateFromRead(command.num_bytes);
195 *shared_memory_handle = 600 } else {
196 ScopedPlatformHandle(temp_shared_memory_handle); 601 LOG(ERROR) << "Consumer was told of a read, which shouldn't happen.";
197 } 602 DCHECK(0);
198 } 603 }
199 604 break;
200 size -= sizeof(SerializedDataPipeHandleDispatcher); 605 default:
201 606 LOG(ERROR) << "Shouldn't happen";
202 return ScopedPlatformHandle(platform_handle); 607 DCHECK(0);
608 }
609
610 // Handles write/read case and shared buffer becoming available case.
611 return (was_unreadable && GetReadableBytes()) ||
612 (was_unwritable && GetWritableBytes());
203 } 613 }
204 614
205 } // namespace edk 615 } // namespace edk
206 } // namespace mojo 616 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698