OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
| 10 #include <algorithm> |
10 #include <utility> | 11 #include <utility> |
11 | 12 |
12 #include "base/bind.h" | 13 #include "base/bind.h" |
13 #include "base/logging.h" | 14 #include "base/logging.h" |
14 #include "base/message_loop/message_loop.h" | 15 #include "base/message_loop/message_loop.h" |
15 #include "mojo/edk/embedder/embedder_internal.h" | 16 #include "mojo/edk/embedder/embedder_internal.h" |
16 #include "mojo/edk/embedder/platform_shared_buffer.h" | 17 #include "mojo/edk/embedder/platform_shared_buffer.h" |
17 #include "mojo/edk/embedder/platform_support.h" | 18 #include "mojo/edk/embedder/platform_support.h" |
18 #include "mojo/edk/system/configuration.h" | 19 #include "mojo/edk/system/configuration.h" |
19 #include "mojo/edk/system/data_pipe.h" | 20 #include "mojo/edk/system/transport_data.h" |
20 | 21 |
21 namespace mojo { | 22 namespace mojo { |
22 namespace edk { | 23 namespace edk { |
23 | 24 |
24 void DataPipeProducerDispatcher::Init( | 25 void DataPipeProducerDispatcher::Init( |
25 ScopedPlatformHandle message_pipe, | 26 ScopedPlatformHandle channel_handle, |
26 char* serialized_write_buffer, size_t serialized_write_buffer_size) { | 27 scoped_refptr<PlatformSharedBuffer> shared_buffer) { |
27 if (message_pipe.is_valid()) { | 28 CHECK(shared_buffer); |
28 channel_ = RawChannel::Create(std::move(message_pipe)); | 29 if (channel_handle.is_valid()) { |
29 channel_->SetSerializedData( | 30 RawChannel* channel = RawChannel::Create(std::move(channel_handle)); |
30 nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size, | 31 data_pipe_->set_channel(channel); |
31 nullptr, nullptr); | |
32 internal::g_io_thread_task_runner->PostTask( | |
33 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); | |
34 } else { | |
35 error_ = true; | |
36 } | 32 } |
| 33 data_pipe_->set_shared_buffer(shared_buffer); |
| 34 InitInternal(); |
| 35 } |
| 36 |
| 37 void DataPipeProducerDispatcher::InitInternal() { |
| 38 peer_closed_ = data_pipe_->channel() == nullptr; |
| 39 internal::g_io_thread_task_runner->PostTask( |
| 40 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this)); |
| 41 data_pipe_->Init(); |
37 } | 42 } |
38 | 43 |
39 void DataPipeProducerDispatcher::InitOnIO() { | 44 void DataPipeProducerDispatcher::InitOnIO() { |
40 base::AutoLock locker(lock()); | 45 base::AutoLock locker(lock()); |
41 if (channel_) | 46 calling_init_ = true; |
42 channel_->Init(this); | 47 RawChannel* channel = data_pipe_->channel(); |
| 48 if (channel) |
| 49 channel->Init(this); |
| 50 calling_init_ = false; |
43 } | 51 } |
44 | 52 |
45 void DataPipeProducerDispatcher::CloseOnIO() { | 53 void DataPipeProducerDispatcher::CloseOnIO() { |
46 base::AutoLock locker(lock()); | 54 base::AutoLock locker(lock()); |
47 if (channel_) { | 55 data_pipe_->Shutdown(); |
48 channel_->Shutdown(); | |
49 channel_ = nullptr; | |
50 } | |
51 } | 56 } |
52 | 57 |
53 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { | 58 Dispatcher::Type DataPipeProducerDispatcher::GetType() const { |
54 return Type::DATA_PIPE_PRODUCER; | 59 return Type::DATA_PIPE_PRODUCER; |
55 } | 60 } |
56 | 61 |
57 scoped_refptr<DataPipeProducerDispatcher> | 62 scoped_refptr<DataPipeProducerDispatcher> |
58 DataPipeProducerDispatcher::Deserialize( | 63 DataPipeProducerDispatcher::Deserialize( |
59 const void* source, | 64 const void* source, |
60 size_t size, | 65 size_t size, |
61 PlatformHandleVector* platform_handles) { | 66 PlatformHandleVector* platform_handles) { |
62 MojoCreateDataPipeOptions options; | 67 scoped_refptr<DataPipe> data_pipe( |
63 ScopedPlatformHandle shared_memory_handle; | 68 DataPipe::Deserialize(source, size, platform_handles)); |
64 size_t shared_memory_size = 0; | 69 scoped_refptr<DataPipeProducerDispatcher> rv( |
65 ScopedPlatformHandle platform_handle = | 70 new DataPipeProducerDispatcher(data_pipe)); |
66 DataPipe::Deserialize(source, size, platform_handles, &options, | 71 rv->InitInternal(); |
67 &shared_memory_handle, &shared_memory_size); | |
68 | |
69 scoped_refptr<DataPipeProducerDispatcher> rv(Create(options)); | |
70 | |
71 char* serialized_write_buffer = nullptr; | |
72 size_t serialized_write_buffer_size = 0; | |
73 scoped_refptr<PlatformSharedBuffer> shared_buffer; | |
74 scoped_ptr<PlatformSharedBufferMapping> mapping; | |
75 if (shared_memory_size) { | |
76 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( | |
77 shared_memory_size, std::move(shared_memory_handle)); | |
78 mapping = shared_buffer->Map(0, shared_memory_size); | |
79 serialized_write_buffer = static_cast<char*>(mapping->GetBase()); | |
80 serialized_write_buffer_size = shared_memory_size; | |
81 } | |
82 | |
83 rv->Init(std::move(platform_handle), serialized_write_buffer, | |
84 serialized_write_buffer_size); | |
85 return rv; | 72 return rv; |
86 } | 73 } |
87 | 74 |
88 DataPipeProducerDispatcher::DataPipeProducerDispatcher( | 75 DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
89 const MojoCreateDataPipeOptions& options) | 76 const MojoCreateDataPipeOptions& options) |
90 : options_(options), channel_(nullptr), error_(false), serialized_(false) { | 77 : DataPipeProducerDispatcher(new DataPipe(options)) {} |
91 } | 78 |
| 79 DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
| 80 scoped_refptr<DataPipe> data_pipe) |
| 81 : data_pipe_(data_pipe), |
| 82 calling_init_(false), |
| 83 peer_closed_(false), |
| 84 in_two_phase_write_(false), |
| 85 two_phase_max_bytes_write_(0u) {} |
92 | 86 |
93 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { | 87 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() { |
94 // See comment in ~MessagePipeDispatcher. | 88 // See comment in ~MessagePipeDispatcher. |
95 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) | 89 if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
96 channel_->Shutdown(); | 90 data_pipe_->Shutdown(); |
97 else | 91 else |
98 DCHECK(!channel_); | 92 DCHECK(!data_pipe_->channel()); |
99 } | 93 } |
100 | 94 |
101 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { | 95 void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() { |
102 lock().AssertAcquired(); | 96 lock().AssertAcquired(); |
103 awakable_list_.CancelAll(); | 97 awakable_list_.CancelAll(); |
104 } | 98 } |
105 | 99 |
106 void DataPipeProducerDispatcher::CloseImplNoLock() { | 100 void DataPipeProducerDispatcher::CloseImplNoLock() { |
107 lock().AssertAcquired(); | 101 lock().AssertAcquired(); |
108 internal::g_io_thread_task_runner->PostTask( | 102 internal::g_io_thread_task_runner->PostTask( |
109 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); | 103 FROM_HERE, base::Bind(&DataPipeProducerDispatcher::CloseOnIO, this)); |
110 } | 104 } |
111 | 105 |
112 scoped_refptr<Dispatcher> | 106 scoped_refptr<Dispatcher> |
113 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | 107 DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
| 108 // This function is used by TransportData to make sure there are no references |
| 109 // to the dispatcher it is trying to serialize and transport. |
114 lock().AssertAcquired(); | 110 lock().AssertAcquired(); |
115 | 111 |
116 SerializeInternal(); | 112 scoped_refptr<DataPipeProducerDispatcher> rv = Create(data_pipe_->options()); |
| 113 data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get()); |
117 | 114 |
118 scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_); | 115 DCHECK(!in_two_phase_write_); |
119 serialized_write_buffer_.swap(rv->serialized_write_buffer_); | 116 |
120 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); | |
121 rv->serialized_ = true; | |
122 return scoped_refptr<Dispatcher>(rv.get()); | 117 return scoped_refptr<Dispatcher>(rv.get()); |
123 } | 118 } |
124 | 119 |
125 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( | 120 MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock( |
126 const void* elements, | 121 const void* elements, |
127 uint32_t* num_bytes, | 122 uint32_t* num_bytes, |
128 MojoWriteDataFlags flags) { | 123 MojoWriteDataFlags flags) { |
129 lock().AssertAcquired(); | 124 lock().AssertAcquired(); |
130 if (InTwoPhaseWrite()) | 125 if (in_two_phase_write_) |
131 return MOJO_RESULT_BUSY; | 126 return MOJO_RESULT_BUSY; |
132 if (error_) | 127 if (peer_closed_) |
133 return MOJO_RESULT_FAILED_PRECONDITION; | 128 return MOJO_RESULT_FAILED_PRECONDITION; |
134 if (*num_bytes % options_.element_num_bytes != 0) | 129 if (*num_bytes % data_pipe_->options().element_num_bytes != 0) |
135 return MOJO_RESULT_INVALID_ARGUMENT; | 130 return MOJO_RESULT_INVALID_ARGUMENT; |
136 if (*num_bytes == 0) | 131 if (*num_bytes == 0) |
137 return MOJO_RESULT_OK; // Nothing to do. | 132 return MOJO_RESULT_OK; // Nothing to do. |
138 | 133 |
139 // For now, we ignore options.capacity_num_bytes as a total of all pending | 134 // Don't write non element sized chunks. |
140 // writes (and just treat it per message). We will implement that later if | 135 uint32_t writable = data_pipe_->GetWritableBytes(); |
141 // we need to. All current uses want all their data to be sent, and it's not | 136 writable -= writable % data_pipe_->options().element_num_bytes; |
142 // clear that this backpressure should be done at the mojo layer or at a | 137 |
143 // higher application layer. | |
144 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; | 138 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE; |
145 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; | 139 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0; |
146 if (min_num_bytes_to_write > options_.capacity_num_bytes) { | 140 if (min_num_bytes_to_write > writable) { |
147 // Don't return "should wait" since you can't wait for a specified amount of | 141 // Don't return "should wait" since you can't wait for a specified amount of |
148 // data. | 142 // data. |
149 return MOJO_RESULT_OUT_OF_RANGE; | 143 return MOJO_RESULT_OUT_OF_RANGE; |
150 } | 144 } |
151 | 145 |
152 uint32_t num_bytes_to_write = | 146 if (writable == 0) |
153 std::min(*num_bytes, options_.capacity_num_bytes); | |
154 if (num_bytes_to_write == 0) | |
155 return MOJO_RESULT_SHOULD_WAIT; | 147 return MOJO_RESULT_SHOULD_WAIT; |
156 | 148 |
157 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); | 149 uint32_t num_bytes_to_write = std::min(*num_bytes, writable); |
| 150 |
| 151 // The failure case for |WriteDataIntoSharedBuffer| is the shared |
| 152 // buffer not existing, so we should wait. |
| 153 if (!data_pipe_->WriteDataIntoSharedBuffer(elements, num_bytes_to_write)) { |
| 154 return MOJO_RESULT_SHOULD_WAIT; |
| 155 } |
| 156 |
| 157 // If we can't tell the other end about the write, pretend this write didn't |
| 158 // happen and mark the other end as closed. We deal with any state changes |
| 159 // due to the other side being closed in OnError. |
| 160 if (!data_pipe_->NotifyWrite(num_bytes_to_write)) { |
| 161 peer_closed_ = true; |
| 162 return MOJO_RESULT_FAILED_PRECONDITION; |
| 163 } |
158 | 164 |
159 *num_bytes = num_bytes_to_write; | 165 *num_bytes = num_bytes_to_write; |
160 WriteDataIntoMessages(elements, num_bytes_to_write); | |
161 | 166 |
| 167 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
| 168 data_pipe_->UpdateFromWrite(num_bytes_to_write); |
162 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | 169 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
163 if (!new_state.equals(old_state)) | 170 if (!new_state.equals(old_state)) |
164 awakable_list_.AwakeForStateChange(new_state); | 171 awakable_list_.AwakeForStateChange(new_state); |
| 172 |
165 return MOJO_RESULT_OK; | 173 return MOJO_RESULT_OK; |
166 } | 174 } |
167 | 175 |
168 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( | 176 MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock( |
169 void** buffer, | 177 void** buffer, |
170 uint32_t* buffer_num_bytes, | 178 uint32_t* buffer_num_bytes, |
171 MojoWriteDataFlags flags) { | 179 MojoWriteDataFlags flags) { |
172 lock().AssertAcquired(); | 180 lock().AssertAcquired(); |
173 if (InTwoPhaseWrite()) | 181 if (in_two_phase_write_) |
174 return MOJO_RESULT_BUSY; | 182 return MOJO_RESULT_BUSY; |
175 if (error_) | 183 if (peer_closed_) |
176 return MOJO_RESULT_FAILED_PRECONDITION; | 184 return MOJO_RESULT_FAILED_PRECONDITION; |
177 | 185 |
178 // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes. | 186 uint32_t max_num_bytes_to_write; |
| 187 void* temp_buf = data_pipe_->GetWriteBuffer(&max_num_bytes_to_write); |
| 188 |
| 189 if (max_num_bytes_to_write == 0) |
| 190 return MOJO_RESULT_SHOULD_WAIT; |
| 191 |
179 if (*buffer_num_bytes == 0) | 192 if (*buffer_num_bytes == 0) |
180 *buffer_num_bytes = options_.capacity_num_bytes; | 193 *buffer_num_bytes = max_num_bytes_to_write; |
181 | 194 |
182 two_phase_data_.resize(*buffer_num_bytes); | 195 // Don't promise more bytes than we have. |
183 *buffer = &two_phase_data_[0]; | 196 *buffer_num_bytes = std::min(max_num_bytes_to_write, *buffer_num_bytes); |
184 | 197 |
185 // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes | 198 two_phase_max_bytes_write_ = *buffer_num_bytes; |
186 // we can construct a MessageInTransit here. But then we need to make | 199 *buffer = temp_buf; |
187 // MessageInTransit support changing its data size later. | 200 in_two_phase_write_ = true; |
188 | 201 |
189 return MOJO_RESULT_OK; | 202 return MOJO_RESULT_OK; |
190 } | 203 } |
191 | 204 |
192 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( | 205 MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock( |
193 uint32_t num_bytes_written) { | 206 uint32_t num_bytes_written) { |
194 lock().AssertAcquired(); | 207 lock().AssertAcquired(); |
195 if (!InTwoPhaseWrite()) | 208 if (!in_two_phase_write_) |
196 return MOJO_RESULT_FAILED_PRECONDITION; | 209 return MOJO_RESULT_FAILED_PRECONDITION; |
197 | 210 |
| 211 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
| 212 in_two_phase_write_ = false; |
| 213 |
| 214 if (num_bytes_written > two_phase_max_bytes_write_ || |
| 215 num_bytes_written % data_pipe_->options().element_num_bytes != 0) { |
| 216 return MOJO_RESULT_INVALID_ARGUMENT; |
| 217 } |
| 218 |
| 219 data_pipe_->UpdateFromWrite(num_bytes_written); |
| 220 |
| 221 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
| 222 if (!new_state.equals(old_state)) |
| 223 awakable_list_.AwakeForStateChange(new_state); |
| 224 |
198 // Note: Allow successful completion of the two-phase write even if the other | 225 // Note: Allow successful completion of the two-phase write even if the other |
199 // side has been closed. | 226 // side has been closed. |
200 MojoResult rv = MOJO_RESULT_OK; | 227 // Deal with state changes due to peer being closed in OnError. |
201 if (num_bytes_written > two_phase_data_.size() || | 228 if (!data_pipe_->NotifyWrite(num_bytes_written)) |
202 num_bytes_written % options_.element_num_bytes != 0) { | 229 peer_closed_ = true; |
203 rv = MOJO_RESULT_INVALID_ARGUMENT; | |
204 } else if (channel_) { | |
205 WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written); | |
206 } | |
207 | 230 |
208 // Two-phase write ended even on failure. | 231 return MOJO_RESULT_OK; |
209 two_phase_data_.clear(); | |
210 // If we're now writable, we *became* writable (since we weren't writable | |
211 // during the two-phase write), so awake producer awakables. | |
212 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | |
213 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) | |
214 awakable_list_.AwakeForStateChange(new_state); | |
215 | |
216 return rv; | |
217 } | 232 } |
218 | 233 |
219 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() | 234 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock() |
220 const { | 235 const { |
221 lock().AssertAcquired(); | 236 lock().AssertAcquired(); |
222 | 237 |
223 HandleSignalsState rv; | 238 HandleSignalsState rv; |
224 if (!error_) { | 239 if (!peer_closed_) { |
225 if (!InTwoPhaseWrite()) | 240 if (!in_two_phase_write_ && data_pipe_->GetWritableBytes()) |
226 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 241 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
227 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 242 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
228 } else { | 243 } else { |
229 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 244 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
230 } | 245 } |
| 246 |
231 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 247 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
232 return rv; | 248 return rv; |
233 } | 249 } |
234 | 250 |
235 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( | 251 MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock( |
236 Awakable* awakable, | 252 Awakable* awakable, |
237 MojoHandleSignals signals, | 253 MojoHandleSignals signals, |
238 uintptr_t context, | 254 uintptr_t context, |
239 HandleSignalsState* signals_state) { | 255 HandleSignalsState* signals_state) { |
240 lock().AssertAcquired(); | 256 lock().AssertAcquired(); |
241 if (channel_) | |
242 channel_->EnsureLazyInitialized(); | |
243 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 257 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
244 if (state.satisfies(signals)) { | 258 if (state.satisfies(signals)) { |
245 if (signals_state) | 259 if (signals_state) |
246 *signals_state = state; | 260 *signals_state = state; |
247 return MOJO_RESULT_ALREADY_EXISTS; | 261 return MOJO_RESULT_ALREADY_EXISTS; |
248 } | 262 } |
249 if (!state.can_satisfy(signals)) { | 263 if (!state.can_satisfy(signals)) { |
250 if (signals_state) | 264 if (signals_state) |
251 *signals_state = state; | 265 *signals_state = state; |
252 return MOJO_RESULT_FAILED_PRECONDITION; | 266 return MOJO_RESULT_FAILED_PRECONDITION; |
253 } | 267 } |
254 | 268 |
255 awakable_list_.Add(awakable, signals, context); | 269 awakable_list_.Add(awakable, signals, context); |
256 return MOJO_RESULT_OK; | 270 return MOJO_RESULT_OK; |
257 } | 271 } |
258 | 272 |
259 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( | 273 void DataPipeProducerDispatcher::RemoveAwakableImplNoLock( |
260 Awakable* awakable, | 274 Awakable* awakable, |
261 HandleSignalsState* signals_state) { | 275 HandleSignalsState* signals_state) { |
262 lock().AssertAcquired(); | 276 lock().AssertAcquired(); |
263 awakable_list_.Remove(awakable); | 277 awakable_list_.Remove(awakable); |
264 if (signals_state) | 278 if (signals_state) |
265 *signals_state = GetHandleSignalsStateImplNoLock(); | 279 *signals_state = GetHandleSignalsStateImplNoLock(); |
266 } | 280 } |
267 | 281 |
268 void DataPipeProducerDispatcher::StartSerializeImplNoLock( | 282 void DataPipeProducerDispatcher::StartSerializeImplNoLock( |
269 size_t* max_size, | 283 size_t* max_size, |
270 size_t* max_platform_handles) { | 284 size_t* max_platform_handles) { |
271 if (!serialized_) | 285 data_pipe_->StartSerialize(max_size, max_platform_handles); |
272 SerializeInternal(); | |
273 | |
274 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), | |
275 !serialized_write_buffer_.empty(), max_size, | |
276 max_platform_handles); | |
277 } | 286 } |
278 | 287 |
279 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( | 288 bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock( |
280 void* destination, | 289 void* destination, |
281 size_t* actual_size, | 290 size_t* actual_size, |
282 PlatformHandleVector* platform_handles) { | 291 PlatformHandleVector* platform_handles) { |
283 ScopedPlatformHandle shared_memory_handle; | 292 data_pipe_->EndSerialize(destination, actual_size, platform_handles); |
284 size_t shared_memory_size = serialized_write_buffer_.size(); | |
285 if (shared_memory_size) { | |
286 scoped_refptr<PlatformSharedBuffer> shared_buffer( | |
287 internal::g_platform_support->CreateSharedBuffer( | |
288 shared_memory_size)); | |
289 scoped_ptr<PlatformSharedBufferMapping> mapping( | |
290 shared_buffer->Map(0, shared_memory_size)); | |
291 memcpy(mapping->GetBase(), &serialized_write_buffer_[0], | |
292 shared_memory_size); | |
293 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); | |
294 } | |
295 | |
296 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), | |
297 std::move(shared_memory_handle), shared_memory_size, | |
298 destination, actual_size, platform_handles); | |
299 CloseImplNoLock(); | 293 CloseImplNoLock(); |
300 return true; | 294 return true; |
301 } | 295 } |
302 | 296 |
303 void DataPipeProducerDispatcher::TransportStarted() { | 297 void DataPipeProducerDispatcher::TransportStarted() { |
304 started_transport_.Acquire(); | 298 started_transport_.Acquire(); |
305 } | 299 } |
306 | 300 |
307 void DataPipeProducerDispatcher::TransportEnded() { | 301 void DataPipeProducerDispatcher::TransportEnded() { |
308 started_transport_.Release(); | 302 started_transport_.Release(); |
309 } | 303 } |
310 | 304 |
311 bool DataPipeProducerDispatcher::IsBusyNoLock() const { | 305 bool DataPipeProducerDispatcher::IsBusyNoLock() const { |
312 lock().AssertAcquired(); | 306 lock().AssertAcquired(); |
313 return InTwoPhaseWrite(); | 307 return in_two_phase_write_; |
| 308 } |
| 309 |
| 310 bool DataPipeProducerDispatcher::ProcessCommand( |
| 311 const DataPipeCommandHeader& command, |
| 312 ScopedPlatformHandleVectorPtr platform_handles) { |
| 313 // Handles write/read case and shared buffer becoming available case. |
| 314 return data_pipe_->ProcessCommand(command, std::move(platform_handles)); |
314 } | 315 } |
315 | 316 |
316 void DataPipeProducerDispatcher::OnReadMessage( | 317 void DataPipeProducerDispatcher::OnReadMessage( |
317 const MessageInTransit::View& message_view, | 318 const MessageInTransit::View& message_view, |
318 ScopedPlatformHandleVectorPtr platform_handles) { | 319 ScopedPlatformHandleVectorPtr platform_handles) { |
319 CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages."; | 320 const DataPipeCommandHeader* command = |
| 321 static_cast<const DataPipeCommandHeader*>(message_view.bytes()); |
| 322 DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader)); |
| 323 |
| 324 if (started_transport_.Try()) { |
| 325 // We're not in the middle of being sent. |
| 326 |
| 327 // Can get synchronously called back from RawChannel::Init in InitOnIO if |
| 328 // there was initial data. InitOnIO locks, so don't lock twice. |
| 329 scoped_ptr<base::AutoLock> locker; |
| 330 if (!calling_init_) { |
| 331 locker.reset(new base::AutoLock(lock())); |
| 332 } |
| 333 |
| 334 if (ProcessCommand(*command, std::move(platform_handles))) { |
| 335 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| 336 } |
| 337 started_transport_.Release(); |
| 338 } else { |
| 339 // DataPipe::Serialize calls ReleaseHandle on the channel, which |
| 340 // acquires RawChannel's read_lock_. The function OnReadMessage is only |
| 341 // called while read_lock_ is acquired, and not after ReleaseHandle has been |
| 342 // called. This means this function will only be called before Serialize |
| 343 // calls ReleaseHandle, meaning the serialisation will not have started yet. |
| 344 // We only notify awakables if we're not in the process of being |
| 345 // transported. |
| 346 ProcessCommand(*command, std::move(platform_handles)); |
| 347 } |
320 } | 348 } |
321 | 349 |
322 void DataPipeProducerDispatcher::OnError(Error error) { | 350 void DataPipeProducerDispatcher::OnError(Error error) { |
323 switch (error) { | 351 switch (error) { |
324 case ERROR_READ_BROKEN: | |
325 case ERROR_READ_BAD_MESSAGE: | |
326 case ERROR_READ_UNKNOWN: | |
327 LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error."; | |
328 break; | |
329 case ERROR_READ_SHUTDOWN: | 352 case ERROR_READ_SHUTDOWN: |
330 // The other side was cleanly closed, so this isn't actually an error. | 353 // The other side was cleanly closed, so this isn't actually an error. |
331 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; | 354 DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)"; |
332 break; | 355 break; |
| 356 case ERROR_READ_BROKEN: |
| 357 LOG(ERROR) << "DataPipeProducerDispatcher read error (connection broken)"; |
| 358 break; |
| 359 case ERROR_READ_BAD_MESSAGE: |
| 360 // Receiving a bad message means either a bug, data corruption, or |
| 361 // malicious attack (probably due to some other bug). |
| 362 LOG(ERROR) << "DataPipeProducerDispatcher read error (received bad " |
| 363 << "message)"; |
| 364 break; |
| 365 case ERROR_READ_UNKNOWN: |
| 366 LOG(ERROR) << "DataPipeProducerDispatcher read error (unknown)"; |
| 367 break; |
333 case ERROR_WRITE: | 368 case ERROR_WRITE: |
334 // Write errors are slightly notable: they probably shouldn't happen under | 369 LOG(ERROR) << "DataPipeProducerDispatcher write error"; |
335 // normal operation (but maybe the other side crashed). | |
336 LOG(WARNING) << "DataPipeProducerDispatcher write error"; | |
337 break; | 370 break; |
338 } | 371 } |
339 | 372 |
340 error_ = true; | 373 peer_closed_ = true; |
341 if (started_transport_.Try()) { | 374 if (started_transport_.Try()) { |
342 base::AutoLock locker(lock()); | 375 base::AutoLock locker(lock()); |
343 // We can get two OnError callbacks before the post task below completes. | 376 // We can get two OnError callbacks before the post task below completes. |
344 // Although RawChannel still has a pointer to this object until Shutdown is | 377 // Although RawChannel still has a pointer to this object until Shutdown is |
345 // called, that is safe since this class always does a PostTask to the IO | 378 // called, that is safe since this class always does a PostTask to the IO |
346 // thread to self destruct. | 379 // thread to self destruct. |
347 if (channel_) { | 380 if (data_pipe_->channel()) { |
348 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 381 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
349 channel_->Shutdown(); | 382 data_pipe_->Shutdown(); |
350 channel_ = nullptr; | |
351 } | 383 } |
352 started_transport_.Release(); | 384 started_transport_.Release(); |
353 } else { | 385 } else { |
354 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 386 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
355 } | 387 } |
356 } | 388 } |
357 | 389 |
358 bool DataPipeProducerDispatcher::InTwoPhaseWrite() const { | |
359 return !two_phase_data_.empty(); | |
360 } | |
361 | |
362 bool DataPipeProducerDispatcher::WriteDataIntoMessages( | |
363 const void* elements, | |
364 uint32_t num_bytes) { | |
365 // The maximum amount of data to send per message (make it a multiple of the | |
366 // element size. | |
367 size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | |
368 max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes; | |
369 DCHECK_GT(max_message_num_bytes, 0u); | |
370 | |
371 uint32_t offset = 0; | |
372 while (offset < num_bytes) { | |
373 uint32_t message_num_bytes = | |
374 std::min(static_cast<uint32_t>(max_message_num_bytes), | |
375 num_bytes - offset); | |
376 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
377 MessageInTransit::Type::MESSAGE, message_num_bytes, | |
378 static_cast<const char*>(elements) + offset)); | |
379 if (!channel_->WriteMessage(std::move(message))) { | |
380 error_ = true; | |
381 return false; | |
382 } | |
383 | |
384 offset += message_num_bytes; | |
385 } | |
386 | |
387 return true; | |
388 } | |
389 | |
390 void DataPipeProducerDispatcher::SerializeInternal() { | |
391 // We need to stop watching handle immediately, even though not on IO thread, | |
392 // so that other messages aren't read after this. | |
393 if (channel_) { | |
394 std::vector<char> serialized_read_buffer; | |
395 std::vector<int> fds; | |
396 bool write_error = false; | |
397 serialized_platform_handle_ = channel_->ReleaseHandle( | |
398 &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds, | |
399 &write_error); | |
400 CHECK(serialized_read_buffer.empty()); | |
401 CHECK(fds.empty()); | |
402 if (write_error) | |
403 serialized_platform_handle_.reset(); | |
404 channel_ = nullptr; | |
405 } | |
406 serialized_ = true; | |
407 } | |
408 | |
409 } // namespace edk | 390 } // namespace edk |
410 } // namespace mojo | 391 } // namespace mojo |
OLD | NEW |