OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe.h" | 5 #include "mojo/edk/system/data_pipe.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 #include <string.h> | 9 #include <string.h> |
10 | 10 |
11 #include <algorithm> | |
12 | |
13 #include "base/bind.h" | |
14 #include "mojo/edk/embedder/embedder_internal.h" | |
15 #include "mojo/edk/embedder/platform_support.h" | |
11 #include "mojo/edk/system/configuration.h" | 16 #include "mojo/edk/system/configuration.h" |
12 #include "mojo/edk/system/options_validation.h" | 17 #include "mojo/edk/system/options_validation.h" |
13 #include "mojo/edk/system/raw_channel.h" | 18 #include "mojo/edk/system/raw_channel.h" |
14 #include "mojo/edk/system/transport_data.h" | 19 #include "mojo/edk/system/transport_data.h" |
15 | 20 |
16 namespace mojo { | 21 namespace mojo { |
17 namespace edk { | 22 namespace edk { |
18 | 23 |
19 namespace { | 24 namespace { |
20 | 25 |
21 const size_t kInvalidDataPipeHandleIndex = static_cast<size_t>(-1); | 26 const size_t kInvalidDataPipeHandleIndex = static_cast<size_t>(-1); |
22 | 27 |
23 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { | 28 struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { |
24 size_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) | 29 size_t channel_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
25 | 30 |
26 // These are from MojoCreateDataPipeOptions | 31 // These are from MojoCreateDataPipeOptions |
27 MojoCreateDataPipeOptionsFlags flags; | 32 MojoCreateDataPipeOptionsFlags flags; |
28 uint32_t element_num_bytes; | 33 uint32_t element_num_bytes; |
29 uint32_t capacity_num_bytes; | 34 uint32_t capacity_num_bytes; |
30 | 35 |
31 size_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) | 36 size_t channel_shared_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) |
32 uint32_t shared_memory_size; | 37 uint32_t channel_pending_read_size; |
38 uint32_t channel_pending_write_size; | |
39 | |
40 size_t shared_buffer_handle_index; | |
41 size_t ring_buffer_start; | |
42 size_t ring_buffer_size; | |
33 }; | 43 }; |
34 | 44 |
35 } // namespace | 45 } // namespace |
36 | 46 |
37 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { | 47 MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { |
38 MojoCreateDataPipeOptions result = { | 48 MojoCreateDataPipeOptions result = { |
39 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), | 49 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), |
40 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, | 50 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, |
41 1u, | 51 1u, |
42 static_cast<uint32_t>( | 52 static_cast<uint32_t>( |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
90 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) | 100 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) |
91 return MOJO_RESULT_INVALID_ARGUMENT; | 101 return MOJO_RESULT_INVALID_ARGUMENT; |
92 if (reader.options().capacity_num_bytes > | 102 if (reader.options().capacity_num_bytes > |
93 GetConfiguration().max_data_pipe_capacity_bytes) | 103 GetConfiguration().max_data_pipe_capacity_bytes) |
94 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 104 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
95 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; | 105 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; |
96 | 106 |
97 return MOJO_RESULT_OK; | 107 return MOJO_RESULT_OK; |
98 } | 108 } |
99 | 109 |
100 void DataPipe::StartSerialize(bool have_channel_handle, | 110 DataPipe::DataPipe(const MojoCreateDataPipeOptions& options) |
101 bool have_shared_memory, | 111 : channel_(nullptr), |
102 size_t* max_size, | 112 options_(options), |
103 size_t* max_platform_handles) { | 113 channel_released_(false), |
114 shared_buffer_(nullptr), | |
115 mapping_(nullptr), | |
116 ring_buffer_start_(0), | |
117 ring_buffer_size_(0) {} | |
118 | |
119 DataPipe::~DataPipe() {} | |
120 | |
121 void DataPipe::Init(ScopedPlatformHandle message_pipe, | |
122 char* serialized_write_buffer, | |
123 size_t serialized_write_buffer_size, | |
124 char* serialized_read_buffer, | |
125 size_t serialized_read_buffer_size, | |
126 ScopedPlatformHandle shared_buffer_handle, | |
127 size_t ring_buffer_start, | |
128 size_t ring_buffer_size, | |
129 bool is_producer, | |
130 const base::Closure& init_callback) { | |
131 is_producer_ = is_producer; | |
132 | |
133 ring_buffer_start_ = ring_buffer_start; | |
134 ring_buffer_size_ = ring_buffer_size; | |
135 | |
136 if (shared_buffer_handle.is_valid()) { | |
137 shared_buffer_ = internal::g_platform_support->CreateSharedBufferFromHandle( | |
138 options_.capacity_num_bytes, std::move(shared_buffer_handle)); | |
139 DCHECK(shared_buffer_); | |
140 } else if (is_producer_) { | |
141 shared_buffer_ = internal::g_platform_support->CreateSharedBuffer( | |
142 options_.capacity_num_bytes); | |
143 } | |
144 | |
145 if (message_pipe.is_valid()) { | |
146 channel_ = RawChannel::Create(std::move(message_pipe)); | |
147 channel_->SetSerializedData(serialized_read_buffer, | |
148 serialized_read_buffer_size, | |
149 serialized_write_buffer, | |
150 serialized_write_buffer_size, nullptr, nullptr); | |
151 internal::g_io_thread_task_runner->PostTask(FROM_HERE, init_callback); | |
152 channel_->EnsureLazyInitialized(); | |
153 | |
154 if (is_producer_) { | |
155 if (shared_buffer_) { | |
156 internal::g_io_thread_task_runner->PostTask( | |
157 FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this)); | |
158 } else { | |
159 internal::g_io_thread_task_runner->PostTask( | |
160 FROM_HERE, base::Bind(&DataPipe::RequestSharedBuffer, this)); | |
161 } | |
162 } | |
163 } | |
164 } | |
165 | |
166 void DataPipe::StartSerialize(size_t* max_size, size_t* max_platform_handles) { | |
167 // We need to release the channel and get its pending reads/writes. | |
168 if (!channel_released_) { | |
169 Serialize(); | |
170 } | |
171 | |
104 *max_size = sizeof(SerializedDataPipeHandleDispatcher); | 172 *max_size = sizeof(SerializedDataPipeHandleDispatcher); |
105 *max_platform_handles = 0; | 173 *max_platform_handles = 0; |
106 if (have_channel_handle) | 174 if (serialized_channel_handle_.is_valid()) |
107 (*max_platform_handles)++; | 175 (*max_platform_handles)++; |
108 if (have_shared_memory) | 176 // For shared buffer to transfer pending reads / writes. |
177 if (!serialized_write_buffer_.empty() || !serialized_read_buffer_.empty()) | |
178 (*max_platform_handles)++; | |
179 if (shared_buffer_) | |
109 (*max_platform_handles)++; | 180 (*max_platform_handles)++; |
110 } | 181 } |
111 | 182 |
112 void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options, | 183 void DataPipe::Serialize() { |
113 ScopedPlatformHandle channel_handle, | 184 // We need to stop watching handle immediately, even though not on IO thread, |
114 ScopedPlatformHandle shared_memory_handle, | 185 // so that other messages aren't read after this. |
115 size_t shared_memory_size, | 186 if (channel_) { |
116 void* destination, | 187 std::vector<int> fds; |
188 bool write_error = false; | |
189 serialized_channel_handle_ = channel_->ReleaseHandle( | |
190 &serialized_read_buffer_, &serialized_write_buffer_, &fds, &fds, | |
191 &write_error); | |
192 | |
193 CHECK(fds.empty()); | |
194 if (write_error) { | |
195 serialized_channel_handle_.reset(); | |
196 } | |
197 channel_ = nullptr; | |
198 } | |
199 channel_released_ = true; | |
200 } | |
201 | |
202 void DataPipe::EndSerialize(void* destination, | |
117 size_t* actual_size, | 203 size_t* actual_size, |
118 PlatformHandleVector* platform_handles) { | 204 PlatformHandleVector* platform_handles) { |
205 ScopedPlatformHandle channel_shared_handle; | |
206 size_t channel_shared_size = | |
207 serialized_read_buffer_.size() + serialized_write_buffer_.size(); | |
208 if (channel_shared_size) { | |
209 scoped_refptr<PlatformSharedBuffer> shared_buffer( | |
210 internal::g_platform_support->CreateSharedBuffer(channel_shared_size)); | |
211 scoped_ptr<PlatformSharedBufferMapping> mapping( | |
212 shared_buffer->Map(0, channel_shared_size)); | |
213 | |
214 void* base = mapping->GetBase(); | |
215 if (!serialized_read_buffer_.empty()) { | |
216 memcpy(base, &serialized_read_buffer_[0], serialized_read_buffer_.size()); | |
217 } | |
218 if (!serialized_write_buffer_.empty()) { | |
219 memcpy(static_cast<uint8_t*>(base) + serialized_read_buffer_.size(), | |
220 &serialized_write_buffer_[0], serialized_write_buffer_.size()); | |
221 } | |
222 | |
223 channel_shared_handle.reset(shared_buffer->PassPlatformHandle().release()); | |
224 } | |
225 | |
119 SerializedDataPipeHandleDispatcher* serialization = | 226 SerializedDataPipeHandleDispatcher* serialization = |
120 static_cast<SerializedDataPipeHandleDispatcher*>(destination); | 227 static_cast<SerializedDataPipeHandleDispatcher*>(destination); |
121 if (channel_handle.is_valid()) { | 228 if (serialized_channel_handle_.is_valid()) { |
122 serialization->platform_handle_index = platform_handles->size(); | 229 serialization->channel_handle_index = platform_handles->size(); |
123 platform_handles->push_back(channel_handle.release()); | 230 platform_handles->push_back(serialized_channel_handle_.release()); |
124 } else { | 231 } else { |
125 serialization->platform_handle_index = kInvalidDataPipeHandleIndex; | 232 serialization->channel_handle_index = kInvalidDataPipeHandleIndex; |
126 } | 233 } |
127 | 234 |
128 serialization->flags = options.flags; | 235 serialization->flags = options_.flags; |
129 serialization->element_num_bytes = options.element_num_bytes; | 236 serialization->element_num_bytes = options_.element_num_bytes; |
130 serialization->capacity_num_bytes = options.capacity_num_bytes; | 237 serialization->capacity_num_bytes = options_.capacity_num_bytes; |
131 | 238 |
132 serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size); | 239 serialization->channel_pending_read_size = |
133 if (serialization->shared_memory_size) { | 240 static_cast<uint32_t>(serialized_read_buffer_.size()); |
134 serialization->shared_memory_handle_index = platform_handles->size(); | 241 serialization->channel_pending_write_size = |
135 platform_handles->push_back(shared_memory_handle.release()); | 242 static_cast<uint32_t>(serialized_write_buffer_.size()); |
243 | |
244 if (channel_shared_handle.is_valid()) { | |
245 serialization->channel_shared_handle_index = platform_handles->size(); | |
246 platform_handles->push_back(channel_shared_handle.release()); | |
136 } else { | 247 } else { |
137 serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex; | 248 serialization->channel_shared_handle_index = kInvalidDataPipeHandleIndex; |
138 } | 249 } |
139 | 250 |
251 ScopedPlatformHandle shared_buffer_handle; | |
252 if (shared_buffer_) { | |
253 shared_buffer_handle.reset(shared_buffer_->PassPlatformHandle().release()); | |
254 } | |
255 | |
256 if (shared_buffer_handle.is_valid()) { | |
257 serialization->shared_buffer_handle_index = platform_handles->size(); | |
258 platform_handles->push_back(shared_buffer_handle.release()); | |
259 } else { | |
260 serialization->shared_buffer_handle_index = kInvalidDataPipeHandleIndex; | |
261 } | |
262 | |
263 serialization->ring_buffer_start = ring_buffer_start_; | |
264 serialization->ring_buffer_size = ring_buffer_size_; | |
265 | |
140 *actual_size = sizeof(SerializedDataPipeHandleDispatcher); | 266 *actual_size = sizeof(SerializedDataPipeHandleDispatcher); |
141 } | 267 } |
142 | 268 |
143 ScopedPlatformHandle DataPipe::Deserialize( | 269 ScopedPlatformHandle DataPipe::Deserialize( |
144 const void* source, | 270 const void* source, |
145 size_t size, | 271 size_t size, |
146 PlatformHandleVector* platform_handles, | 272 PlatformHandleVector* platform_handles, |
147 MojoCreateDataPipeOptions* options, | 273 MojoCreateDataPipeOptions* options, |
148 ScopedPlatformHandle* shared_memory_handle, | 274 ScopedPlatformHandle* channel_shared_handle, |
149 size_t* shared_memory_size) { | 275 size_t* serialized_read_buffer_size, |
276 size_t* serialized_write_buffer_size, | |
277 ScopedPlatformHandle* shared_buffer_handle, | |
278 size_t* ring_buffer_start, | |
279 size_t* ring_buffer_size) { | |
150 if (size != sizeof(SerializedDataPipeHandleDispatcher)) { | 280 if (size != sizeof(SerializedDataPipeHandleDispatcher)) { |
151 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; | 281 LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; |
152 return ScopedPlatformHandle(); | 282 return ScopedPlatformHandle(); |
153 } | 283 } |
154 | 284 |
155 const SerializedDataPipeHandleDispatcher* serialization = | 285 const SerializedDataPipeHandleDispatcher* serialization = |
156 static_cast<const SerializedDataPipeHandleDispatcher*>(source); | 286 static_cast<const SerializedDataPipeHandleDispatcher*>(source); |
157 size_t platform_handle_index = serialization->platform_handle_index; | |
158 | 287 |
159 // Starts off invalid, which is what we want. | 288 // Starts off invalid, which is what we want. |
160 PlatformHandle platform_handle; | 289 PlatformHandle channel_handle; |
161 if (platform_handle_index != kInvalidDataPipeHandleIndex) { | 290 size_t channel_handle_index = serialization->channel_handle_index; |
162 if (!platform_handles || | 291 if (channel_handle_index != kInvalidDataPipeHandleIndex) { |
163 platform_handle_index >= platform_handles->size()) { | 292 if (!platform_handles || channel_handle_index >= platform_handles->size()) { |
164 LOG(ERROR) | 293 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
165 << "Invalid serialized data pipe dispatcher (missing handles)"; | |
166 return ScopedPlatformHandle(); | 294 return ScopedPlatformHandle(); |
167 } | 295 } |
168 | 296 |
169 // We take ownership of the handle, so we have to invalidate the one in | 297 // We take ownership of the handle, so we have to invalidate the one in |
170 // |platform_handles|. | 298 // |platform_handles|. |
171 std::swap(platform_handle, (*platform_handles)[platform_handle_index]); | 299 std::swap(channel_handle, (*platform_handles)[channel_handle_index]); |
172 } | 300 } |
173 | 301 |
174 options->struct_size = sizeof(MojoCreateDataPipeOptions); | 302 options->struct_size = sizeof(MojoCreateDataPipeOptions); |
175 options->flags = serialization->flags; | 303 options->flags = serialization->flags; |
176 options->element_num_bytes = serialization->element_num_bytes; | 304 options->element_num_bytes = serialization->element_num_bytes; |
177 options->capacity_num_bytes = serialization->capacity_num_bytes; | 305 options->capacity_num_bytes = serialization->capacity_num_bytes; |
178 | 306 *serialized_read_buffer_size = serialization->channel_pending_read_size; |
179 if (shared_memory_size) { | 307 *serialized_write_buffer_size = serialization->channel_pending_write_size; |
180 *shared_memory_size = serialization->shared_memory_size; | 308 *ring_buffer_start = serialization->ring_buffer_start; |
181 if (*shared_memory_size) { | 309 *ring_buffer_size = serialization->ring_buffer_size; |
182 DCHECK(serialization->shared_memory_handle_index != | 310 |
183 kInvalidDataPipeHandleIndex); | 311 size_t channel_shared_handle_index = |
184 if (!platform_handles || | 312 serialization->channel_shared_handle_index; |
185 serialization->shared_memory_handle_index >= | 313 if (*serialized_read_buffer_size || *serialized_write_buffer_size) { |
186 platform_handles->size()) { | 314 DCHECK_NE(channel_shared_handle_index, kInvalidDataPipeHandleIndex); |
187 LOG(ERROR) << "Invalid serialized data pipe dispatcher " | 315 if (!platform_handles || |
188 << "(missing handles)"; | 316 channel_shared_handle_index >= platform_handles->size()) { |
189 return ScopedPlatformHandle(); | 317 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; |
318 return ScopedPlatformHandle(); | |
319 } | |
320 | |
321 // Take ownership. | |
322 PlatformHandle temp_channel_shared_handle; | |
323 std::swap(temp_channel_shared_handle, | |
324 (*platform_handles)[channel_shared_handle_index]); | |
325 *channel_shared_handle = ScopedPlatformHandle(temp_channel_shared_handle); | |
326 } | |
327 | |
328 size_t shared_buffer_handle_index = serialization->shared_buffer_handle_index; | |
329 if (shared_buffer_handle_index != kInvalidDataPipeHandleIndex) { | |
330 if (!platform_handles || | |
331 shared_buffer_handle_index >= platform_handles->size()) { | |
332 LOG(ERROR) << "Invalid serialized data pipe dispatcher (missing handles)"; | |
333 return ScopedPlatformHandle(); | |
334 } | |
335 | |
336 // Take ownership. | |
337 PlatformHandle temp_shared_buffer_handle; | |
338 std::swap(temp_shared_buffer_handle, | |
339 (*platform_handles)[shared_buffer_handle_index]); | |
340 *shared_buffer_handle = ScopedPlatformHandle(temp_shared_buffer_handle); | |
341 } | |
342 | |
343 return ScopedPlatformHandle(channel_handle); | |
344 } | |
345 | |
346 void* DataPipe::GetSharedBufferBase() { | |
347 if (!mapping_) { | |
348 DCHECK(shared_buffer_); | |
349 mapping_ = shared_buffer_->Map(0, options_.capacity_num_bytes); | |
350 } | |
351 if (!mapping_) { | |
352 return nullptr; | |
353 } | |
354 return mapping_->GetBase(); | |
355 } | |
356 | |
357 bool DataPipe::WriteDataIntoSharedBuffer(const void* elements, | |
358 size_t num_bytes) { | |
359 size_t bufsz = options_.capacity_num_bytes; | |
Anand Mistry (off Chromium)
2016/01/08 03:28:02
This isn't Go. Don't abbreviate unnecessarily.
Eliot Courtney
2016/01/08 04:44:39
Done.
| |
360 uint8_t* base = static_cast<uint8_t*>(GetSharedBufferBase()); | |
361 if (base == nullptr) { | |
Anand Mistry (off Chromium)
2016/01/08 03:28:02
generally prefer:
if (!base)
Eliot Courtney
2016/01/08 04:44:39
Done.
| |
362 return false; | |
363 } | |
364 | |
365 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); | |
366 DCHECK_LE(num_bytes, bufsz - ring_buffer_size_); | |
367 | |
368 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; | |
369 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
370 size_t bytes_until_end = bufsz - ring_buffer_end; | |
371 memcpy(base + ring_buffer_end, elements, | |
372 std::min(num_bytes, bytes_until_end)); | |
373 | |
374 if (bytes_until_end < num_bytes) { | |
375 memcpy(base, static_cast<const uint8_t*>(elements) + bytes_until_end, | |
376 num_bytes - bytes_until_end); | |
377 } | |
378 } else { | |
379 memcpy(base + ring_buffer_end, elements, num_bytes); | |
380 } | |
381 | |
382 return true; | |
383 } | |
384 | |
385 bool DataPipe::ReadDataFromSharedBuffer(void* data, size_t num_bytes) { | |
386 DCHECK_LE(num_bytes, ring_buffer_size_); | |
387 DCHECK_EQ(num_bytes % options_.element_num_bytes, 0u); | |
388 | |
389 size_t bufsz = options_.capacity_num_bytes; | |
390 uint8_t* base = static_cast<uint8_t*>(GetSharedBufferBase()); | |
391 if (base == nullptr) { | |
392 return false; | |
393 } | |
394 | |
395 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; | |
396 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
397 memcpy(data, base + ring_buffer_start_, num_bytes); | |
398 } else { | |
399 size_t bytes_until_end = bufsz - ring_buffer_start_; | |
400 memcpy(data, base + ring_buffer_start_, | |
401 std::min(num_bytes, bytes_until_end)); | |
402 | |
403 if (bytes_until_end < num_bytes) { | |
404 memcpy(static_cast<uint8_t*>(data) + bytes_until_end, base, | |
405 num_bytes - bytes_until_end); | |
406 } | |
407 } | |
408 | |
409 return true; | |
410 } | |
411 | |
412 size_t DataPipe::GetReadableBytes() const { | |
413 return shared_buffer_ ? ring_buffer_size_ : 0; | |
414 } | |
415 | |
416 size_t DataPipe::GetWritableBytes() const { | |
417 return shared_buffer_ ? options_.capacity_num_bytes - ring_buffer_size_ : 0; | |
418 } | |
419 | |
420 void DataPipe::UpdateFromRead(size_t num_bytes) { | |
421 ring_buffer_start_ = | |
422 (ring_buffer_start_ + num_bytes) % options_.capacity_num_bytes; | |
423 ring_buffer_size_ -= num_bytes; | |
424 DCHECK_GE(ring_buffer_size_, 0u); | |
425 } | |
426 | |
427 void DataPipe::UpdateFromWrite(size_t num_bytes) { | |
428 ring_buffer_size_ += num_bytes; | |
429 DCHECK_LE(ring_buffer_size_, options_.capacity_num_bytes); | |
430 } | |
431 | |
432 bool DataPipe::NotifyWrite(size_t num_bytes) { | |
433 DCHECK(is_producer_); | |
434 | |
435 if (!channel_) | |
436 return false; | |
437 | |
438 DataPipeCommandHeader command = {DATA_WRITTEN, num_bytes}; | |
439 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
440 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
441 if (!channel_->WriteMessage(std::move(message))) { | |
442 return false; | |
443 } | |
444 return true; | |
445 } | |
446 | |
447 bool DataPipe::NotifyRead(size_t num_bytes) { | |
448 DCHECK(!is_producer_); | |
449 | |
450 if (!channel_) | |
451 return false; | |
452 DataPipeCommandHeader command = {DATA_READ, num_bytes}; | |
453 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
454 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
455 if (!channel_->WriteMessage(std::move(message))) { | |
456 return false; | |
457 } | |
458 return true; | |
459 } | |
460 | |
461 void DataPipe::NotifySharedBuffer() { | |
462 if (!channel_) { | |
463 DLOG(ERROR) << "NotifySharedBuffer: No channel"; | |
464 return; | |
465 } | |
466 DataPipeCommandHeader command = {NOTIFY_SHARED_BUFFER, | |
467 options_.capacity_num_bytes}; | |
468 ScopedPlatformHandle handle = shared_buffer_->DuplicatePlatformHandle(); | |
469 DCHECK(handle.is_valid()); | |
470 ScopedPlatformHandleVectorPtr fds(new PlatformHandleVector()); | |
471 fds->push_back(handle.release()); | |
472 | |
473 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
474 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
475 message->SetTransportData(make_scoped_ptr(new TransportData( | |
476 std::move(fds), channel_->GetSerializedPlatformHandleSize()))); | |
477 if (!channel_->WriteMessage(std::move(message))) { | |
478 DLOG(ERROR) << "NotifySharedBuffer: Can't write"; | |
479 } | |
480 } | |
481 | |
482 void DataPipe::RequestSharedBuffer() { | |
483 if (!channel_) { | |
484 DLOG(ERROR) << "NotifySharedBuffer: No channel"; | |
485 return; | |
486 } | |
487 | |
488 DataPipeCommandHeader command = {PLEASE_CREATE_SHARED_BUFFER, | |
489 options_.capacity_num_bytes}; | |
490 | |
491 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
492 MessageInTransit::Type::MESSAGE, sizeof(command), &command)); | |
493 if (!channel_->WriteMessage(std::move(message))) { | |
494 DLOG(ERROR) << "RequestSharedBuffer: Can't write"; | |
495 } | |
496 } | |
497 | |
498 void DataPipe::UpdateSharedBuffer( | |
499 scoped_refptr<PlatformSharedBuffer> shared_buffer) { | |
500 shared_buffer_ = shared_buffer; | |
501 mapping_ = nullptr; | |
502 } | |
503 | |
504 void DataPipe::Shutdown() { | |
505 if (channel_) { | |
506 channel_->Shutdown(); | |
507 channel_ = nullptr; | |
508 } | |
509 } | |
510 | |
511 void DataPipe::CreateEquivalentAndClose(DataPipe* out) { | |
512 // Serialize, releasing the handle, otherwise we will get callbacks to the | |
513 // wrong delegate for RawChannel. | |
514 Serialize(); | |
515 | |
516 std::swap(options_, out->options_); | |
517 std::swap(channel_released_, out->channel_released_); | |
518 serialized_read_buffer_.swap(out->serialized_read_buffer_); | |
519 serialized_write_buffer_.swap(out->serialized_write_buffer_); | |
520 serialized_channel_handle_.swap(out->serialized_channel_handle_); | |
521 shared_buffer_.swap(out->shared_buffer_); | |
522 mapping_.swap(out->mapping_); | |
523 std::swap(ring_buffer_start_, out->ring_buffer_start_); | |
524 std::swap(ring_buffer_size_, out->ring_buffer_size_); | |
525 std::swap(is_producer_, out->is_producer_); | |
526 } | |
527 | |
528 void* DataPipe::GetWriteBuffer(size_t* num_bytes) { | |
529 if (!shared_buffer_) { | |
530 *num_bytes = 0; | |
531 return nullptr; | |
532 } | |
533 // Must provide sequential memory. | |
534 size_t bufsz = options_.capacity_num_bytes; | |
535 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; | |
536 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
537 *num_bytes = bufsz - ring_buffer_end; | |
538 } else { | |
539 *num_bytes = ring_buffer_start_ - ring_buffer_end; | |
540 } | |
541 | |
542 return static_cast<uint8_t*>(GetSharedBufferBase()) + ring_buffer_end; | |
543 } | |
544 | |
545 const void* DataPipe::GetReadBuffer(size_t* num_bytes) { | |
546 if (!shared_buffer_) { | |
547 *num_bytes = 0; | |
548 return nullptr; | |
549 } | |
550 // Must provide sequential memory. | |
551 size_t bufsz = options_.capacity_num_bytes; | |
552 size_t ring_buffer_end = (ring_buffer_start_ + ring_buffer_size_) % bufsz; | |
553 if (ring_buffer_size_ == 0 || ring_buffer_start_ < ring_buffer_end) { | |
554 *num_bytes = ring_buffer_size_; | |
555 } else { | |
556 *num_bytes = bufsz - ring_buffer_start_; | |
557 } | |
558 | |
559 return static_cast<uint8_t*>(GetSharedBufferBase()) + ring_buffer_start_; | |
560 } | |
561 | |
562 bool DataPipe::ProcessCommand(const DataPipeCommandHeader& command, | |
563 ScopedPlatformHandleVectorPtr platform_handles) { | |
564 PlatformHandle handle; | |
565 ScopedPlatformHandle scoped_handle; | |
566 scoped_refptr<PlatformSharedBuffer> shared_buffer; | |
567 bool was_unreadable = GetReadableBytes() == 0; | |
568 bool was_unwritable = GetWritableBytes() == 0; | |
569 switch (command.command) { | |
570 case PLEASE_CREATE_SHARED_BUFFER: | |
571 // Other end wasn't able to create the shared buffer. | |
572 shared_buffer = | |
573 internal::g_platform_support->CreateSharedBuffer(command.num_bytes); | |
574 CHECK(shared_buffer); | |
575 UpdateSharedBuffer(shared_buffer); | |
576 | |
577 // Can't call back into RawChannel because of locking, so do this later. | |
578 internal::g_io_thread_task_runner->PostTask( | |
579 FROM_HERE, base::Bind(&DataPipe::NotifySharedBuffer, this)); | |
580 break; | |
581 case NOTIFY_SHARED_BUFFER: | |
582 CHECK_EQ(platform_handles->size(), 1u); | |
583 std::swap(handle, (*platform_handles.get())[0]); | |
584 scoped_handle.reset(handle); | |
585 UpdateSharedBuffer( | |
586 internal::g_platform_support->CreateSharedBufferFromHandle( | |
587 command.num_bytes, std::move(scoped_handle))); | |
588 break; | |
589 case DATA_WRITTEN: | |
590 if (!is_producer_) { | |
591 UpdateFromWrite(command.num_bytes); | |
592 } else { | |
593 LOG(ERROR) << "Producer was told of a write, which shouldn't happen."; | |
594 DCHECK(0); | |
190 } | 595 } |
191 | 596 break; |
192 PlatformHandle temp_shared_memory_handle; | 597 case DATA_READ: |
193 std::swap(temp_shared_memory_handle, | 598 if (is_producer_) { |
194 (*platform_handles)[serialization->shared_memory_handle_index]); | 599 UpdateFromRead(command.num_bytes); |
195 *shared_memory_handle = | 600 } else { |
196 ScopedPlatformHandle(temp_shared_memory_handle); | 601 LOG(ERROR) << "Consumer was told of a read, which shouldn't happen."; |
197 } | 602 DCHECK(0); |
198 } | 603 } |
199 | 604 break; |
200 size -= sizeof(SerializedDataPipeHandleDispatcher); | 605 default: |
201 | 606 LOG(ERROR) << "Shouldn't happen"; |
202 return ScopedPlatformHandle(platform_handle); | 607 DCHECK(0); |
608 } | |
609 | |
610 // Handles write/read case and shared buffer becoming available case. | |
611 return (was_unreadable && GetReadableBytes()) || | |
612 (was_unwritable && GetWritableBytes()); | |
203 } | 613 } |
204 | 614 |
205 } // namespace edk | 615 } // namespace edk |
206 } // namespace mojo | 616 } // namespace mojo |
OLD | NEW |