Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 1526923006: [mojo] Implement data pipe using a shared buffer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <algorithm>
10 #include <utility> 11 #include <utility>
11 12
12 #include "base/bind.h" 13 #include "base/bind.h"
13 #include "base/logging.h" 14 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h" 15 #include "base/message_loop/message_loop.h"
15 #include "mojo/edk/embedder/embedder_internal.h" 16 #include "mojo/edk/embedder/embedder_internal.h"
16 #include "mojo/edk/embedder/platform_shared_buffer.h" 17 #include "mojo/edk/embedder/platform_shared_buffer.h"
17 #include "mojo/edk/embedder/platform_support.h" 18 #include "mojo/edk/embedder/platform_support.h"
18 #include "mojo/edk/system/configuration.h" 19 #include "mojo/edk/system/configuration.h"
19 #include "mojo/edk/system/data_pipe.h" 20 #include "mojo/edk/system/transport_data.h"
20 21
21 namespace mojo { 22 namespace mojo {
22 namespace edk { 23 namespace edk {
23 24
24 void DataPipeProducerDispatcher::Init( 25 void DataPipeProducerDispatcher::Init(ScopedPlatformHandle message_pipe,
25 ScopedPlatformHandle message_pipe, 26 char* serialized_write_buffer,
26 char* serialized_write_buffer, size_t serialized_write_buffer_size) { 27 size_t serialized_write_buffer_size,
27 if (message_pipe.is_valid()) { 28 char* serialized_read_buffer,
28 channel_ = RawChannel::Create(std::move(message_pipe)); 29 size_t serialized_read_buffer_size,
29 channel_->SetSerializedData( 30 ScopedPlatformHandle shared_buffer_handle,
30 nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size, 31 size_t ring_buffer_start,
31 nullptr, nullptr); 32 size_t ring_buffer_size) {
32 internal::g_io_thread_task_runner->PostTask( 33 if (!message_pipe.is_valid()) {
33 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); 34 peer_closed_ = true;
34 } else {
35 error_ = true;
36 } 35 }
36
37 data_pipe_->Init(std::move(message_pipe), serialized_write_buffer,
38 serialized_write_buffer_size, serialized_read_buffer,
39 serialized_read_buffer_size, std::move(shared_buffer_handle),
40 ring_buffer_start, ring_buffer_size, true,
41 base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
37 } 42 }
38 43
39 void DataPipeProducerDispatcher::InitOnIO() { 44 void DataPipeProducerDispatcher::InitOnIO() {
40 base::AutoLock locker(lock()); 45 base::AutoLock locker(lock());
41 if (channel_) 46 calling_init_ = true;
42 channel_->Init(this); 47 RawChannel* channel = data_pipe_->GetChannel();
48 if (channel)
49 channel->Init(this);
50 calling_init_ = false;
43 } 51 }
44 52
45 void DataPipeProducerDispatcher::CloseOnIO() { 53 void DataPipeProducerDispatcher::CloseOnIO() {
46 base::AutoLock locker(lock()); 54 base::AutoLock locker(lock());
47 if (channel_) { 55 data_pipe_->Shutdown();
48 channel_->Shutdown();
49 channel_ = nullptr;
50 }
51 } 56 }
52 57
53 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { 58 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
54 return Type::DATA_PIPE_PRODUCER; 59 return Type::DATA_PIPE_PRODUCER;
55 } 60 }
56 61
57 scoped_refptr<DataPipeProducerDispatcher> 62 scoped_refptr<DataPipeProducerDispatcher>
58 DataPipeProducerDispatcher::Deserialize( 63 DataPipeProducerDispatcher::Deserialize(
59 const void* source, 64 const void* source,
60 size_t size, 65 size_t size,
61 PlatformHandleVector* platform_handles) { 66 PlatformHandleVector* platform_handles) {
62 MojoCreateDataPipeOptions options; 67 MojoCreateDataPipeOptions options;
63 ScopedPlatformHandle shared_memory_handle; 68 ScopedPlatformHandle channel_handle, channel_shared_handle,
64 size_t shared_memory_size = 0; 69 shared_buffer_handle;
65 ScopedPlatformHandle platform_handle = 70 size_t serialized_read_buffer_size, serialized_write_buffer_size;
66 DataPipe::Deserialize(source, size, platform_handles, &options, 71 size_t ring_buffer_start, ring_buffer_size;
67 &shared_memory_handle, &shared_memory_size); 72
73 ScopedPlatformHandle platform_handle = DataPipe::Deserialize(
74 source, size, platform_handles, &options, &channel_shared_handle,
75 &serialized_read_buffer_size, &serialized_write_buffer_size,
76 &shared_buffer_handle, &ring_buffer_start, &ring_buffer_size);
68 77
69 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); 78 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
70 79
80 // Create shared buffer and pull out the serialised read and write data
81 // for the channel.
82 size_t sz = serialized_write_buffer_size + serialized_read_buffer_size;
83 char* serialized_read_buffer = nullptr;
71 char* serialized_write_buffer = nullptr; 84 char* serialized_write_buffer = nullptr;
72 size_t serialized_write_buffer_size = 0; 85 scoped_refptr<PlatformSharedBuffer> channel_shared_buffer;
73 scoped_refptr<PlatformSharedBuffer> shared_buffer;
74 scoped_ptr<PlatformSharedBufferMapping> mapping; 86 scoped_ptr<PlatformSharedBufferMapping> mapping;
75 if (shared_memory_size) { 87 if (channel_shared_handle.is_valid()) {
76 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( 88 channel_shared_buffer =
77 shared_memory_size, std::move(shared_memory_handle)); 89 internal::g_platform_support->CreateSharedBufferFromHandle(
78 mapping = shared_buffer->Map(0, shared_memory_size); 90 sz, std::move(channel_shared_handle));
79 serialized_write_buffer = static_cast<char*>(mapping->GetBase()); 91 mapping = channel_shared_buffer->Map(0, sz);
80 serialized_write_buffer_size = shared_memory_size; 92
93 serialized_read_buffer = static_cast<char*>(mapping->GetBase());
94 serialized_write_buffer =
95 static_cast<char*>(mapping->GetBase()) + serialized_read_buffer_size;
81 } 96 }
82 97
83 rv->Init(std::move(platform_handle), serialized_write_buffer, 98 rv->Init(std::move(platform_handle), serialized_read_buffer,
84 serialized_write_buffer_size); 99 serialized_read_buffer_size, serialized_write_buffer,
100 serialized_write_buffer_size, std::move(shared_buffer_handle),
101 ring_buffer_start, ring_buffer_size);
85 return rv; 102 return rv;
86 } 103 }
87 104
88 DataPipeProducerDispatcher::DataPipeProducerDispatcher( 105 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
89 const MojoCreateDataPipeOptions& options) 106 const MojoCreateDataPipeOptions& options)
90 : options_(options), channel_(nullptr), error_(false), serialized_(false) { 107 : data_pipe_(new DataPipe(options)),
91 } 108 calling_init_(false),
109 peer_closed_(false),
110 in_two_phase_write_(false),
111 two_phase_max_bytes_write_(0) {}
92 112
93 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { 113 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
94 // See comment in ~MessagePipeDispatcher. 114 // See comment in ~MessagePipeDispatcher.
95 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) 115 if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
96 channel_->Shutdown(); 116 data_pipe_->Shutdown();
97 else 117 else
98 DCHECK(!channel_); 118 DCHECK(!data_pipe_->GetChannel());
99 } 119 }
100 120
101 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { 121 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
102 lock().AssertAcquired(); 122 lock().AssertAcquired();
103 awakable_list_.CancelAll(); 123 awakable_list_.CancelAll();
104 } 124 }
105 125
106 void DataPipeProducerDispatcher::CloseImplNoLock() { 126 void DataPipeProducerDispatcher::CloseImplNoLock() {
107 lock().AssertAcquired(); 127 lock().AssertAcquired();
108 internal::g_io_thread_task_runner->PostTask( 128 internal::g_io_thread_task_runner->PostTask(
109 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); 129 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this));
110 } 130 }
111 131
112 scoped_refptr<Dispatcher> 132 scoped_refptr<Dispatcher>
113 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { 133 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
134 // This function is used by TransportData to make sure there are no references
135 // to the dispatcher it is trying to serialise and transport.
114 lock().AssertAcquired(); 136 lock().AssertAcquired();
115 137
116 SerializeInternal(); 138 scoped_refptr<DataPipeProducerDispatcher> rv =
139 Create(data_pipe_->GetOptions());
140 data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get());
117 141
118 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); 142 DCHECK(!in_two_phase_write_);
119 serialized_write_buffer_.swap(rv->serialized_write_buffer_); 143
120 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
121 rv->serialized_ = true;
122 return scoped_refptr<Dispatcher>(rv.get()); 144 return scoped_refptr<Dispatcher>(rv.get());
123 } 145 }
124 146
125 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( 147 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
126 const void* elements, 148 const void* elements,
127 uint32_t* num_bytes, 149 uint32_t* num_bytes,
128 MojoWriteDataFlags flags) { 150 MojoWriteDataFlags flags) {
129 lock().AssertAcquired(); 151 lock().AssertAcquired();
130 if (InTwoPhaseWrite()) 152 if (in_two_phase_write_)
131 return MOJO_RESULT_BUSY; 153 return MOJO_RESULT_BUSY;
132 if (error_) 154 if (peer_closed_)
133 return MOJO_RESULT_FAILED_PRECONDITION; 155 return MOJO_RESULT_FAILED_PRECONDITION;
134 if (*num_bytes % options_.element_num_bytes != 0) 156 if (*num_bytes % data_pipe_->GetOptions().element_num_bytes != 0)
135 return MOJO_RESULT_INVALID_ARGUMENT; 157 return MOJO_RESULT_INVALID_ARGUMENT;
136 if (*num_bytes == 0) 158 if (*num_bytes == 0)
137 return MOJO_RESULT_OK; // Nothing to do. 159 return MOJO_RESULT_OK; // Nothing to do.
138 160
139 // For now, we ignore options.capacity_num_bytes as a total of all pending 161 // Don't write non element sized chunks.
140 // writes (and just treat it per message). We will implement that later if 162 uint32_t writable = uint32_t(data_pipe_->GetWritableBytes());
141 // we need to. All current uses want all their data to be sent, and it's not 163 writable -= writable % data_pipe_->GetOptions().element_num_bytes;
142 // clear that this backpressure should be done at the mojo layer or at a 164
143 // higher application layer.
144 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; 165 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
145 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; 166 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
146 if (min_num_bytes_to_write > options_.capacity_num_bytes) { 167 if (min_num_bytes_to_write > writable) {
147 // Don't return "should wait" since you can't wait for a specified amount of 168 // Don't return "should wait" since you can't wait for a specified amount of
148 // data. 169 // data.
149 return MOJO_RESULT_OUT_OF_RANGE; 170 return MOJO_RESULT_OUT_OF_RANGE;
150 } 171 }
151 172
152 uint32_t num_bytes_to_write = 173 if (writable == 0)
153 std::min(*num_bytes, options_.capacity_num_bytes);
154 if (num_bytes_to_write == 0)
155 return MOJO_RESULT_SHOULD_WAIT; 174 return MOJO_RESULT_SHOULD_WAIT;
156 175
157 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); 176 uint32_t num_bytes_to_write = std::min(*num_bytes, writable);
177
178 // The failure case for |WriteDataIntoSharedBuffer| is the shared
179 // buffer not existing, so we should wait.
180 if (!data_pipe_->WriteDataIntoSharedBuffer(elements, num_bytes_to_write)) {
181 return MOJO_RESULT_SHOULD_WAIT;
182 }
183
184 // If we can't tell the other end about the write, pretend this write didn't
185 // happen and mark the other end as closed. We deal with any state changes
186 // due to the other side being closed in OnError.
187 if (!data_pipe_->NotifyWrite(num_bytes_to_write)) {
188 peer_closed_ = true;
189 return MOJO_RESULT_FAILED_PRECONDITION;
190 }
158 191
159 *num_bytes = num_bytes_to_write; 192 *num_bytes = num_bytes_to_write;
160 WriteDataIntoMessages(elements, num_bytes_to_write);
161 193
194 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
195 data_pipe_->UpdateFromWrite(num_bytes_to_write);
162 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); 196 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
163 if (!new_state.equals(old_state)) 197 if (!new_state.equals(old_state))
164 awakable_list_.AwakeForStateChange(new_state); 198 awakable_list_.AwakeForStateChange(new_state);
199
165 return MOJO_RESULT_OK; 200 return MOJO_RESULT_OK;
166 } 201 }
167 202
168 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( 203 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
169 void** buffer, 204 void** buffer,
170 uint32_t* buffer_num_bytes, 205 uint32_t* buffer_num_bytes,
171 MojoWriteDataFlags flags) { 206 MojoWriteDataFlags flags) {
172 lock().AssertAcquired(); 207 lock().AssertAcquired();
173 if (InTwoPhaseWrite()) 208 if (in_two_phase_write_)
174 return MOJO_RESULT_BUSY; 209 return MOJO_RESULT_BUSY;
175 if (error_) 210 if (peer_closed_)
176 return MOJO_RESULT_FAILED_PRECONDITION; 211 return MOJO_RESULT_FAILED_PRECONDITION;
177 212
178 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. 213 size_t max_num_bytes_to_write;
214 void* temp_buf = data_pipe_->GetWriteBuffer(&max_num_bytes_to_write);
215
216 if (max_num_bytes_to_write == 0)
217 return MOJO_RESULT_SHOULD_WAIT;
218
179 if (*buffer_num_bytes == 0) 219 if (*buffer_num_bytes == 0)
180 *buffer_num_bytes = options_.capacity_num_bytes; 220 *buffer_num_bytes = uint32_t(max_num_bytes_to_write);
181 221
182 two_phase_data_.resize(*buffer_num_bytes); 222 // Don't promise more bytes than we have.
183 *buffer = &two_phase_data_[0]; 223 *buffer_num_bytes =
224 std::min(uint32_t(max_num_bytes_to_write), *buffer_num_bytes);
184 225
185 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes 226 two_phase_max_bytes_write_ = *buffer_num_bytes;
186 // we can construct a MessageInTransit here. But then we need to make 227 *buffer = temp_buf;
187 // MessageInTransit support changing its data size later. 228 in_two_phase_write_ = true;
188 229
189 return MOJO_RESULT_OK; 230 return MOJO_RESULT_OK;
190 } 231 }
191 232
192 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( 233 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
193 uint32_t num_bytes_written) { 234 uint32_t num_bytes_written) {
194 lock().AssertAcquired(); 235 lock().AssertAcquired();
195 if (!InTwoPhaseWrite()) 236 if (!in_two_phase_write_)
196 return MOJO_RESULT_FAILED_PRECONDITION; 237 return MOJO_RESULT_FAILED_PRECONDITION;
197 238
239 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
240 in_two_phase_write_ = false;
241
242 if (num_bytes_written > two_phase_max_bytes_write_ ||
243 num_bytes_written % data_pipe_->GetOptions().element_num_bytes != 0) {
244 return MOJO_RESULT_INVALID_ARGUMENT;
245 }
246
247 data_pipe_->UpdateFromWrite(num_bytes_written);
248
249 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
250 if (!new_state.equals(old_state))
251 awakable_list_.AwakeForStateChange(new_state);
252
198 // Note: Allow successful completion of the two-phase write even if the other 253 // Note: Allow successful completion of the two-phase write even if the other
199 // side has been closed. 254 // side has been closed.
200 MojoResult rv = MOJO_RESULT_OK; 255 // Deal with state changes due to peer being closed in OnError.
201 if (num_bytes_written > two_phase_data_.size() || 256 if (!data_pipe_->NotifyWrite(num_bytes_written))
202 num_bytes_written % options_.element_num_bytes != 0) { 257 peer_closed_ = true;
203 rv = MOJO_RESULT_INVALID_ARGUMENT;
204 } else if (channel_) {
205 WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written);
206 }
207 258
208 // Two-phase write ended even on failure. 259 return MOJO_RESULT_OK;
209 two_phase_data_.clear();
210 // If we're now writable, we *became* writable (since we weren't writable
211 // during the two-phase write), so awake producer awakables.
212 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
213 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
214 awakable_list_.AwakeForStateChange(new_state);
215
216 return rv;
217 } 260 }
218 261
219 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() 262 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
220 const { 263 const {
221 lock().AssertAcquired(); 264 lock().AssertAcquired();
222 265
223 HandleSignalsState rv; 266 HandleSignalsState rv;
224 if (!error_) { 267 if (!peer_closed_) {
225 if (!InTwoPhaseWrite()) 268 if (!in_two_phase_write_ && data_pipe_->GetWritableBytes())
226 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 269 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
227 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 270 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
228 } else { 271 } else {
229 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 272 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
230 } 273 }
274
231 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 275 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
232 return rv; 276 return rv;
233 } 277 }
234 278
235 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( 279 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
236 Awakable* awakable, 280 Awakable* awakable,
237 MojoHandleSignals signals, 281 MojoHandleSignals signals,
238 uintptr_t context, 282 uintptr_t context,
239 HandleSignalsState* signals_state) { 283 HandleSignalsState* signals_state) {
240 lock().AssertAcquired(); 284 lock().AssertAcquired();
241 if (channel_)
242 channel_->EnsureLazyInitialized();
243 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); 285 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
244 if (state.satisfies(signals)) { 286 if (state.satisfies(signals)) {
245 if (signals_state) 287 if (signals_state)
246 *signals_state = state; 288 *signals_state = state;
247 return MOJO_RESULT_ALREADY_EXISTS; 289 return MOJO_RESULT_ALREADY_EXISTS;
248 } 290 }
249 if (!state.can_satisfy(signals)) { 291 if (!state.can_satisfy(signals)) {
250 if (signals_state) 292 if (signals_state)
251 *signals_state = state; 293 *signals_state = state;
252 return MOJO_RESULT_FAILED_PRECONDITION; 294 return MOJO_RESULT_FAILED_PRECONDITION;
253 } 295 }
254 296
255 awakable_list_.Add(awakable, signals, context); 297 awakable_list_.Add(awakable, signals, context);
256 return MOJO_RESULT_OK; 298 return MOJO_RESULT_OK;
257 } 299 }
258 300
259 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( 301 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
260 Awakable* awakable, 302 Awakable* awakable,
261 HandleSignalsState* signals_state) { 303 HandleSignalsState* signals_state) {
262 lock().AssertAcquired(); 304 lock().AssertAcquired();
263 awakable_list_.Remove(awakable); 305 awakable_list_.Remove(awakable);
264 if (signals_state) 306 if (signals_state)
265 *signals_state = GetHandleSignalsStateImplNoLock(); 307 *signals_state = GetHandleSignalsStateImplNoLock();
266 } 308 }
267 309
268 void DataPipeProducerDispatcher::StartSerializeImplNoLock( 310 void DataPipeProducerDispatcher::StartSerializeImplNoLock(
269 size_t* max_size, 311 size_t* max_size,
270 size_t* max_platform_handles) { 312 size_t* max_platform_handles) {
271 if (!serialized_) 313 data_pipe_->StartSerialize(max_size, max_platform_handles);
272 SerializeInternal();
273
274 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
275 !serialized_write_buffer_.empty(), max_size,
276 max_platform_handles);
277 } 314 }
278 315
279 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( 316 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
280 void* destination, 317 void* destination,
281 size_t* actual_size, 318 size_t* actual_size,
282 PlatformHandleVector* platform_handles) { 319 PlatformHandleVector* platform_handles) {
283 ScopedPlatformHandle shared_memory_handle; 320 data_pipe_->EndSerialize(destination, actual_size, platform_handles);
284 size_t shared_memory_size = serialized_write_buffer_.size();
285 if (shared_memory_size) {
286 scoped_refptr<PlatformSharedBuffer> shared_buffer(
287 internal::g_platform_support->CreateSharedBuffer(
288 shared_memory_size));
289 scoped_ptr<PlatformSharedBufferMapping> mapping(
290 shared_buffer->Map(0, shared_memory_size));
291 memcpy(mapping->GetBase(), &serialized_write_buffer_[0],
292 shared_memory_size);
293 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
294 }
295
296 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
297 std::move(shared_memory_handle), shared_memory_size,
298 destination, actual_size, platform_handles);
299 CloseImplNoLock(); 321 CloseImplNoLock();
300 return true; 322 return true;
301 } 323 }
302 324
303 void DataPipeProducerDispatcher::TransportStarted() { 325 void DataPipeProducerDispatcher::TransportStarted() {
304 started_transport_.Acquire(); 326 started_transport_.Acquire();
305 } 327 }
306 328
307 void DataPipeProducerDispatcher::TransportEnded() { 329 void DataPipeProducerDispatcher::TransportEnded() {
308 started_transport_.Release(); 330 started_transport_.Release();
309 } 331 }
310 332
311 bool DataPipeProducerDispatcher::IsBusyNoLock() const { 333 bool DataPipeProducerDispatcher::IsBusyNoLock() const {
312 lock().AssertAcquired(); 334 lock().AssertAcquired();
313 return InTwoPhaseWrite(); 335 return in_two_phase_write_;
336 }
337
338 bool DataPipeProducerDispatcher::ProcessCommand(
339 const DataPipeCommandHeader& command,
340 ScopedPlatformHandleVectorPtr platform_handles) {
341 // Handles write/read case and shared buffer becoming available case.
342 return data_pipe_->ProcessCommand(command, std::move(platform_handles));
314 } 343 }
315 344
316 void DataPipeProducerDispatcher::OnReadMessage( 345 void DataPipeProducerDispatcher::OnReadMessage(
317 const MessageInTransit::View& message_view, 346 const MessageInTransit::View& message_view,
318 ScopedPlatformHandleVectorPtr platform_handles) { 347 ScopedPlatformHandleVectorPtr platform_handles) {
319 CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages."; 348 const DataPipeCommandHeader* command =
349 static_cast<const DataPipeCommandHeader*>(message_view.bytes());
350 DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader));
351
352 if (started_transport_.Try()) {
353 // We're not in the middle of being sent.
354
355 // Can get synchronously called back from RawChannel::Init in InitOnIO if
356 // there was initial data. InitOnIO locks, so don't lock twice.
357 scoped_ptr<base::AutoLock> locker;
358 if (!calling_init_) {
359 locker.reset(new base::AutoLock(lock()));
360 }
361
362 if (ProcessCommand(*command, std::move(platform_handles))) {
363 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
364 }
365 started_transport_.Release();
366 } else {
367 // DataPipe::Serialize calls ReleaseHandle on the channel, which
368 // acquires RawChannel's read_lock_. The function OnReadMessage is only
369 // called while read_lock_ is acquired, and not after ReleaseHandle has been
370 // called. This means this function will only be called before Serialize
371 // calls ReleaseHandle, meaning the serialisation will not have started yet.
372 // We only notify awakables if we're not in the process of being
373 // transported.
374 ProcessCommand(*command, std::move(platform_handles));
375 }
320 } 376 }
321 377
322 void DataPipeProducerDispatcher::OnError(Error error) { 378 void DataPipeProducerDispatcher::OnError(Error error) {
323 switch (error) { 379 switch (error) {
324 case ERROR_READ_BROKEN:
325 case ERROR_READ_BAD_MESSAGE:
326 case ERROR_READ_UNKNOWN:
327 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error.";
328 break;
329 case ERROR_READ_SHUTDOWN: 380 case ERROR_READ_SHUTDOWN:
330 // The other side was cleanly closed, so this isn't actually an error. 381 // The other side was cleanly closed, so this isn't actually an error.
331 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; 382 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)";
332 break; 383 break;
384 case ERROR_READ_BROKEN:
385 LOG(ERROR) << "DataPipeProducerDispatcher read error (connection broken)";
386 break;
387 case ERROR_READ_BAD_MESSAGE:
388 // Receiving a bad message means either a bug, data corruption, or
389 // malicious attack (probably due to some other bug).
390 LOG(ERROR) << "DataPipeProducerDispatcher read error (received bad "
391 << "message)";
392 break;
393 case ERROR_READ_UNKNOWN:
394 LOG(ERROR) << "DataPipeProducerDispatcher read error (unknown)";
395 break;
333 case ERROR_WRITE: 396 case ERROR_WRITE:
334 // Write errors are slightly notable: they probably shouldn't happen under 397 LOG(ERROR) << "DataPipeProducerDispatcher write error";
335 // normal operation (but maybe the other side crashed).
336 LOG(WARNING) << "DataPipeProducerDispatcher write error";
337 break; 398 break;
338 } 399 }
339 400
340 error_ = true; 401 peer_closed_ = true;
341 if (started_transport_.Try()) { 402 if (started_transport_.Try()) {
342 base::AutoLock locker(lock()); 403 base::AutoLock locker(lock());
343 // We can get two OnError callbacks before the post task below completes. 404 // We can get two OnError callbacks before the post task below completes.
344 // Although RawChannel still has a pointer to this object until Shutdown is 405 // Although RawChannel still has a pointer to this object until Shutdown is
345 // called, that is safe since this class always does a PostTask to the IO 406 // called, that is safe since this class always does a PostTask to the IO
346 // thread to self destruct. 407 // thread to self destruct.
347 if (channel_) { 408 if (data_pipe_->GetChannel()) {
348 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 409 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
349 channel_->Shutdown(); 410 data_pipe_->Shutdown();
350 channel_ = nullptr;
351 } 411 }
352 started_transport_.Release(); 412 started_transport_.Release();
353 } else { 413 } else {
354 // We must be waiting to call ReleaseHandle. It will call Shutdown. 414 // We must be waiting to call ReleaseHandle. It will call Shutdown.
355 } 415 }
356 } 416 }
357 417
358 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
359 return !two_phase_data_.empty();
360 }
361
362 bool DataPipeProducerDispatcher::WriteDataIntoMessages(
363 const void* elements,
364 uint32_t num_bytes) {
365 // The maximum amount of data to send per message (make it a multiple of the
366 // element size.
367 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
368 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
369 DCHECK_GT(max_message_num_bytes, 0u);
370
371 uint32_t offset = 0;
372 while (offset < num_bytes) {
373 uint32_t message_num_bytes =
374 std::min(static_cast<uint32_t>(max_message_num_bytes),
375 num_bytes - offset);
376 scoped_ptr<MessageInTransit> message(new MessageInTransit(
377 MessageInTransit::Type::MESSAGE, message_num_bytes,
378 static_cast<const char*>(elements) + offset));
379 if (!channel_->WriteMessage(std::move(message))) {
380 error_ = true;
381 return false;
382 }
383
384 offset += message_num_bytes;
385 }
386
387 return true;
388 }
389
390 void DataPipeProducerDispatcher::SerializeInternal() {
391 // We need to stop watching handle immediately, even though not on IO thread,
392 // so that other messages aren't read after this.
393 if (channel_) {
394 std::vector<char> serialized_read_buffer;
395 std::vector<int> fds;
396 bool write_error = false;
397 serialized_platform_handle_ = channel_->ReleaseHandle(
398 &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds,
399 &write_error);
400 CHECK(serialized_read_buffer.empty());
401 CHECK(fds.empty());
402 if (write_error)
403 serialized_platform_handle_.reset();
404 channel_ = nullptr;
405 }
406 serialized_ = true;
407 }
408
409 } // namespace edk 418 } // namespace edk
410 } // namespace mojo 419 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698