Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(117)

Side by Side Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: convert remaining MP tests and simplify RawChannel destruction Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6
7 #include "base/bind.h"
8 #include "base/logging.h"
9 #include "base/message_loop/message_loop.h"
10 #include "mojo/edk/embedder/embedder_internal.h"
11 #include "mojo/edk/system/configuration.h"
12 #include "mojo/edk/system/data_pipe.h"
13 #include "mojo/edk/system/memory.h"
14
15 namespace mojo {
16 namespace system {
17
18 void DataPipeProducerDispatcher::Init(
19 embedder::ScopedPlatformHandle message_pipe) {
20 if (message_pipe.is_valid()) {
21 channel_ = RawChannel::Create(message_pipe.Pass());
22 mojo::embedder::internal::g_io_thread_task_runner->PostTask(
23 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
24 }
25 }
26
27 void DataPipeProducerDispatcher::InitOnIO() {
28 base::AutoLock locker(lock());
29 if (channel_)
30 channel_->Init(this);
31 }
32
33 void DataPipeProducerDispatcher::CloseOnIO() {
34 base::AutoLock locker(lock());
35 if (channel_) {
36 channel_->Shutdown();
37 channel_ = nullptr;
38 }
39 }
40
41 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
42 return Type::DATA_PIPE_PRODUCER;
43 }
44
45 scoped_refptr<DataPipeProducerDispatcher>
46 DataPipeProducerDispatcher::Deserialize(
47 const void* source,
48 size_t size,
49 embedder::PlatformHandleVector* platform_handles) {
50 MojoCreateDataPipeOptions options;
51 embedder::ScopedPlatformHandle platform_handle =
52 DataPipe::Deserialize(source, size, platform_handles, &options,
53 nullptr, 0);
54
55 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
56 if (platform_handle.is_valid())
57 rv->Init(platform_handle.Pass());
58 return rv;
59 }
60
61 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
62 const MojoCreateDataPipeOptions& options)
63 : options_(options), channel_(nullptr), error_(false) {
64 }
65
66 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
67 // |Close()|/|CloseImplNoLock()| should have taken care of the channel.
68 DCHECK(!channel_);
69 }
70
71 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
72 lock().AssertAcquired();
73 awakable_list_.CancelAll();
74 }
75
76 void DataPipeProducerDispatcher::CloseImplNoLock() {
77 lock().AssertAcquired();
78 mojo::embedder::internal::g_io_thread_task_runner->PostTask(
79 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this));
80 }
81
82 scoped_refptr<Dispatcher>
83 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
84 lock().AssertAcquired();
85
86 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_);
87 rv->channel_ = channel_;
88 channel_ = nullptr;
89 rv->options_ = options_;
90 return scoped_refptr<Dispatcher>(rv.get());
91 }
92
93 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
94 UserPointer<const void> elements,
95 UserPointer<uint32_t> num_bytes,
96 MojoWriteDataFlags flags) {
97 lock().AssertAcquired();
98 if (InTwoPhaseWrite())
99 return MOJO_RESULT_BUSY;
100 if (error_)
101 return MOJO_RESULT_FAILED_PRECONDITION;
102 if (num_bytes.Get() % options_.element_num_bytes != 0)
103 return MOJO_RESULT_INVALID_ARGUMENT;
104 if (num_bytes.Get() == 0)
105 return MOJO_RESULT_OK; // Nothing to do.
106
107 // For now, we ignore options.capacity_num_bytes as a total of all pending
108 // writes (and just treat this per message). We will implement that later if
109 // we need to. All current uses want all their data to be sent, and it's not
110 // clear that this backpressure should be done at the mojo layer or at a
111 // higher application layer.
112 if (num_bytes.Get() > options_.capacity_num_bytes)
113 return MOJO_RESULT_OUT_OF_RANGE;
114
115 // Ignore MOJO_WRITE_DATA_FLAG_ALL_OR_NONE for now, it's also not clear we
116 // need/want the functionality to only write a subset at this layer.
117
118 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
119
120 WriteDataIntoMessages(elements, num_bytes);
121
122 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
123 if (!new_state.equals(old_state))
124 awakable_list_.AwakeForStateChange(new_state);
125 return MOJO_RESULT_OK;
126 }
127
128 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
129 UserPointer<void*> buffer,
130 UserPointer<uint32_t> buffer_num_bytes,
131 MojoWriteDataFlags flags) {
132 lock().AssertAcquired();
133 if (InTwoPhaseWrite())
134 return MOJO_RESULT_BUSY;
135 if (error_)
136 return MOJO_RESULT_FAILED_PRECONDITION;
137 if (buffer_num_bytes.Get() % options_.element_num_bytes != 0)
138 return MOJO_RESULT_INVALID_ARGUMENT;
139 if (buffer_num_bytes.Get() > options_.capacity_num_bytes)
140 return MOJO_RESULT_OUT_OF_RANGE;
141
142 if (buffer_num_bytes.Get() == 0)
143 buffer_num_bytes.Put(options_.capacity_num_bytes);
144
145 two_phase_data_.resize(buffer_num_bytes.Get());
146 buffer.Put(&two_phase_data_[0]);
147
148 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes
149 // we can construct a MessageInTransit here. But then we need to make
150 // MessageInTransit support changing its data size later.
151
152 return MOJO_RESULT_OK;
153 }
154
155 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
156 uint32_t num_bytes_written) {
157 lock().AssertAcquired();
158 if (!InTwoPhaseWrite())
159 return MOJO_RESULT_FAILED_PRECONDITION;
160
161 // Note: Allow successful completion of the two-phase write even if the other
162 // side has been closed.
163 MojoResult rv;
164 if (num_bytes_written > two_phase_data_.size() ||
165 num_bytes_written % options_.element_num_bytes != 0) {
166 rv = MOJO_RESULT_INVALID_ARGUMENT;
167 } else {
168 WriteDataIntoMessages(
169 MakeUserPointer(static_cast<const void*>(&two_phase_data_[0])),
170 MakeUserPointer(&num_bytes_written));
171 rv = MOJO_RESULT_OK;
172 }
173
174 // Two-phase write ended even on failure.
175 two_phase_data_.clear();
176 // If we're now writable, we *became* writable (since we weren't writable
177 // during the two-phase write), so awake producer awakables.
178 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
179 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
180 awakable_list_.AwakeForStateChange(new_state);
181
182 return rv;
183 }
184
185 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
186 const {
187 lock().AssertAcquired();
188
189 HandleSignalsState rv;
190 if (!error_) {
191 if (!InTwoPhaseWrite())
192 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
193 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
194 } else {
195 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
196 }
197 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
198 return rv;
199 }
200
201 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
202 Awakable* awakable,
203 MojoHandleSignals signals,
204 uint32_t context,
205 HandleSignalsState* signals_state) {
206 lock().AssertAcquired();
207 HandleSignalsState state = GetHandleSignalsStateImplNoLock();
208 if (state.satisfies(signals)) {
209 if (signals_state)
210 *signals_state = state;
211 return MOJO_RESULT_ALREADY_EXISTS;
212 }
213 if (!state.can_satisfy(signals)) {
214 if (signals_state)
215 *signals_state = state;
216 return MOJO_RESULT_FAILED_PRECONDITION;
217 }
218
219 awakable_list_.Add(awakable, signals, context);
220 return MOJO_RESULT_OK;
221 }
222
223 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
224 Awakable* awakable,
225 HandleSignalsState* signals_state) {
226 lock().AssertAcquired();
227 awakable_list_.Remove(awakable);
228 if (signals_state)
229 *signals_state = GetHandleSignalsStateImplNoLock();
230 }
231
232 void DataPipeProducerDispatcher::StartSerializeImplNoLock(
233 size_t* max_size,
234 size_t* max_platform_handles) {
235 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
236
237 if (channel_) {
238 std::vector<char> temp;
239 serialized_platform_handle_ = channel_->ReleaseHandle(&temp);
240 channel_ = nullptr;
241 DCHECK(temp.empty());
242 }
243 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
244 false, max_size, max_platform_handles);
245 }
246
247 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
248 void* destination,
249 size_t* actual_size,
250 embedder::PlatformHandleVector* platform_handles) {
251 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
252
253 DataPipe::EndSerialize(
254 options_,
255 serialized_platform_handle_.Pass(),
256 embedder::ScopedPlatformHandle(), 0,
257 destination, actual_size, platform_handles);
258 CloseImplNoLock();
259 return true;
260 }
261
262 void DataPipeProducerDispatcher::TransportStarted() {
263 started_transport_.Acquire();
264 }
265
266 void DataPipeProducerDispatcher::TransportEnded() {
267 started_transport_.Release();
268 }
269
270 bool DataPipeProducerDispatcher::IsBusyNoLock() const {
271 lock().AssertAcquired();
272 return InTwoPhaseWrite();
273 }
274
275 void DataPipeProducerDispatcher::OnReadMessage(
276 const MessageInTransit::View& message_view,
277 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
278 NOTREACHED();
279 }
280
281 void DataPipeProducerDispatcher::OnError(Error error) {
282 switch (error) {
283 case ERROR_READ_SHUTDOWN:
284 case ERROR_READ_BROKEN:
285 case ERROR_READ_BAD_MESSAGE:
286 case ERROR_READ_UNKNOWN:
287 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't read messages";
288 break;
289 case ERROR_WRITE:
290 // Write errors are slightly notable: they probably shouldn't happen under
291 // normal operation (but maybe the other side crashed).
292 LOG(WARNING) << "DataPipeProducerDispatcher write error";
293 break;
294 }
295
296 error_ = true;
297 if (started_transport_.Try()) {
298 base::AutoLock locker(lock());
299 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
300
301 base::MessageLoop::current()->PostTask(
302 FROM_HERE,
303 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_)));
304 channel_ = nullptr;
305 started_transport_.Release();
306 } else {
307 // We must be waiting to call ReleaseHandle. It will call Shutdown.
308 }
309 }
310
311 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
312 return !two_phase_data_.empty();
313 }
314
315 bool DataPipeProducerDispatcher::WriteDataIntoMessages(
316 UserPointer<const void> elements,
317 UserPointer<uint32_t> num_bytes) {
318 // The maximum amount of data to send per message (make it a multiple of the
319 // element size.
320 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
321 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
322 DCHECK_GT(max_message_num_bytes, 0u);
323
324 size_t offset = 0;
325 while (offset < num_bytes.Get()) {
326 size_t message_num_bytes =
327 std::min(max_message_num_bytes, num_bytes.Get() - offset);
328 scoped_ptr<MessageInTransit> message(new MessageInTransit(
329 MessageInTransit::Type::MESSAGE, message_num_bytes,
330 elements.At(offset)));
331 if (!channel_->WriteMessage(message.Pass())) {
332 error_ = true;
333 return false;
334 }
335
336 offset += message_num_bytes;
337 }
338
339 return true;
340 }
341
342 } // namespace system
343 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698