OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/logging.h" | |
9 #include "base/message_loop/message_loop.h" | |
10 #include "mojo/edk/embedder/embedder_internal.h" | |
11 #include "mojo/edk/system/configuration.h" | |
12 #include "mojo/edk/system/data_pipe.h" | |
13 #include "mojo/edk/system/memory.h" | |
14 | |
15 namespace mojo { | |
16 namespace edk { | |
17 | |
18 void DataPipeProducerDispatcher::Init(ScopedPlatformHandle message_pipe) { | |
19 if (message_pipe.is_valid()) { | |
20 channel_ = RawChannel::Create(message_pipe.Pass()); | |
21 internal::g_io_thread_task_runner->PostTask( | |
22 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); | |
23 } | |
24 } | |
25 | |
26 void DataPipeProducerDispatcher::InitOnIO() { | |
27 base::AutoLock locker(lock()); | |
28 if (channel_) | |
29 channel_->Init(this); | |
30 } | |
31 | |
32 void DataPipeProducerDispatcher::CloseOnIO() { | |
33 base::AutoLock locker(lock()); | |
34 if (channel_) { | |
35 channel_->Shutdown(); | |
36 channel_ = nullptr; | |
37 } | |
38 } | |
39 | |
40 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { | |
41 return Type::DATA_PIPE_PRODUCER; | |
42 } | |
43 | |
44 scoped_refptr<DataPipeProducerDispatcher> | |
45 DataPipeProducerDispatcher::Deserialize( | |
46 const void* source, | |
47 size_t size, | |
48 PlatformHandleVector* platform_handles) { | |
49 MojoCreateDataPipeOptions options; | |
50 ScopedPlatformHandle platform_handle = | |
51 DataPipe::Deserialize(source, size, platform_handles, &options, | |
52 nullptr, 0); | |
53 | |
54 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); | |
55 if (platform_handle.is_valid()) | |
56 rv->Init(platform_handle.Pass()); | |
57 return rv; | |
58 } | |
59 | |
60 DataPipeProducerDispatcher::DataPipeProducerDispatcher( | |
61 const MojoCreateDataPipeOptions& options) | |
62 : options_(options), channel_(nullptr), error_(false) { | |
63 } | |
64 | |
65 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { | |
66 // |Close()|/|CloseImplNoLock()| should have taken care of the channel. | |
67 DCHECK(!channel_); | |
68 } | |
69 | |
70 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { | |
71 lock().AssertAcquired(); | |
72 awakable_list_.CancelAll(); | |
73 } | |
74 | |
75 void DataPipeProducerDispatcher::CloseImplNoLock() { | |
76 lock().AssertAcquired(); | |
77 internal::g_io_thread_task_runner->PostTask( | |
78 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); | |
79 } | |
80 | |
81 scoped_refptr<Dispatcher> | |
82 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | |
83 lock().AssertAcquired(); | |
84 | |
85 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); | |
86 rv->channel_ = channel_; | |
87 channel_ = nullptr; | |
88 rv->options_ = options_; | |
89 return scoped_refptr<Dispatcher>(rv.get()); | |
90 } | |
91 | |
92 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( | |
93 UserPointer<const void> elements, | |
94 UserPointer<uint32_t> num_bytes, | |
95 MojoWriteDataFlags flags) { | |
96 lock().AssertAcquired(); | |
97 if (InTwoPhaseWrite()) | |
98 return MOJO_RESULT_BUSY; | |
99 if (error_) | |
100 return MOJO_RESULT_FAILED_PRECONDITION; | |
101 if (num_bytes.Get() % options_.element_num_bytes != 0) | |
102 return MOJO_RESULT_INVALID_ARGUMENT; | |
103 if (num_bytes.Get() == 0) | |
104 return MOJO_RESULT_OK; // Nothing to do. | |
105 | |
106 // For now, we ignore options.capacity_num_bytes as a total of all pending | |
107 // writes (and just treat it per message). We will implement that later if | |
108 // we need to. All current uses want all their data to be sent, and it's not | |
109 // clear that this backpressure should be done at the mojo layer or at a | |
110 // higher application layer. | |
111 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; | |
112 uint32_t min_num_bytes_to_write = all_or_none ? num_bytes.Get() : 0; | |
113 if (min_num_bytes_to_write > options_.capacity_num_bytes) { | |
114 // Don't return "should wait" since you can't wait for a specified amount of | |
115 // data. | |
116 return MOJO_RESULT_OUT_OF_RANGE; | |
117 } | |
118 | |
119 size_t num_bytes_to_write = | |
120 std::min(static_cast<size_t>(num_bytes.Get()), | |
Ken Rockot(use gerrit already)
2015/09/23 22:32:17
size_t should be uint32_t
| |
121 options_.capacity_num_bytes); | |
122 if (num_bytes_to_write == 0) | |
123 return MOJO_RESULT_SHOULD_WAIT; | |
124 | |
125 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); | |
126 | |
127 num_bytes.Put(num_bytes_to_write); | |
128 WriteDataIntoMessages(elements, num_bytes_to_write); | |
129 | |
130 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | |
131 if (!new_state.equals(old_state)) | |
132 awakable_list_.AwakeForStateChange(new_state); | |
133 return MOJO_RESULT_OK; | |
134 } | |
135 | |
136 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( | |
137 UserPointer<void*> buffer, | |
138 UserPointer<uint32_t> buffer_num_bytes, | |
139 MojoWriteDataFlags flags) { | |
140 lock().AssertAcquired(); | |
141 if (InTwoPhaseWrite()) | |
142 return MOJO_RESULT_BUSY; | |
143 if (error_) | |
144 return MOJO_RESULT_FAILED_PRECONDITION; | |
145 | |
146 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; | |
147 uint32_t min_num_bytes_to_write = 0; | |
148 if (all_or_none) { | |
149 min_num_bytes_to_write = buffer_num_bytes.Get(); | |
150 if (min_num_bytes_to_write % options_.element_num_bytes != 0) | |
151 return MOJO_RESULT_INVALID_ARGUMENT; | |
152 if (min_num_bytes_to_write > options_.capacity_num_bytes) { | |
153 // Don't return "should wait" since you can't wait for a specified amount | |
154 // of data. | |
155 return MOJO_RESULT_OUT_OF_RANGE; | |
156 } | |
157 } | |
158 | |
159 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. | |
160 if (buffer_num_bytes.Get() == 0) | |
161 buffer_num_bytes.Put(options_.capacity_num_bytes); | |
162 | |
163 two_phase_data_.resize(buffer_num_bytes.Get()); | |
164 buffer.Put(&two_phase_data_[0]); | |
165 | |
166 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes | |
167 // we can construct a MessageInTransit here. But then we need to make | |
168 // MessageInTransit support changing its data size later. | |
169 | |
170 return MOJO_RESULT_OK; | |
171 } | |
172 | |
173 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( | |
174 uint32_t num_bytes_written) { | |
175 lock().AssertAcquired(); | |
176 if (!InTwoPhaseWrite()) | |
177 return MOJO_RESULT_FAILED_PRECONDITION; | |
178 | |
179 // Note: Allow successful completion of the two-phase write even if the other | |
180 // side has been closed. | |
181 MojoResult rv = MOJO_RESULT_OK; | |
182 if (num_bytes_written > two_phase_data_.size() || | |
183 num_bytes_written % options_.element_num_bytes != 0) { | |
184 rv = MOJO_RESULT_INVALID_ARGUMENT; | |
185 } else if (channel_) { | |
186 WriteDataIntoMessages( | |
187 MakeUserPointer(static_cast<const void*>(&two_phase_data_[0])), | |
188 num_bytes_written); | |
189 } | |
190 | |
191 // Two-phase write ended even on failure. | |
192 two_phase_data_.clear(); | |
193 // If we're now writable, we *became* writable (since we weren't writable | |
194 // during the two-phase write), so awake producer awakables. | |
195 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | |
196 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | |
197 awakable_list_.AwakeForStateChange(new_state); | |
198 | |
199 return rv; | |
200 } | |
201 | |
202 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() | |
203 const { | |
204 lock().AssertAcquired(); | |
205 | |
206 HandleSignalsState rv; | |
207 if (!error_) { | |
208 if (!InTwoPhaseWrite()) | |
209 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
210 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | |
211 } else { | |
212 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
213 } | |
214 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
215 return rv; | |
216 } | |
217 | |
218 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( | |
219 Awakable* awakable, | |
220 MojoHandleSignals signals, | |
221 uint32_t context, | |
222 HandleSignalsState* signals_state) { | |
223 lock().AssertAcquired(); | |
224 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | |
225 if (state.satisfies(signals)) { | |
226 if (signals_state) | |
227 *signals_state = state; | |
228 return MOJO_RESULT_ALREADY_EXISTS; | |
229 } | |
230 if (!state.can_satisfy(signals)) { | |
231 if (signals_state) | |
232 *signals_state = state; | |
233 return MOJO_RESULT_FAILED_PRECONDITION; | |
234 } | |
235 | |
236 awakable_list_.Add(awakable, signals, context); | |
237 return MOJO_RESULT_OK; | |
238 } | |
239 | |
240 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( | |
241 Awakable* awakable, | |
242 HandleSignalsState* signals_state) { | |
243 lock().AssertAcquired(); | |
244 awakable_list_.Remove(awakable); | |
245 if (signals_state) | |
246 *signals_state = GetHandleSignalsStateImplNoLock(); | |
247 } | |
248 | |
249 void DataPipeProducerDispatcher::StartSerializeImplNoLock( | |
250 size_t* max_size, | |
251 size_t* max_platform_handles) { | |
252 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. | |
253 | |
254 if (channel_) { | |
255 std::vector<char> temp; | |
256 serialized_platform_handle_ = channel_->ReleaseHandle(&temp); | |
257 channel_ = nullptr; | |
258 DCHECK(temp.empty()); | |
259 } | |
260 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), | |
261 false, max_size, max_platform_handles); | |
262 } | |
263 | |
264 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( | |
265 void* destination, | |
266 size_t* actual_size, | |
267 PlatformHandleVector* platform_handles) { | |
268 DCHECK(HasOneRef()); // Only one ref => no need to take the lock. | |
269 | |
270 DataPipe::EndSerialize( | |
271 options_, | |
272 serialized_platform_handle_.Pass(), | |
273 ScopedPlatformHandle(), 0, | |
274 destination, actual_size, platform_handles); | |
275 CloseImplNoLock(); | |
276 return true; | |
277 } | |
278 | |
279 void DataPipeProducerDispatcher::TransportStarted() { | |
280 started_transport_.Acquire(); | |
281 } | |
282 | |
283 void DataPipeProducerDispatcher::TransportEnded() { | |
284 started_transport_.Release(); | |
285 } | |
286 | |
287 bool DataPipeProducerDispatcher::IsBusyNoLock() const { | |
288 lock().AssertAcquired(); | |
289 return InTwoPhaseWrite(); | |
290 } | |
291 | |
292 void DataPipeProducerDispatcher::OnReadMessage( | |
293 const MessageInTransit::View& message_view, | |
294 ScopedPlatformHandleVectorPtr platform_handles) { | |
295 NOTREACHED(); | |
296 } | |
297 | |
298 void DataPipeProducerDispatcher::OnError(Error error) { | |
299 switch (error) { | |
300 case ERROR_READ_SHUTDOWN: | |
301 case ERROR_READ_BROKEN: | |
302 case ERROR_READ_BAD_MESSAGE: | |
303 case ERROR_READ_UNKNOWN: | |
304 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't read messages"; | |
305 break; | |
306 case ERROR_WRITE: | |
307 // Write errors are slightly notable: they probably shouldn't happen under | |
308 // normal operation (but maybe the other side crashed). | |
309 LOG(WARNING) << "DataPipeProducerDispatcher write error"; | |
310 break; | |
311 } | |
312 | |
313 error_ = true; | |
314 if (started_transport_.Try()) { | |
315 base::AutoLock locker(lock()); | |
316 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
317 | |
318 base::MessageLoop::current()->PostTask( | |
319 FROM_HERE, | |
320 base::Bind(&RawChannel::Shutdown, base::Unretained(channel_))); | |
321 channel_ = nullptr; | |
322 started_transport_.Release(); | |
323 } else { | |
324 // We must be waiting to call ReleaseHandle. It will call Shutdown. | |
325 } | |
326 } | |
327 | |
328 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { | |
329 return !two_phase_data_.empty(); | |
330 } | |
331 | |
332 bool DataPipeProducerDispatcher::WriteDataIntoMessages( | |
333 UserPointer<const void> elements, | |
334 size_t num_bytes) { | |
335 // The maximum amount of data to send per message (make it a multiple of the | |
336 // element size. | |
337 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | |
338 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; | |
339 DCHECK_GT(max_message_num_bytes, 0u); | |
340 | |
341 size_t offset = 0; | |
342 while (offset < num_bytes) { | |
343 size_t message_num_bytes = | |
344 std::min(max_message_num_bytes, num_bytes - offset); | |
345 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
346 MessageInTransit::Type::MESSAGE, message_num_bytes, | |
347 elements.At(offset))); | |
348 if (!channel_->WriteMessage(message.Pass())) { | |
349 error_ = true; | |
350 return false; | |
351 } | |
352 | |
353 offset += message_num_bytes; | |
354 } | |
355 | |
356 return true; | |
357 } | |
358 | |
359 } // namespace edk | |
360 } // namespace mojo | |
OLD | NEW |