OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" | 5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <algorithm> | 10 #include <algorithm> |
11 #include <utility> | 11 #include <utility> |
12 | 12 |
13 #include "base/bind.h" | 13 #include "base/bind.h" |
14 #include "base/logging.h" | 14 #include "base/logging.h" |
15 #include "base/message_loop/message_loop.h" | 15 #include "base/message_loop/message_loop.h" |
16 #include "mojo/edk/embedder/embedder_internal.h" | 16 #include "mojo/edk/embedder/embedder_internal.h" |
17 #include "mojo/edk/embedder/platform_shared_buffer.h" | 17 #include "mojo/edk/embedder/platform_shared_buffer.h" |
18 #include "mojo/edk/embedder/platform_support.h" | 18 #include "mojo/edk/embedder/platform_support.h" |
19 #include "mojo/edk/system/data_pipe.h" | |
20 | 19 |
21 namespace mojo { | 20 namespace mojo { |
22 namespace edk { | 21 namespace edk { |
23 | 22 |
24 struct SharedMemoryHeader { | 23 void DataPipeConsumerDispatcher::Init(ScopedPlatformHandle message_pipe, |
25 uint32_t data_size; | 24 char* serialized_write_buffer, |
26 uint32_t read_buffer_size; | 25 uint32_t serialized_write_buffer_size, |
27 }; | 26 char* serialized_read_buffer, |
27 uint32_t serialized_read_buffer_size, | |
28 ScopedPlatformHandle shared_buffer_handle, | |
29 uint32_t ring_buffer_start, | |
30 uint32_t ring_buffer_size) { | |
31 if (!message_pipe.is_valid()) { | |
32 peer_closed_ = true; | |
33 } | |
28 | 34 |
29 void DataPipeConsumerDispatcher::Init( | 35 data_pipe_->Init(std::move(message_pipe), serialized_write_buffer, |
30 ScopedPlatformHandle message_pipe, | 36 serialized_write_buffer_size, serialized_read_buffer, |
31 char* serialized_read_buffer, size_t serialized_read_buffer_size) { | 37 serialized_read_buffer_size, std::move(shared_buffer_handle), |
32 if (message_pipe.is_valid()) { | 38 ring_buffer_start, ring_buffer_size, false /* is_producer */, |
33 channel_ = RawChannel::Create(std::move(message_pipe)); | 39 base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this)); |
34 channel_->SetSerializedData( | |
35 serialized_read_buffer, serialized_read_buffer_size, nullptr, 0u, | |
36 nullptr, nullptr); | |
37 internal::g_io_thread_task_runner->PostTask( | |
38 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this)); | |
39 } else { | |
40 // The data pipe consumer could have read all the data and the producer | |
41 // closed its end subsequently (before the consumer was sent). In that case | |
42 // when we deserialize the consumer we must make sure to set error_ or | |
43 // otherwise the peer-closed signal will never be satisfied. | |
44 error_ = true; | |
45 } | |
46 } | 40 } |
47 | 41 |
48 void DataPipeConsumerDispatcher::InitOnIO() { | 42 void DataPipeConsumerDispatcher::InitOnIO() { |
49 base::AutoLock locker(lock()); | 43 base::AutoLock locker(lock()); |
50 calling_init_ = true; | 44 calling_init_ = true; |
51 if (channel_) | 45 RawChannel* channel = data_pipe_->GetChannel(); |
52 channel_->Init(this); | 46 if (channel) |
47 channel->Init(this); | |
53 calling_init_ = false; | 48 calling_init_ = false; |
54 } | 49 } |
55 | 50 |
56 void DataPipeConsumerDispatcher::CloseOnIO() { | 51 void DataPipeConsumerDispatcher::CloseOnIO() { |
57 base::AutoLock locker(lock()); | 52 base::AutoLock locker(lock()); |
58 if (channel_) { | 53 data_pipe_->Shutdown(); |
59 channel_->Shutdown(); | |
60 channel_ = nullptr; | |
61 } | |
62 } | 54 } |
63 | 55 |
64 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { | 56 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { |
65 return Type::DATA_PIPE_CONSUMER; | 57 return Type::DATA_PIPE_CONSUMER; |
66 } | 58 } |
67 | 59 |
68 scoped_refptr<DataPipeConsumerDispatcher> | 60 scoped_refptr<DataPipeConsumerDispatcher> |
69 DataPipeConsumerDispatcher::Deserialize( | 61 DataPipeConsumerDispatcher::Deserialize( |
70 const void* source, | 62 const void* source, |
71 size_t size, | 63 size_t size, |
72 PlatformHandleVector* platform_handles) { | 64 PlatformHandleVector* platform_handles) { |
73 MojoCreateDataPipeOptions options; | 65 MojoCreateDataPipeOptions options; |
74 ScopedPlatformHandle shared_memory_handle; | 66 ScopedPlatformHandle channel_handle, channel_shared_handle, |
Anand Mistry (off Chromium)
2016/01/11 06:19:34
It feels like most of this code should be encapsul
Eliot Courtney
2016/01/13 00:00:10
Done.
| |
75 size_t shared_memory_size = 0; | 67 shared_buffer_handle; |
68 uint32_t serialized_read_buffer_size, serialized_write_buffer_size; | |
69 uint32_t ring_buffer_start, ring_buffer_size; | |
76 | 70 |
77 ScopedPlatformHandle platform_handle = | 71 ScopedPlatformHandle platform_handle = DataPipe::Deserialize( |
78 DataPipe::Deserialize(source, size, platform_handles, &options, | 72 source, size, platform_handles, &options, &channel_shared_handle, |
79 &shared_memory_handle, &shared_memory_size); | 73 &serialized_read_buffer_size, &serialized_write_buffer_size, |
74 &shared_buffer_handle, &ring_buffer_start, &ring_buffer_size); | |
80 | 75 |
81 scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options)); | 76 scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options)); |
82 | 77 |
78 uint32_t buffer_size = | |
79 serialized_write_buffer_size + serialized_read_buffer_size; | |
83 char* serialized_read_buffer = nullptr; | 80 char* serialized_read_buffer = nullptr; |
84 size_t serialized_read_buffer_size = 0; | 81 char* serialized_write_buffer = nullptr; |
85 scoped_refptr<PlatformSharedBuffer> shared_buffer; | 82 scoped_refptr<PlatformSharedBuffer> channel_shared_buffer; |
86 scoped_ptr<PlatformSharedBufferMapping> mapping; | 83 scoped_ptr<PlatformSharedBufferMapping> mapping; |
87 if (shared_memory_size) { | 84 if (channel_shared_handle.is_valid()) { |
88 shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( | 85 channel_shared_buffer = |
89 shared_memory_size, std::move(shared_memory_handle)); | 86 internal::g_platform_support->CreateSharedBufferFromHandle( |
90 mapping = shared_buffer->Map(0, shared_memory_size); | 87 buffer_size, std::move(channel_shared_handle)); |
91 char* buffer = static_cast<char*>(mapping->GetBase()); | 88 mapping = channel_shared_buffer->Map(0, buffer_size); |
92 SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer); | |
93 buffer += sizeof(SharedMemoryHeader); | |
94 if (header->data_size) { | |
95 rv->data_.assign(buffer, buffer + header->data_size); | |
96 buffer += header->data_size; | |
97 } | |
98 | 89 |
99 if (header->read_buffer_size) { | 90 serialized_read_buffer = static_cast<char*>(mapping->GetBase()); |
100 serialized_read_buffer = buffer; | 91 serialized_write_buffer = |
101 serialized_read_buffer_size = header->read_buffer_size; | 92 static_cast<char*>(mapping->GetBase()) + serialized_read_buffer_size; |
102 buffer += header->read_buffer_size; | |
103 } | |
104 } | 93 } |
105 | 94 |
106 rv->Init(std::move(platform_handle), serialized_read_buffer, | 95 rv->Init(std::move(platform_handle), serialized_read_buffer, |
107 serialized_read_buffer_size); | 96 serialized_read_buffer_size, serialized_write_buffer, |
97 serialized_write_buffer_size, std::move(shared_buffer_handle), | |
98 ring_buffer_start, ring_buffer_size); | |
108 return rv; | 99 return rv; |
109 } | 100 } |
110 | 101 |
111 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( | 102 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( |
112 const MojoCreateDataPipeOptions& options) | 103 const MojoCreateDataPipeOptions& options) |
113 : options_(options), | 104 : data_pipe_(new DataPipe(options)), |
114 channel_(nullptr), | |
115 calling_init_(false), | 105 calling_init_(false), |
106 peer_closed_(false), | |
116 in_two_phase_read_(false), | 107 in_two_phase_read_(false), |
117 two_phase_max_bytes_read_(0), | 108 two_phase_max_bytes_read_(0u) {} |
118 error_(false), | |
119 serialized_(false) { | |
120 } | |
121 | 109 |
122 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { | 110 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { |
123 // See comment in ~MessagePipeDispatcher. | 111 // See comment in ~MessagePipeDispatcher. |
124 if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) | 112 if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
125 channel_->Shutdown(); | 113 data_pipe_->Shutdown(); |
126 else | 114 else |
127 DCHECK(!channel_); | 115 DCHECK(!data_pipe_->GetChannel()); |
128 } | 116 } |
129 | 117 |
130 void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() { | 118 void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() { |
131 lock().AssertAcquired(); | 119 lock().AssertAcquired(); |
132 awakable_list_.CancelAll(); | 120 awakable_list_.CancelAll(); |
133 } | 121 } |
134 | 122 |
135 void DataPipeConsumerDispatcher::CloseImplNoLock() { | 123 void DataPipeConsumerDispatcher::CloseImplNoLock() { |
136 lock().AssertAcquired(); | 124 lock().AssertAcquired(); |
137 internal::g_io_thread_task_runner->PostTask( | 125 internal::g_io_thread_task_runner->PostTask( |
138 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this)); | 126 FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::CloseOnIO, this)); |
139 } | 127 } |
140 | 128 |
141 scoped_refptr<Dispatcher> | 129 scoped_refptr<Dispatcher> |
142 DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { | 130 DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
143 lock().AssertAcquired(); | 131 lock().AssertAcquired(); |
144 | 132 |
145 SerializeInternal(); | 133 scoped_refptr<DataPipeConsumerDispatcher> rv = |
134 Create(data_pipe_->GetOptions()); | |
135 data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get()); | |
146 | 136 |
147 scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_); | 137 DCHECK(!in_two_phase_read_); |
148 data_.swap(rv->data_); | |
149 serialized_read_buffer_.swap(rv->serialized_read_buffer_); | |
150 rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); | |
151 rv->serialized_ = true; | |
152 | 138 |
153 return scoped_refptr<Dispatcher>(rv.get()); | 139 return scoped_refptr<Dispatcher>(rv.get()); |
154 } | 140 } |
155 | 141 |
156 MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock( | 142 MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock( |
157 void* elements, | 143 void* elements, |
158 uint32_t* num_bytes, | 144 uint32_t* num_bytes, |
159 MojoReadDataFlags flags) { | 145 MojoReadDataFlags flags) { |
160 lock().AssertAcquired(); | 146 lock().AssertAcquired(); |
161 if (channel_) | |
162 channel_->EnsureLazyInitialized(); | |
163 if (in_two_phase_read_) | 147 if (in_two_phase_read_) |
164 return MOJO_RESULT_BUSY; | 148 return MOJO_RESULT_BUSY; |
165 | 149 |
166 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { | 150 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { |
167 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || | 151 if ((flags & MOJO_READ_DATA_FLAG_PEEK) || |
168 (flags & MOJO_READ_DATA_FLAG_DISCARD)) | 152 (flags & MOJO_READ_DATA_FLAG_DISCARD)) |
169 return MOJO_RESULT_INVALID_ARGUMENT; | 153 return MOJO_RESULT_INVALID_ARGUMENT; |
170 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. | 154 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. |
171 DVLOG_IF(2, elements) | 155 DVLOG_IF(2, elements) << "Query mode: ignoring non-null |elements|"; |
172 << "Query mode: ignoring non-null |elements|"; | 156 *num_bytes = data_pipe_->GetReadableBytes(); |
173 *num_bytes = static_cast<uint32_t>(data_.size()); | |
174 return MOJO_RESULT_OK; | 157 return MOJO_RESULT_OK; |
175 } | 158 } |
176 | 159 |
177 bool discard = false; | 160 bool discard = false; |
178 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { | 161 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { |
179 // These flags are mutally exclusive. | 162 // These flags are mutually exclusive. |
180 if (flags & MOJO_READ_DATA_FLAG_PEEK) | 163 if (flags & MOJO_READ_DATA_FLAG_PEEK) |
181 return MOJO_RESULT_INVALID_ARGUMENT; | 164 return MOJO_RESULT_INVALID_ARGUMENT; |
182 DVLOG_IF(2, elements) | 165 DVLOG_IF(2, elements) << "Discard mode: ignoring non-null |elements|"; |
183 << "Discard mode: ignoring non-null |elements|"; | |
184 discard = true; | 166 discard = true; |
185 } | 167 } |
186 | 168 |
187 uint32_t max_num_bytes_to_read = *num_bytes; | 169 uint32_t max_num_bytes_to_read = *num_bytes; |
188 if (max_num_bytes_to_read % options_.element_num_bytes != 0) | 170 if (max_num_bytes_to_read % data_pipe_->GetOptions().element_num_bytes != 0) |
189 return MOJO_RESULT_INVALID_ARGUMENT; | 171 return MOJO_RESULT_INVALID_ARGUMENT; |
190 | 172 |
191 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; | 173 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; |
192 uint32_t min_num_bytes_to_read = | 174 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; |
193 all_or_none ? max_num_bytes_to_read : 0; | |
194 | 175 |
195 if (min_num_bytes_to_read > data_.size()) | 176 uint32_t readable_bytes = data_pipe_->GetReadableBytes(); |
196 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE; | 177 if (min_num_bytes_to_read > readable_bytes) |
178 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION | |
179 : MOJO_RESULT_OUT_OF_RANGE; | |
197 | 180 |
198 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, | 181 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, readable_bytes); |
199 static_cast<uint32_t>(data_.size())); | |
200 if (bytes_to_read == 0) | 182 if (bytes_to_read == 0) |
201 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; | 183 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
184 : MOJO_RESULT_SHOULD_WAIT; | |
202 | 185 |
203 if (!discard) | 186 // |ReadDataFromSharedBuffer| failing means we haven't got the shared buffer |
204 memcpy(elements, &data_[0], bytes_to_read); | 187 // yet, so we should wait. |
188 if (!discard && | |
189 !data_pipe_->ReadDataFromSharedBuffer(elements, bytes_to_read)) { | |
190 return MOJO_RESULT_SHOULD_WAIT; | |
191 } | |
192 | |
205 *num_bytes = bytes_to_read; | 193 *num_bytes = bytes_to_read; |
206 | 194 |
207 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); | 195 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
208 if (discard || !peek) | 196 bool should_update = !(flags & MOJO_READ_DATA_FLAG_PEEK) || discard; |
209 data_.erase(data_.begin(), data_.begin() + bytes_to_read); | 197 if (should_update) |
198 data_pipe_->UpdateFromRead(bytes_to_read); | |
199 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | |
200 if (!new_state.equals(old_state)) | |
201 awakable_list_.AwakeForStateChange(new_state); | |
202 | |
203 // Deal with state changes due to peer being closed in OnError. | |
204 if (should_update && !data_pipe_->NotifyRead(bytes_to_read)) | |
205 peer_closed_ = true; | |
210 | 206 |
211 return MOJO_RESULT_OK; | 207 return MOJO_RESULT_OK; |
212 } | 208 } |
213 | 209 |
214 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock( | 210 MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock( |
215 const void** buffer, | 211 const void** buffer, |
216 uint32_t* buffer_num_bytes, | 212 uint32_t* buffer_num_bytes, |
217 MojoReadDataFlags flags) { | 213 MojoReadDataFlags flags) { |
218 lock().AssertAcquired(); | 214 lock().AssertAcquired(); |
219 if (channel_) | |
220 channel_->EnsureLazyInitialized(); | |
221 if (in_two_phase_read_) | 215 if (in_two_phase_read_) |
222 return MOJO_RESULT_BUSY; | 216 return MOJO_RESULT_BUSY; |
223 | 217 |
224 // These flags may not be used in two-phase mode. | 218 // These flags may not be used in two-phase mode. |
225 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || | 219 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || |
226 (flags & MOJO_READ_DATA_FLAG_QUERY) || | 220 (flags & MOJO_READ_DATA_FLAG_QUERY) || |
227 (flags & MOJO_READ_DATA_FLAG_PEEK)) | 221 (flags & MOJO_READ_DATA_FLAG_PEEK)) |
228 return MOJO_RESULT_INVALID_ARGUMENT; | 222 return MOJO_RESULT_INVALID_ARGUMENT; |
229 | 223 |
230 uint32_t max_num_bytes_to_read = static_cast<uint32_t>(data_.size()); | 224 uint32_t readable_bytes; |
225 const void* temp_buf = data_pipe_->GetReadBuffer(&readable_bytes); | |
226 | |
227 uint32_t max_num_bytes_to_read = readable_bytes; | |
231 if (max_num_bytes_to_read == 0) | 228 if (max_num_bytes_to_read == 0) |
232 return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; | 229 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
230 : MOJO_RESULT_SHOULD_WAIT; | |
233 | 231 |
234 in_two_phase_read_ = true; | 232 in_two_phase_read_ = true; |
235 *buffer = &data_[0]; | 233 *buffer = temp_buf; |
236 *buffer_num_bytes = max_num_bytes_to_read; | 234 *buffer_num_bytes = max_num_bytes_to_read; |
237 two_phase_max_bytes_read_ = max_num_bytes_to_read; | 235 two_phase_max_bytes_read_ = max_num_bytes_to_read; |
238 | 236 |
239 return MOJO_RESULT_OK; | 237 return MOJO_RESULT_OK; |
240 } | 238 } |
241 | 239 |
242 MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock( | 240 MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock( |
243 uint32_t num_bytes_read) { | 241 uint32_t num_bytes_read) { |
244 lock().AssertAcquired(); | 242 lock().AssertAcquired(); |
245 if (!in_two_phase_read_) | 243 if (!in_two_phase_read_) |
246 return MOJO_RESULT_FAILED_PRECONDITION; | 244 return MOJO_RESULT_FAILED_PRECONDITION; |
247 | 245 |
248 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); | 246 HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
249 MojoResult rv; | 247 in_two_phase_read_ = false; |
248 | |
250 if (num_bytes_read > two_phase_max_bytes_read_ || | 249 if (num_bytes_read > two_phase_max_bytes_read_ || |
251 num_bytes_read % options_.element_num_bytes != 0) { | 250 num_bytes_read % data_pipe_->GetOptions().element_num_bytes != 0) { |
252 rv = MOJO_RESULT_INVALID_ARGUMENT; | 251 return MOJO_RESULT_INVALID_ARGUMENT; |
253 } else { | |
254 rv = MOJO_RESULT_OK; | |
255 data_.erase(data_.begin(), data_.begin() + num_bytes_read); | |
256 } | 252 } |
257 | 253 |
258 in_two_phase_read_ = false; | 254 data_pipe_->UpdateFromRead(num_bytes_read); |
259 two_phase_max_bytes_read_ = 0; | |
260 if (!data_received_during_two_phase_read_.empty()) { | |
261 if (data_.empty()) { | |
262 data_received_during_two_phase_read_.swap(data_); | |
263 } else { | |
264 data_.insert(data_.end(), data_received_during_two_phase_read_.begin(), | |
265 data_received_during_two_phase_read_.end()); | |
266 data_received_during_two_phase_read_.clear(); | |
267 } | |
268 } | |
269 | 255 |
270 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); | 256 HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
271 if (!new_state.equals(old_state)) | 257 if (!new_state.equals(old_state)) |
272 awakable_list_.AwakeForStateChange(new_state); | 258 awakable_list_.AwakeForStateChange(new_state); |
273 | 259 |
274 return rv; | 260 // Deal with state changes due to peer being closed in OnError. |
261 if (!data_pipe_->NotifyRead(num_bytes_read)) | |
262 peer_closed_ = true; | |
263 | |
264 return MOJO_RESULT_OK; | |
275 } | 265 } |
276 | 266 |
277 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock() | 267 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock() |
278 const { | 268 const { |
279 lock().AssertAcquired(); | 269 lock().AssertAcquired(); |
280 | 270 |
281 HandleSignalsState rv; | 271 HandleSignalsState rv; |
282 if (!data_.empty()) { | 272 if (data_pipe_->GetReadableBytes()) { |
283 if (!in_two_phase_read_) | 273 if (!in_two_phase_read_) |
284 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 274 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
285 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 275 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
286 } else if (!error_) { | 276 } |
277 | |
278 if (peer_closed_) { | |
279 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
280 } else { | |
281 // We could become readable in the future. | |
287 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 282 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
288 } | 283 } |
289 | 284 |
290 if (error_) | |
291 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
292 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 285 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
293 return rv; | 286 return rv; |
294 } | 287 } |
295 | 288 |
296 MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock( | 289 MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock( |
297 Awakable* awakable, | 290 Awakable* awakable, |
298 MojoHandleSignals signals, | 291 MojoHandleSignals signals, |
299 uintptr_t context, | 292 uintptr_t context, |
300 HandleSignalsState* signals_state) { | 293 HandleSignalsState* signals_state) { |
301 lock().AssertAcquired(); | 294 lock().AssertAcquired(); |
302 if (channel_) | |
303 channel_->EnsureLazyInitialized(); | |
304 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); | 295 HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
305 if (state.satisfies(signals)) { | 296 if (state.satisfies(signals)) { |
306 if (signals_state) | 297 if (signals_state) |
307 *signals_state = state; | 298 *signals_state = state; |
308 return MOJO_RESULT_ALREADY_EXISTS; | 299 return MOJO_RESULT_ALREADY_EXISTS; |
309 } | 300 } |
310 if (!state.can_satisfy(signals)) { | 301 if (!state.can_satisfy(signals)) { |
311 if (signals_state) | 302 if (signals_state) |
312 *signals_state = state; | 303 *signals_state = state; |
313 return MOJO_RESULT_FAILED_PRECONDITION; | 304 return MOJO_RESULT_FAILED_PRECONDITION; |
314 } | 305 } |
315 | 306 |
316 awakable_list_.Add(awakable, signals, context); | 307 awakable_list_.Add(awakable, signals, context); |
317 return MOJO_RESULT_OK; | 308 return MOJO_RESULT_OK; |
318 } | 309 } |
319 | 310 |
320 void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock( | 311 void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock( |
321 Awakable* awakable, | 312 Awakable* awakable, |
322 HandleSignalsState* signals_state) { | 313 HandleSignalsState* signals_state) { |
323 lock().AssertAcquired(); | 314 lock().AssertAcquired(); |
324 awakable_list_.Remove(awakable); | 315 awakable_list_.Remove(awakable); |
325 if (signals_state) | 316 if (signals_state) |
326 *signals_state = GetHandleSignalsStateImplNoLock(); | 317 *signals_state = GetHandleSignalsStateImplNoLock(); |
327 } | 318 } |
328 | 319 |
329 void DataPipeConsumerDispatcher::StartSerializeImplNoLock( | 320 void DataPipeConsumerDispatcher::StartSerializeImplNoLock( |
330 size_t* max_size, | 321 size_t* max_size, |
331 size_t* max_platform_handles) { | 322 size_t* max_platform_handles) { |
332 if (!serialized_) { | 323 data_pipe_->StartSerialize(max_size, max_platform_handles); |
333 // Handles the case where we have messages read off RawChannel but not ready | |
334 // by MojoReadMessage. | |
335 SerializeInternal(); | |
336 } | |
337 | |
338 DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), | |
339 !data_.empty() || !serialized_read_buffer_.empty(), | |
340 max_size, max_platform_handles); | |
341 } | 324 } |
342 | 325 |
343 bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock( | 326 bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock( |
344 void* destination, | 327 void* destination, |
345 size_t* actual_size, | 328 size_t* actual_size, |
346 PlatformHandleVector* platform_handles) { | 329 PlatformHandleVector* platform_handles) { |
347 ScopedPlatformHandle shared_memory_handle; | 330 data_pipe_->EndSerialize(destination, actual_size, platform_handles); |
348 size_t shared_memory_size = data_.size() + serialized_read_buffer_.size(); | |
349 if (shared_memory_size) { | |
350 shared_memory_size += sizeof(SharedMemoryHeader); | |
351 SharedMemoryHeader header; | |
352 header.data_size = static_cast<uint32_t>(data_.size()); | |
353 header.read_buffer_size = | |
354 static_cast<uint32_t>(serialized_read_buffer_.size()); | |
355 | |
356 scoped_refptr<PlatformSharedBuffer> shared_buffer( | |
357 internal::g_platform_support->CreateSharedBuffer( | |
358 shared_memory_size)); | |
359 scoped_ptr<PlatformSharedBufferMapping> mapping( | |
360 shared_buffer->Map(0, shared_memory_size)); | |
361 | |
362 char* start = static_cast<char*>(mapping->GetBase()); | |
363 memcpy(start, &header, sizeof(SharedMemoryHeader)); | |
364 start += sizeof(SharedMemoryHeader); | |
365 | |
366 if (!data_.empty()) { | |
367 memcpy(start, &data_[0], data_.size()); | |
368 start += data_.size(); | |
369 } | |
370 | |
371 if (!serialized_read_buffer_.empty()) { | |
372 memcpy(start, &serialized_read_buffer_[0], | |
373 serialized_read_buffer_.size()); | |
374 start += serialized_read_buffer_.size(); | |
375 } | |
376 | |
377 shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); | |
378 } | |
379 | |
380 DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), | |
381 std::move(shared_memory_handle), shared_memory_size, | |
382 destination, actual_size, platform_handles); | |
383 CloseImplNoLock(); | 331 CloseImplNoLock(); |
384 return true; | 332 return true; |
385 } | 333 } |
386 | 334 |
387 void DataPipeConsumerDispatcher::TransportStarted() { | 335 void DataPipeConsumerDispatcher::TransportStarted() { |
388 started_transport_.Acquire(); | 336 started_transport_.Acquire(); |
389 } | 337 } |
390 | 338 |
391 void DataPipeConsumerDispatcher::TransportEnded() { | 339 void DataPipeConsumerDispatcher::TransportEnded() { |
392 started_transport_.Release(); | 340 started_transport_.Release(); |
393 | 341 |
394 base::AutoLock locker(lock()); | 342 base::AutoLock locker(lock()); |
395 | 343 |
396 // If transporting of DP failed, we might have got more data and didn't awake | 344 // If transporting of DP failed, we might have got more data and didn't awake |
397 // for. | 345 // for. |
398 // TODO(jam): should we care about only alerting if it was empty before | 346 // TODO(jam): should we care about only alerting if it was empty before |
399 // TransportStarted? | 347 // TransportStarted? |
400 if (!data_.empty()) | 348 if (data_pipe_->GetReadableBytes()) |
401 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 349 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
402 } | 350 } |
403 | 351 |
404 bool DataPipeConsumerDispatcher::IsBusyNoLock() const { | 352 bool DataPipeConsumerDispatcher::IsBusyNoLock() const { |
405 lock().AssertAcquired(); | 353 lock().AssertAcquired(); |
406 return in_two_phase_read_; | 354 return in_two_phase_read_; |
407 } | 355 } |
408 | 356 |
357 bool DataPipeConsumerDispatcher::ProcessCommand( | |
358 const DataPipeCommandHeader& command, | |
359 ScopedPlatformHandleVectorPtr platform_handles) { | |
360 // Handles write/read case and shared buffer becoming available case. | |
361 return data_pipe_->ProcessCommand(command, std::move(platform_handles)); | |
362 } | |
363 | |
409 void DataPipeConsumerDispatcher::OnReadMessage( | 364 void DataPipeConsumerDispatcher::OnReadMessage( |
410 const MessageInTransit::View& message_view, | 365 const MessageInTransit::View& message_view, |
411 ScopedPlatformHandleVectorPtr platform_handles) { | 366 ScopedPlatformHandleVectorPtr platform_handles) { |
412 const char* bytes_start = static_cast<const char*>(message_view.bytes()); | 367 const DataPipeCommandHeader* command = |
413 const char* bytes_end = bytes_start + message_view.num_bytes(); | 368 static_cast<const DataPipeCommandHeader*>(message_view.bytes()); |
369 DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader)); | |
370 | |
414 if (started_transport_.Try()) { | 371 if (started_transport_.Try()) { |
415 // We're not in the middle of being sent. | 372 // We're not in the middle of being sent. |
416 | 373 |
417 // Can get synchronously called back in Init if there was initial data. | 374 // Can get synchronously called back from RawChannel::Init in InitOnIO if |
375 // there was initial data. InitOnIO locks, so don't lock twice. | |
418 scoped_ptr<base::AutoLock> locker; | 376 scoped_ptr<base::AutoLock> locker; |
419 if (!calling_init_) { | 377 if (!calling_init_) { |
420 locker.reset(new base::AutoLock(lock())); | 378 locker.reset(new base::AutoLock(lock())); |
421 } | 379 } |
422 | 380 |
423 if (in_two_phase_read_) { | 381 if (ProcessCommand(*command, std::move(platform_handles))) { |
424 data_received_during_two_phase_read_.insert( | 382 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
425 data_received_during_two_phase_read_.end(), bytes_start, bytes_end); | |
426 } else { | |
427 bool was_empty = data_.empty(); | |
428 data_.insert(data_.end(), bytes_start, bytes_end); | |
429 if (was_empty) | |
430 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | |
431 } | 383 } |
432 started_transport_.Release(); | 384 started_transport_.Release(); |
433 } else { | 385 } else { |
434 // See comment in MessagePipeDispatcher about why we can't and don't need | 386 // DataPipe::Serialize calls ReleaseHandle on the channel, which |
435 // to lock here. | 387 // acquires RawChannel's read_lock_. The function OnReadMessage is only |
436 data_.insert(data_.end(), bytes_start, bytes_end); | 388 // called while read_lock_ is acquired, and not after ReleaseHandle has been |
389 // called. This means this function will only be called before Serialize | |
390 // calls ReleaseHandle, meaning the serialisation will not have started yet. | |
391 // We only notify awakables if we're not in the process of being | |
392 // transported. | |
393 ProcessCommand(*command, std::move(platform_handles)); | |
437 } | 394 } |
438 } | 395 } |
439 | 396 |
440 void DataPipeConsumerDispatcher::OnError(Error error) { | 397 void DataPipeConsumerDispatcher::OnError(Error error) { |
441 switch (error) { | 398 switch (error) { |
442 case ERROR_READ_SHUTDOWN: | 399 case ERROR_READ_SHUTDOWN: |
443 // The other side was cleanly closed, so this isn't actually an error. | 400 // The other side was cleanly closed, so this isn't actually an error. |
444 DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)"; | 401 DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)"; |
445 break; | 402 break; |
446 case ERROR_READ_BROKEN: | 403 case ERROR_READ_BROKEN: |
447 LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)"; | 404 // It's okay for the other side to close the connection without reading |
405 // our updates about how much we've read. | |
406 DLOG(ERROR) | |
407 << "DataPipeConsumerDispatcher read error (connection broken)"; | |
448 break; | 408 break; |
449 case ERROR_READ_BAD_MESSAGE: | 409 case ERROR_READ_BAD_MESSAGE: |
450 // Receiving a bad message means either a bug, data corruption, or | 410 // Receiving a bad message means either a bug, data corruption, or |
451 // malicious attack (probably due to some other bug). | 411 // malicious attack (probably due to some other bug). |
452 LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad " | 412 LOG(ERROR) << "DataPipeConsumerDispatcher read error (received bad " |
453 << "message)"; | 413 << "message)"; |
454 break; | 414 break; |
455 case ERROR_READ_UNKNOWN: | 415 case ERROR_READ_UNKNOWN: |
456 LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)"; | 416 LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)"; |
457 break; | 417 break; |
458 case ERROR_WRITE: | 418 case ERROR_WRITE: |
459 LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages"; | 419 LOG(ERROR) << "DataPipeConsumerDispatcher write error"; |
460 break; | 420 break; |
461 } | 421 } |
462 | 422 |
463 error_ = true; | 423 peer_closed_ = true; |
464 if (started_transport_.Try()) { | 424 if (started_transport_.Try()) { |
465 base::AutoLock locker(lock()); | 425 base::AutoLock locker(lock()); |
466 // We can get two OnError callbacks before the post task below completes. | 426 // We can get two OnError callbacks before the post task below completes. |
467 // Although RawChannel still has a pointer to this object until Shutdown is | 427 // Although RawChannel still has a pointer to this object until Shutdown is |
468 // called, that is safe since this class always does a PostTask to the IO | 428 // called, that is safe since this class always does a PostTask to the IO |
469 // thread to self destruct. | 429 // thread to self destruct. |
470 if (channel_) { | 430 if (data_pipe_->GetChannel()) { |
471 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); | 431 awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
472 channel_->Shutdown(); | 432 data_pipe_->Shutdown(); |
473 channel_ = nullptr; | |
474 } | 433 } |
475 started_transport_.Release(); | 434 started_transport_.Release(); |
476 } else { | 435 } else { |
477 // We must be waiting to call ReleaseHandle. It will call Shutdown. | 436 // We must be waiting to call ReleaseHandle. It will call Shutdown. |
478 } | 437 } |
479 } | 438 } |
480 | 439 |
481 void DataPipeConsumerDispatcher::SerializeInternal() { | |
482 DCHECK(!in_two_phase_read_); | |
483 // We need to stop watching handle immediately, even though not on IO thread, | |
484 // so that other messages aren't read after this. | |
485 if (channel_) { | |
486 std::vector<char> serialized_write_buffer; | |
487 std::vector<int> fds; | |
488 bool write_error = false; | |
489 serialized_platform_handle_ = channel_->ReleaseHandle( | |
490 &serialized_read_buffer_, &serialized_write_buffer, &fds, &fds, | |
491 &write_error); | |
492 CHECK(serialized_write_buffer.empty()); | |
493 CHECK(fds.empty()); | |
494 CHECK(!write_error) << "DataPipeConsumerDispatcher doesn't write."; | |
495 | |
496 channel_ = nullptr; | |
497 } | |
498 | |
499 serialized_ = true; | |
500 } | |
501 | |
502 } // namespace edk | 440 } // namespace edk |
503 } // namespace mojo | 441 } // namespace mojo |
OLD | NEW |