Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(332)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 1526923006: [mojo] Implement data pipe using a shared buffer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/data_pipe_producer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <algorithm>
10 #include <utility> 11 #include <utility>
11 12
12 #include "base/bind.h" 13 #include "base/bind.h"
13 #include "base/logging.h" 14 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h" 15 #include "base/message_loop/message_loop.h"
15 #include "mojo/edk/embedder/embedder_internal.h" 16 #include "mojo/edk/embedder/embedder_internal.h"
16 #include "mojo/edk/embedder/platform_shared_buffer.h" 17 #include "mojo/edk/embedder/platform_shared_buffer.h"
17 #include "mojo/edk/embedder/platform_support.h" 18 #include "mojo/edk/embedder/platform_support.h"
18 #include "mojo/edk/system/configuration.h" 19 #include "mojo/edk/system/configuration.h"
19 #include "mojo/edk/system/data_pipe.h" 20 #include "mojo/edk/system/transport_data.h"
20 21
21 namespace mojo { 22 namespace mojo {
22 namespace edk { 23 namespace edk {
23 24
24 void DataPipeProducerDispatcher::Init( 25 void DataPipeProducerDispatcher::Init(
25 ScopedPlatformHandle message_pipe, 26 ScopedPlatformHandle channel_handle,
26 char* serialized_write_buffer, size_t serialized_write_buffer_size) { 27 scoped_refptr<PlatformSharedBuffer> shared_buffer) {
27 if (message_pipe.is_valid()) { 28 CHECK(shared_buffer);
28 channel_ = RawChannel::Create(std::move(message_pipe)); 29 if (channel_handle.is_valid()) {
29 channel_->SetSerializedData( 30 RawChannel* channel = RawChannel::Create(std::move(channel_handle));
30 nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size, 31 data_pipe_->set_channel(channel);
31 nullptr, nullptr);
32 internal::g_io_thread_task_runner->PostTask(
33 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
34 } else {
35 error_ = true;
36 } 32 }
33 data_pipe_->set_shared_buffer(shared_buffer);
34 InitInternal();
35 }
36
37 void DataPipeProducerDispatcher::InitInternal() {
38 peer_closed_ = data_pipe_->channel() == nullptr;
39 internal::g_io_thread_task_runner->PostTask(
40 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
41 data_pipe_->Init();
37 } 42 }
38 43
39 void DataPipeProducerDispatcher::InitOnIO() { 44 void DataPipeProducerDispatcher::InitOnIO() {
40 base::AutoLock locker(lock()); 45 base::AutoLock locker(lock());
41 if (channel_) 46 calling_init_ = true;
42 channel_->Init(this); 47 RawChannel* channel = data_pipe_->channel();
48 if (channel)
49 channel->Init(this);
50 calling_init_ = false;
43 } 51 }
44 52
45 void DataPipeProducerDispatcher::CloseOnIO() { 53 void DataPipeProducerDispatcher::CloseOnIO() {
46 base::AutoLock locker(lock()); 54 base::AutoLock locker(lock());
47 if (channel_) { 55 data_pipe_->Shutdown();
48 channel_->Shutdown();
49 channel_ = nullptr;
50 }
51 } 56 }
52 57
53 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { 58 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
54 return Type::DATA_PIPE_PRODUCER; 59 return Type::DATA_PIPE_PRODUCER;
55 } 60 }
56 61
57 scoped_refptr<DataPipeProducerDispatcher> 62 scoped_refptr<DataPipeProducerDispatcher>
58 DataPipeProducerDispatcher::Deserialize( 63 DataPipeProducerDispatcher::Deserialize(
59 const void* source, 64 const void* source,
60 size_t size, 65 size_t size,
61 PlatformHandleVector* platform_handles) { 66 PlatformHandleVector* platform_handles) {
62 MojoCreateDataPipeOptions options; 67 scoped_refptr<DataPipe> data_pipe(
63 ScopedPlatformHandle shared_memory_handle; 68 DataPipe::Deserialize(source, size, platform_handles));
64 size_t shared_memory_size = 0; 69 scoped_refptr<DataPipeProducerDispatcher> rv(
65 ScopedPlatformHandle platform_handle = 70 new DataPipeProducerDispatcher(data_pipe));
66 DataPipe::Deserialize(source, size, platform_handles, &options, 71 rv->InitInternal();
67 &shared_memory_handle, &shared_memory_size);
68
69 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
70
71 char* serialized_write_buffer = nullptr;
72 size_t serialized_write_buffer_size = 0;
73 scoped_refptr<PlatformSharedBuffer> shared_buffer;
74 scoped_ptr<PlatformSharedBufferMapping> mapping;
75 if (shared_memory_size) {
76 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
77 shared_memory_size, std::move(shared_memory_handle));
78 mapping = shared_buffer->Map(0, shared_memory_size);
79 serialized_write_buffer = static_cast<char*>(mapping->GetBase());
80 serialized_write_buffer_size = shared_memory_size;
81 }
82
83 rv->Init(std::move(platform_handle), serialized_write_buffer,
84 serialized_write_buffer_size);
85 return rv; 72 return rv;
86 } 73 }
87 74
88 DataPipeProducerDispatcher::DataPipeProducerDispatcher( 75 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
89 const MojoCreateDataPipeOptions& options) 76 const MojoCreateDataPipeOptions& options)
90 : options_(options), channel_(nullptr), error_(false), serialized_(false) { 77 : DataPipeProducerDispatcher(new DataPipe(options)) {}
91 } 78
79 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
80 scoped_refptr<DataPipe> data_pipe)
81 : data_pipe_(data_pipe),
82 calling_init_(false),
83 peer_closed_(false),
84 in_two_phase_write_(false),
85 two_phase_max_bytes_write_(0u) {}
92 86
93 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { 87 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
94 // See comment in ~MessagePipeDispatcher. 88 // See comment in ~MessagePipeDispatcher.
95 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) 89 if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
96 channel_->Shutdown(); 90 data_pipe_->Shutdown();
97 else 91 else
98 DCHECK(!channel_); 92 DCHECK(!data_pipe_->channel());
99 } 93 }
100 94
101 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { 95 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
102 lock().AssertAcquired(); 96 lock().AssertAcquired();
103 awakable_list_.CancelAll(); 97 awakable_list_.CancelAll();
104 } 98 }
105 99
106 void DataPipeProducerDispatcher::CloseImplNoLock() { 100 void DataPipeProducerDispatcher::CloseImplNoLock() {
107 lock().AssertAcquired(); 101 lock().AssertAcquired();
108 internal::g_io_thread_task_runner->PostTask( 102 internal::g_io_thread_task_runner->PostTask(
109 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); 103 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this));
110 } 104 }
111 105
112 scoped_refptr<Dispatcher> 106 scoped_refptr<Dispatcher>
113 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { 107 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
108 // This function is used by TransportData to make sure there are no references
109 // to the dispatcher it is trying to serialize and transport.
114 lock().AssertAcquired(); 110 lock().AssertAcquired();
115 111
116 SerializeInternal(); 112 scoped_refptr<DataPipeProducerDispatcher> rv = Create(data_pipe_->options());
113 data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get());
117 114
118 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); 115 DCHECK(!in_two_phase_write_);
119 serialized_write_buffer_.swap(rv->serialized_write_buffer_); 116
120 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
121 rv->serialized_ = true;
122 return scoped_refptr<Dispatcher>(rv.get()); 117 return scoped_refptr<Dispatcher>(rv.get());
123 } 118 }
124 119
125 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( 120 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
126 const void* elements, 121 const void* elements,
127 uint32_t* num_bytes, 122 uint32_t* num_bytes,
128 MojoWriteDataFlags flags) { 123 MojoWriteDataFlags flags) {
129 lock().AssertAcquired(); 124 lock().AssertAcquired();
130 if (InTwoPhaseWrite()) 125 if (in_two_phase_write_)
131 return MOJO_RESULT_BUSY; 126 return MOJO_RESULT_BUSY;
132 if (error_) 127 if (peer_closed_)
133 return MOJO_RESULT_FAILED_PRECONDITION; 128 return MOJO_RESULT_FAILED_PRECONDITION;
134 if (*num_bytes % options_.element_num_bytes != 0) 129 if (*num_bytes % data_pipe_->options().element_num_bytes != 0)
135 return MOJO_RESULT_INVALID_ARGUMENT; 130 return MOJO_RESULT_INVALID_ARGUMENT;
136 if (*num_bytes == 0) 131 if (*num_bytes == 0)
137 return MOJO_RESULT_OK; // Nothing to do. 132 return MOJO_RESULT_OK; // Nothing to do.
138 133
139 // For now, we ignore options.capacity_num_bytes as a total of all pending 134 // Don't write non element sized chunks.
140 // writes (and just treat it per message). We will implement that later if 135 uint32_t writable = data_pipe_->GetWritableBytes();
141 // we need to. All current uses want all their data to be sent, and it's not 136 writable -= writable % data_pipe_->options().element_num_bytes;
142 // clear that this backpressure should be done at the mojo layer or at a 137
143 // higher application layer.
144 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; 138 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
145 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; 139 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
146 if (min_num_bytes_to_write > options_.capacity_num_bytes) { 140 if (min_num_bytes_to_write > writable) {
147 // Don't return "should wait" since you can't wait for a specified amount of 141 // Don't return "should wait" since you can't wait for a specified amount of
148 // data. 142 // data.
149 return MOJO_RESULT_OUT_OF_RANGE; 143 return MOJO_RESULT_OUT_OF_RANGE;
150 } 144 }
151 145
152 uint32_t num_bytes_to_write = 146 if (writable == 0)
153 std::min(*num_bytes, options_.capacity_num_bytes);
154 if (num_bytes_to_write == 0)
155 return MOJO_RESULT_SHOULD_WAIT; 147 return MOJO_RESULT_SHOULD_WAIT;
156 148
157 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); 149 uint32_t num_bytes_to_write = std::min(*num_bytes, writable);
150
151 // The failure case for |WriteDataIntoSharedBuffer| is the shared
152 // buffer not existing, so we should wait.
153 if (!data_pipe_->WriteDataIntoSharedBuffer(elements, num_bytes_to_write)) {
154 return MOJO_RESULT_SHOULD_WAIT;
155 }
156
157 // If we can't tell the other end about the write, pretend this write didn't
158 // happen and mark the other end as closed. We deal with any state changes
159 // due to the other side being closed in OnError.
160 if (!data_pipe_->NotifyWrite(num_bytes_to_write)) {
161 peer_closed_ = true;
162 return MOJO_RESULT_FAILED_PRECONDITION;
163 }
158 164
159 *num_bytes = num_bytes_to_write; 165 *num_bytes = num_bytes_to_write;
160 WriteDataIntoMessages(elements, num_bytes_to_write);
161 166
167 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
168 data_pipe_->UpdateFromWrite(num_bytes_to_write);
162 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); 169 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
163 if (!new_state.equals(old_state)) 170 if (!new_state.equals(old_state))
164 awakable_list_.AwakeForStateChange(new_state); 171 awakable_list_.AwakeForStateChange(new_state);
172
165 return MOJO_RESULT_OK; 173 return MOJO_RESULT_OK;
166 } 174 }
167 175
168 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( 176 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
169 void** buffer, 177 void** buffer,
170 uint32_t* buffer_num_bytes, 178 uint32_t* buffer_num_bytes,
171 MojoWriteDataFlags flags) { 179 MojoWriteDataFlags flags) {
172 lock().AssertAcquired(); 180 lock().AssertAcquired();
173 if (InTwoPhaseWrite()) 181 if (in_two_phase_write_)
174 return MOJO_RESULT_BUSY; 182 return MOJO_RESULT_BUSY;
175 if (error_) 183 if (peer_closed_)
176 return MOJO_RESULT_FAILED_PRECONDITION; 184 return MOJO_RESULT_FAILED_PRECONDITION;
177 185
178 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. 186 uint32_t max_num_bytes_to_write;
187 void* temp_buf = data_pipe_->GetWriteBuffer(&max_num_bytes_to_write);
188
189 if (max_num_bytes_to_write == 0)
190 return MOJO_RESULT_SHOULD_WAIT;
191
179 if (*buffer_num_bytes == 0) 192 if (*buffer_num_bytes == 0)
180 *buffer_num_bytes = options_.capacity_num_bytes; 193 *buffer_num_bytes = max_num_bytes_to_write;
181 194
182 two_phase_data_.resize(*buffer_num_bytes); 195 // Don't promise more bytes than we have.
183 *buffer = &two_phase_data_[0]; 196 *buffer_num_bytes = std::min(max_num_bytes_to_write, *buffer_num_bytes);
184 197
185 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes 198 two_phase_max_bytes_write_ = *buffer_num_bytes;
186 // we can construct a MessageInTransit here. But then we need to make 199 *buffer = temp_buf;
187 // MessageInTransit support changing its data size later. 200 in_two_phase_write_ = true;
188 201
189 return MOJO_RESULT_OK; 202 return MOJO_RESULT_OK;
190 } 203 }
191 204
192 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( 205 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
193 uint32_t num_bytes_written) { 206 uint32_t num_bytes_written) {
194 lock().AssertAcquired(); 207 lock().AssertAcquired();
195 if (!InTwoPhaseWrite()) 208 if (!in_two_phase_write_)
196 return MOJO_RESULT_FAILED_PRECONDITION; 209 return MOJO_RESULT_FAILED_PRECONDITION;
197 210
211 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
212 in_two_phase_write_ = false;
213
214 if (num_bytes_written > two_phase_max_bytes_write_ ||
215 num_bytes_written % data_pipe_->options().element_num_bytes != 0) {
216 return MOJO_RESULT_INVALID_ARGUMENT;
217 }
218
219 data_pipe_->UpdateFromWrite(num_bytes_written);
220
221 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
222 if (!new_state.equals(old_state))
223 awakable_list_.AwakeForStateChange(new_state);
224
198 // Note: Allow successful completion of the two-phase write even if the other 225 // Note: Allow successful completion of the two-phase write even if the other
199 // side has been closed. 226 // side has been closed.
200 MojoResult rv = MOJO_RESULT_OK; 227 // Deal with state changes due to peer being closed in OnError.
201 if (num_bytes_written > two_phase_data_.size() || 228 if (!data_pipe_->NotifyWrite(num_bytes_written))
202 num_bytes_written % options_.element_num_bytes != 0) { 229 peer_closed_ = true;
203 rv = MOJO_RESULT_INVALID_ARGUMENT;
204 } else if (channel_) {
205 WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written);
206 }
207 230
208 // Two-phase write ended even on failure. 231 return MOJO_RESULT_OK;
209 two_phase_data_.clear();
210 // If we're now writable, we *became* writable (since we weren't writable
211 // during the two-phase write), so awake producer awakables.
212 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
213 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
214 awakable_list_.AwakeForStateChange(new_state);
215
216 return rv;
217 } 232 }
218 233
219 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() 234 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
220 const { 235 const {
221 lock().AssertAcquired(); 236 lock().AssertAcquired();
222 237
223 HandleSignalsState rv; 238 HandleSignalsState rv;
224 if (!error_) { 239 if (!peer_closed_) {
225 if (!InTwoPhaseWrite()) 240 if (!in_two_phase_write_ && data_pipe_->GetWritableBytes())
226 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 241 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
227 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 242 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
228 } else { 243 } else {
229 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 244 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
230 } 245 }
246
231 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 247 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
232 return rv; 248 return rv;
233 } 249 }
234 250
235 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( 251 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
236 Awakable* awakable, 252 Awakable* awakable,
237 MojoHandleSignals signals, 253 MojoHandleSignals signals,
238 uintptr_t context, 254 uintptr_t context,
239 HandleSignalsState* signals_state) { 255 HandleSignalsState* signals_state) {
240 lock().AssertAcquired(); 256 lock().AssertAcquired();
241 if (channel_)
242 channel_->EnsureLazyInitialized();
243 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); 257 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
244 if (state.satisfies(signals)) { 258 if (state.satisfies(signals)) {
245 if (signals_state) 259 if (signals_state)
246 *signals_state = state; 260 *signals_state = state;
247 return MOJO_RESULT_ALREADY_EXISTS; 261 return MOJO_RESULT_ALREADY_EXISTS;
248 } 262 }
249 if (!state.can_satisfy(signals)) { 263 if (!state.can_satisfy(signals)) {
250 if (signals_state) 264 if (signals_state)
251 *signals_state = state; 265 *signals_state = state;
252 return MOJO_RESULT_FAILED_PRECONDITION; 266 return MOJO_RESULT_FAILED_PRECONDITION;
253 } 267 }
254 268
255 awakable_list_.Add(awakable, signals, context); 269 awakable_list_.Add(awakable, signals, context);
256 return MOJO_RESULT_OK; 270 return MOJO_RESULT_OK;
257 } 271 }
258 272
259 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( 273 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
260 Awakable* awakable, 274 Awakable* awakable,
261 HandleSignalsState* signals_state) { 275 HandleSignalsState* signals_state) {
262 lock().AssertAcquired(); 276 lock().AssertAcquired();
263 awakable_list_.Remove(awakable); 277 awakable_list_.Remove(awakable);
264 if (signals_state) 278 if (signals_state)
265 *signals_state = GetHandleSignalsStateImplNoLock(); 279 *signals_state = GetHandleSignalsStateImplNoLock();
266 } 280 }
267 281
268 void DataPipeProducerDispatcher::StartSerializeImplNoLock( 282 void DataPipeProducerDispatcher::StartSerializeImplNoLock(
269 size_t* max_size, 283 size_t* max_size,
270 size_t* max_platform_handles) { 284 size_t* max_platform_handles) {
271 if (!serialized_) 285 data_pipe_->StartSerialize(max_size, max_platform_handles);
272 SerializeInternal();
273
274 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
275 !serialized_write_buffer_.empty(), max_size,
276 max_platform_handles);
277 } 286 }
278 287
279 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( 288 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
280 void* destination, 289 void* destination,
281 size_t* actual_size, 290 size_t* actual_size,
282 PlatformHandleVector* platform_handles) { 291 PlatformHandleVector* platform_handles) {
283 ScopedPlatformHandle shared_memory_handle; 292 data_pipe_->EndSerialize(destination, actual_size, platform_handles);
284 size_t shared_memory_size = serialized_write_buffer_.size();
285 if (shared_memory_size) {
286 scoped_refptr<PlatformSharedBuffer> shared_buffer(
287 internal::g_platform_support->CreateSharedBuffer(
288 shared_memory_size));
289 scoped_ptr<PlatformSharedBufferMapping> mapping(
290 shared_buffer->Map(0, shared_memory_size));
291 memcpy(mapping->GetBase(), &serialized_write_buffer_[0],
292 shared_memory_size);
293 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
294 }
295
296 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
297 std::move(shared_memory_handle), shared_memory_size,
298 destination, actual_size, platform_handles);
299 CloseImplNoLock(); 293 CloseImplNoLock();
300 return true; 294 return true;
301 } 295 }
302 296
303 void DataPipeProducerDispatcher::TransportStarted() { 297 void DataPipeProducerDispatcher::TransportStarted() {
304 started_transport_.Acquire(); 298 started_transport_.Acquire();
305 } 299 }
306 300
307 void DataPipeProducerDispatcher::TransportEnded() { 301 void DataPipeProducerDispatcher::TransportEnded() {
308 started_transport_.Release(); 302 started_transport_.Release();
309 } 303 }
310 304
311 bool DataPipeProducerDispatcher::IsBusyNoLock() const { 305 bool DataPipeProducerDispatcher::IsBusyNoLock() const {
312 lock().AssertAcquired(); 306 lock().AssertAcquired();
313 return InTwoPhaseWrite(); 307 return in_two_phase_write_;
308 }
309
310 bool DataPipeProducerDispatcher::ProcessCommand(
311 const DataPipeCommandHeader& command,
312 ScopedPlatformHandleVectorPtr platform_handles) {
313 // Handles write/read case and shared buffer becoming available case.
314 return data_pipe_->ProcessCommand(command, std::move(platform_handles));
314 } 315 }
315 316
316 void DataPipeProducerDispatcher::OnReadMessage( 317 void DataPipeProducerDispatcher::OnReadMessage(
317 const MessageInTransit::View& message_view, 318 const MessageInTransit::View& message_view,
318 ScopedPlatformHandleVectorPtr platform_handles) { 319 ScopedPlatformHandleVectorPtr platform_handles) {
319 CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages."; 320 const DataPipeCommandHeader* command =
321 static_cast<const DataPipeCommandHeader*>(message_view.bytes());
322 DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader));
323
324 if (started_transport_.Try()) {
325 // We're not in the middle of being sent.
326
327 // Can get synchronously called back from RawChannel::Init in InitOnIO if
328 // there was initial data. InitOnIO locks, so don't lock twice.
329 scoped_ptr<base::AutoLock> locker;
330 if (!calling_init_) {
331 locker.reset(new base::AutoLock(lock()));
332 }
333
334 if (ProcessCommand(*command, std::move(platform_handles))) {
335 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
336 }
337 started_transport_.Release();
338 } else {
339 // DataPipe::Serialize calls ReleaseHandle on the channel, which
340 // acquires RawChannel's read_lock_. The function OnReadMessage is only
341 // called while read_lock_ is acquired, and not after ReleaseHandle has been
342 // called. This means this function will only be called before Serialize
343 // calls ReleaseHandle, meaning the serialisation will not have started yet.
344 // We only notify awakables if we're not in the process of being
345 // transported.
346 ProcessCommand(*command, std::move(platform_handles));
347 }
320 } 348 }
321 349
322 void DataPipeProducerDispatcher::OnError(Error error) { 350 void DataPipeProducerDispatcher::OnError(Error error) {
323 switch (error) { 351 switch (error) {
324 case ERROR_READ_BROKEN:
325 case ERROR_READ_BAD_MESSAGE:
326 case ERROR_READ_UNKNOWN:
327 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error.";
328 break;
329 case ERROR_READ_SHUTDOWN: 352 case ERROR_READ_SHUTDOWN:
330 // The other side was cleanly closed, so this isn't actually an error. 353 // The other side was cleanly closed, so this isn't actually an error.
331 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; 354 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)";
332 break; 355 break;
356 case ERROR_READ_BROKEN:
357 LOG(ERROR) << "DataPipeProducerDispatcher read error (connection broken)";
358 break;
359 case ERROR_READ_BAD_MESSAGE:
360 // Receiving a bad message means either a bug, data corruption, or
361 // malicious attack (probably due to some other bug).
362 LOG(ERROR) << "DataPipeProducerDispatcher read error (received bad "
363 << "message)";
364 break;
365 case ERROR_READ_UNKNOWN:
366 LOG(ERROR) << "DataPipeProducerDispatcher read error (unknown)";
367 break;
333 case ERROR_WRITE: 368 case ERROR_WRITE:
334 // Write errors are slightly notable: they probably shouldn't happen under 369 LOG(ERROR) << "DataPipeProducerDispatcher write error";
335 // normal operation (but maybe the other side crashed).
336 LOG(WARNING) << "DataPipeProducerDispatcher write error";
337 break; 370 break;
338 } 371 }
339 372
340 error_ = true; 373 peer_closed_ = true;
341 if (started_transport_.Try()) { 374 if (started_transport_.Try()) {
342 base::AutoLock locker(lock()); 375 base::AutoLock locker(lock());
343 // We can get two OnError callbacks before the post task below completes. 376 // We can get two OnError callbacks before the post task below completes.
344 // Although RawChannel still has a pointer to this object until Shutdown is 377 // Although RawChannel still has a pointer to this object until Shutdown is
345 // called, that is safe since this class always does a PostTask to the IO 378 // called, that is safe since this class always does a PostTask to the IO
346 // thread to self destruct. 379 // thread to self destruct.
347 if (channel_) { 380 if (data_pipe_->channel()) {
348 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); 381 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
349 channel_->Shutdown(); 382 data_pipe_->Shutdown();
350 channel_ = nullptr;
351 } 383 }
352 started_transport_.Release(); 384 started_transport_.Release();
353 } else { 385 } else {
354 // We must be waiting to call ReleaseHandle. It will call Shutdown. 386 // We must be waiting to call ReleaseHandle. It will call Shutdown.
355 } 387 }
356 } 388 }
357 389
358 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
359 return !two_phase_data_.empty();
360 }
361
362 bool DataPipeProducerDispatcher::WriteDataIntoMessages(
363 const void* elements,
364 uint32_t num_bytes) {
365 // The maximum amount of data to send per message (make it a multiple of the
366 // element size.
367 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
368 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
369 DCHECK_GT(max_message_num_bytes, 0u);
370
371 uint32_t offset = 0;
372 while (offset < num_bytes) {
373 uint32_t message_num_bytes =
374 std::min(static_cast<uint32_t>(max_message_num_bytes),
375 num_bytes - offset);
376 scoped_ptr<MessageInTransit> message(new MessageInTransit(
377 MessageInTransit::Type::MESSAGE, message_num_bytes,
378 static_cast<const char*>(elements) + offset));
379 if (!channel_->WriteMessage(std::move(message))) {
380 error_ = true;
381 return false;
382 }
383
384 offset += message_num_bytes;
385 }
386
387 return true;
388 }
389
390 void DataPipeProducerDispatcher::SerializeInternal() {
391 // We need to stop watching handle immediately, even though not on IO thread,
392 // so that other messages aren't read after this.
393 if (channel_) {
394 std::vector<char> serialized_read_buffer;
395 std::vector<int> fds;
396 bool write_error = false;
397 serialized_platform_handle_ = channel_->ReleaseHandle(
398 &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds,
399 &write_error);
400 CHECK(serialized_read_buffer.empty());
401 CHECK(fds.empty());
402 if (write_error)
403 serialized_platform_handle_.reset();
404 channel_ = nullptr;
405 }
406 serialized_ = true;
407 }
408
409 } // namespace edk 390 } // namespace edk
410 } // namespace mojo 391 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/data_pipe_producer_dispatcher.h ('k') | mojo/edk/system/data_pipe_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698