Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(440)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: move to mojo::edk namespace in preparation for runtim flag Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6
7 #include "base/bind.h"
8 #include "base/logging.h"
9 #include "base/message_loop/message_loop.h"
10 #include "mojo/edk/embedder/embedder_internal.h"
11 #include "mojo/edk/system/configuration.h"
12 #include "mojo/edk/system/data_pipe.h"
13 #include "mojo/edk/system/memory.h"
14
15 namespace mojo {
16 namespace edk {
17
18 void DataPipeProducerDispatcher::Init(ScopedPlatformHandle message_pipe) {
19 if (message_pipe.is_valid()) {
20 channel_ = RawChannel::Create(message_pipe.Pass());
21 internal::g_io_thread_task_runner->PostTask(
22 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
23 }
24 }
25
26 void DataPipeProducerDispatcher::InitOnIO() {
27 base::AutoLock locker(lock());
28 if (channel_)
29 channel_->Init(this);
30 }
31
32 void DataPipeProducerDispatcher::CloseOnIO() {
33 base::AutoLock locker(lock());
34 if (channel_) {
35 channel_->Shutdown();
36 channel_ = nullptr;
37 }
38 }
39
40 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
41 return Type::DATA_PIPE_PRODUCER;
42 }
43
44 scoped_refptr<DataPipeProducerDispatcher>
45 DataPipeProducerDispatcher::Deserialize(
46 const void* source,
47 size_t size,
48 PlatformHandleVector* platform_handles) {
49 MojoCreateDataPipeOptions options;
50 ScopedPlatformHandle platform_handle =
51 DataPipe::Deserialize(source, size, platform_handles, &options,
52 nullptr, 0);
53
54 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
55 if (platform_handle.is_valid())
56 rv->Init(platform_handle.Pass());
57 return rv;
58 }
59
60 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
61 const MojoCreateDataPipeOptions& options)
62 : options_(options), channel_(nullptr), error_(false) {
63 }
64
65 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
66 // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
67 DCHECK(!channel_);
68 }
69
70 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
71 lock().AssertAcquired();
72 awakable_list_.CancelAll();
73 }
74
75 void DataPipeProducerDispatcher::CloseImplNoLock() {
76 lock().AssertAcquired();
77 internal::g_io_thread_task_runner->PostTask(
78 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this));
79 }
80
81 scoped_refptr<Dispatcher>
82 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
83 lock().AssertAcquired();
84
85 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_);
86 rv->channel_ = channel_;
87 channel_ = nullptr;
88 rv->options_ = options_;
89 return scoped_refptr<Dispatcher>(rv.get());
90 }
91
92 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
93 UserPointer<const void> elements,
94 UserPointer<uint32_t> num_bytes,
95 MojoWriteDataFlags flags) {
96 lock().AssertAcquired();
97 if (InTwoPhaseWrite())
98 return MOJO_RESULT_BUSY;
99 if (error_)
100 return MOJO_RESULT_FAILED_PRECONDITION;
101 if (num_bytes.Get() % options_.element_num_bytes != 0)
102 return MOJO_RESULT_INVALID_ARGUMENT;
103 if (num_bytes.Get() == 0)
104 return MOJO_RESULT_OK; // Nothing to do.
105
106 // For now, we ignore options.capacity_num_bytes as a total of all pending
107 // writes (and just treat it per message). We will implement that later if
108 // we need to. All current uses want all their data to be sent, and it's not
109 // clear that this backpressure should be done at the mojo layer or at a
110 // higher application layer.
111 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
112 uint32_t min_num_bytes_to_write = all_or_none ? num_bytes.Get() : 0;
113 if (min_num_bytes_to_write > options_.capacity_num_bytes) {
114 // Don't return "should wait" since you can't wait for a specified amount of
115 // data.
116 return MOJO_RESULT_OUT_OF_RANGE;
117 }
118
119 size_t num_bytes_to_write =
120 std::min(static_cast<size_t>(num_bytes.Get()),
Ken Rockot(use gerrit already) 2015/09/23 22:32:17 size_t should be uint32_t
121 options_.capacity_num_bytes);
122 if (num_bytes_to_write == 0)
123 return MOJO_RESULT_SHOULD_WAIT;
124
125 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
126
127 num_bytes.Put(num_bytes_to_write);
128 WriteDataIntoMessages(elements, num_bytes_to_write);
129
130 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
131 if (!new_state.equals(old_state))
132 awakable_list_.AwakeForStateChange(new_state);
133 return MOJO_RESULT_OK;
134 }
135
136 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
137 UserPointer<void*> buffer,
138 UserPointer<uint32_t> buffer_num_bytes,
139 MojoWriteDataFlags flags) {
140 lock().AssertAcquired();
141 if (InTwoPhaseWrite())
142 return MOJO_RESULT_BUSY;
143 if (error_)
144 return MOJO_RESULT_FAILED_PRECONDITION;
145
146 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
147 uint32_t min_num_bytes_to_write = 0;
148 if (all_or_none) {
149 min_num_bytes_to_write = buffer_num_bytes.Get();
150 if (min_num_bytes_to_write % options_.element_num_bytes != 0)
151 return MOJO_RESULT_INVALID_ARGUMENT;
152 if (min_num_bytes_to_write > options_.capacity_num_bytes) {
153 // Don't return "should wait" since you can't wait for a specified amount
154 // of data.
155 return MOJO_RESULT_OUT_OF_RANGE;
156 }
157 }
158
159 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes.
160 if (buffer_num_bytes.Get() == 0)
161 buffer_num_bytes.Put(options_.capacity_num_bytes);
162
163 two_phase_data_.resize(buffer_num_bytes.Get());
164 buffer.Put(&two_phase_data_[0]);
165
166 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes
167 // we can construct a MessageInTransit here. But then we need to make
168 // MessageInTransit support changing its data size later.
169
170 return MOJO_RESULT_OK;
171 }
172
173 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
174 uint32_t num_bytes_written) {
175 lock().AssertAcquired();
176 if (!InTwoPhaseWrite())
177 return MOJO_RESULT_FAILED_PRECONDITION;
178
179 // Note: Allow successful completion of the two-phase write even if the other
180 // side has been closed.
181 MojoResult rv = MOJO_RESULT_OK;
182 if (num_bytes_written > two_phase_data_.size() ||
183 num_bytes_written % options_.element_num_bytes != 0) {
184 rv = MOJO_RESULT_INVALID_ARGUMENT;
185 } else if (channel_) {
186 WriteDataIntoMessages(
187 MakeUserPointer(static_cast<const void*>(&two_phase_data_[0])),
188 num_bytes_written);
189 }
190
191 // Two-phase write ended even on failure.
192 two_phase_data_.clear();
193 // If we're now writable, we *became* writable (since we weren't writable
194 // during the two-phase write), so awake producer awakables.
195 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
196 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
197 awakable_list_.AwakeForStateChange(new_state);
198
199 return rv;
200 }
201
202 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
203 const {
204 lock().AssertAcquired();
205
206 HandleSignalsState rv;
207 if (!error_) {
208 if (!InTwoPhaseWrite())
209 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
210 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
211 } else {
212 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
213 }
214 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
215 return rv;
216 }
217
218 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
219 Awakable* awakable,
220 MojoHandleSignals signals,
221 uint32_t context,
222 HandleSignalsState* signals_state) {
223 lock().AssertAcquired();
224 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
225 if (state.satisfies(signals)) {
226 if (signals_state)
227 *signals_state = state;
228 return MOJO_RESULT_ALREADY_EXISTS;
229 }
230 if (!state.can_satisfy(signals)) {
231 if (signals_state)
232 *signals_state = state;
233 return MOJO_RESULT_FAILED_PRECONDITION;
234 }
235
236 awakable_list_.Add(awakable, signals, context);
237 return MOJO_RESULT_OK;
238 }
239
240 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
241 Awakable* awakable,
242 HandleSignalsState* signals_state) {
243 lock().AssertAcquired();
244 awakable_list_.Remove(awakable);
245 if (signals_state)
246 *signals_state = GetHandleSignalsStateImplNoLock();
247 }
248
249 void DataPipeProducerDispatcher::StartSerializeImplNoLock(
250 size_t* max_size,
251 size_t* max_platform_handles) {
252 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
253
254 if (channel_) {
255 std::vector<char> temp;
256 serialized_platform_handle_ = channel_->ReleaseHandle(&temp);
257 channel_ = nullptr;
258 DCHECK(temp.empty());
259 }
260 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
261 false, max_size, max_platform_handles);
262 }
263
264 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
265 void* destination,
266 size_t* actual_size,
267 PlatformHandleVector* platform_handles) {
268 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
269
270 DataPipe::EndSerialize(
271 options_,
272 serialized_platform_handle_.Pass(),
273 ScopedPlatformHandle(), 0,
274 destination, actual_size, platform_handles);
275 CloseImplNoLock();
276 return true;
277 }
278
279 void DataPipeProducerDispatcher::TransportStarted() {
280 started_transport_.Acquire();
281 }
282
283 void DataPipeProducerDispatcher::TransportEnded() {
284 started_transport_.Release();
285 }
286
287 bool DataPipeProducerDispatcher::IsBusyNoLock() const {
288 lock().AssertAcquired();
289 return InTwoPhaseWrite();
290 }
291
292 void DataPipeProducerDispatcher::OnReadMessage(
293 const MessageInTransit::View& message_view,
294 ScopedPlatformHandleVectorPtr platform_handles) {
295 NOTREACHED();
296 }
297
298 void DataPipeProducerDispatcher::OnError(Error error) {
299 switch (error) {
300 case ERROR_READ_SHUTDOWN:
301 case ERROR_READ_BROKEN:
302 case ERROR_READ_BAD_MESSAGE:
303 case ERROR_READ_UNKNOWN:
304 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't read messages";
305 break;
306 case ERROR_WRITE:
307 // Write errors are slightly notable: they probably shouldn't happen under
308 // normal operation (but maybe the other side crashed).
309 LOG(WARNING) << "DataPipeProducerDispatcher write error";
310 break;
311 }
312
313 error_ = true;
314 if (started_transport_.Try()) {
315 base::AutoLock locker(lock());
316 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
317
318 base::MessageLoop::current()->PostTask(
319 FROM_HERE,
320 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
321 channel_ = nullptr;
322 started_transport_.Release();
323 } else {
324 // We must be waiting to call ReleaseHandle. It will call Shutdown.
325 }
326 }
327
328 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
329 return !two_phase_data_.empty();
330 }
331
332 bool DataPipeProducerDispatcher::WriteDataIntoMessages(
333 UserPointer<const void> elements,
334 size_t num_bytes) {
335 // The maximum amount of data to send per message (make it a multiple of the
336 // element size.
337 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
338 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
339 DCHECK_GT(max_message_num_bytes, 0u);
340
341 size_t offset = 0;
342 while (offset < num_bytes) {
343 size_t message_num_bytes =
344 std::min(max_message_num_bytes, num_bytes - offset);
345 scoped_ptr<MessageInTransit> message(new MessageInTransit(
346 MessageInTransit::Type::MESSAGE, message_num_bytes,
347 elements.At(offset)));
348 if (!channel_->WriteMessage(message.Pass())) {
349 error_ = true;
350 return false;
351 }
352
353 offset += message_num_bytes;
354 }
355
356 return true;
357 }
358
359 } // namespace edk
360 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698