OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/raw_channel.h" | 5 #include "mojo/edk/system/raw_channel.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 | 10 |
11 #include "base/bind.h" | 11 #include "base/bind.h" |
12 #include "base/location.h" | 12 #include "base/location.h" |
13 #include "base/logging.h" | 13 #include "base/logging.h" |
14 #include "base/message_loop/message_loop.h" | 14 #include "base/message_loop/message_loop.h" |
15 #include "mojo/edk/embedder/embedder_internal.h" | |
15 #include "mojo/edk/system/message_in_transit.h" | 16 #include "mojo/edk/system/message_in_transit.h" |
16 #include "mojo/edk/system/transport_data.h" | 17 #include "mojo/edk/system/transport_data.h" |
17 | 18 |
18 namespace mojo { | 19 namespace mojo { |
19 namespace system { | 20 namespace system { |
20 | 21 |
21 const size_t kReadSize = 4096; | 22 const size_t kReadSize = 4096; |
22 | 23 |
23 // RawChannel::ReadBuffer ------------------------------------------------------ | 24 // RawChannel::ReadBuffer ------------------------------------------------------ |
24 | 25 |
25 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { | 26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { |
26 } | 27 } |
27 | 28 |
28 RawChannel::ReadBuffer::~ReadBuffer() { | 29 RawChannel::ReadBuffer::~ReadBuffer() { |
29 } | 30 } |
30 | 31 |
31 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { | 32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { |
32 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); | 33 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); |
33 *addr = &buffer_[0] + num_valid_bytes_; | 34 *addr = &buffer_[0] + num_valid_bytes_; |
34 *size = kReadSize; | 35 *size = kReadSize; |
35 } | 36 } |
36 | 37 |
37 // RawChannel::WriteBuffer ----------------------------------------------------- | 38 // RawChannel::WriteBuffer ----------------------------------------------------- |
38 | 39 |
39 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size) | 40 RawChannel::WriteBuffer::WriteBuffer() |
40 : serialized_platform_handle_size_(serialized_platform_handle_size), | 41 : serialized_platform_handle_size_(0), |
41 platform_handles_offset_(0), | 42 platform_handles_offset_(0), |
42 data_offset_(0) { | 43 data_offset_(0) { |
43 } | 44 } |
44 | 45 |
45 RawChannel::WriteBuffer::~WriteBuffer() { | 46 RawChannel::WriteBuffer::~WriteBuffer() { |
46 message_queue_.Clear(); | 47 message_queue_.Clear(); |
47 } | 48 } |
48 | 49 |
49 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { | 50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const { |
50 if (message_queue_.IsEmpty()) | 51 if (message_queue_.IsEmpty()) |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
108 size_t transport_data_buffer_size = | 109 size_t transport_data_buffer_size = |
109 message->transport_data() ? message->transport_data()->buffer_size() : 0; | 110 message->transport_data() ? message->transport_data()->buffer_size() : 0; |
110 | 111 |
111 if (!transport_data_buffer_size) { | 112 if (!transport_data_buffer_size) { |
112 // Only write from the main buffer. | 113 // Only write from the main buffer. |
113 DCHECK_LT(data_offset_, message->main_buffer_size()); | 114 DCHECK_LT(data_offset_, message->main_buffer_size()); |
114 DCHECK_LE(bytes_to_write, message->main_buffer_size()); | 115 DCHECK_LE(bytes_to_write, message->main_buffer_size()); |
115 Buffer buffer = { | 116 Buffer buffer = { |
116 static_cast<const char*>(message->main_buffer()) + data_offset_, | 117 static_cast<const char*>(message->main_buffer()) + data_offset_, |
117 bytes_to_write}; | 118 bytes_to_write}; |
119 | |
118 buffers->push_back(buffer); | 120 buffers->push_back(buffer); |
119 return; | 121 return; |
120 } | 122 } |
121 | 123 |
122 if (data_offset_ >= message->main_buffer_size()) { | 124 if (data_offset_ >= message->main_buffer_size()) { |
123 // Only write from the transport data buffer. | 125 // Only write from the transport data buffer. |
124 DCHECK_LT(data_offset_ - message->main_buffer_size(), | 126 DCHECK_LT(data_offset_ - message->main_buffer_size(), |
125 transport_data_buffer_size); | 127 transport_data_buffer_size); |
126 DCHECK_LE(bytes_to_write, transport_data_buffer_size); | 128 DCHECK_LE(bytes_to_write, transport_data_buffer_size); |
127 Buffer buffer = { | 129 Buffer buffer = { |
128 static_cast<const char*>(message->transport_data()->buffer()) + | 130 static_cast<const char*>(message->transport_data()->buffer()) + |
129 (data_offset_ - message->main_buffer_size()), | 131 (data_offset_ - message->main_buffer_size()), |
130 bytes_to_write}; | 132 bytes_to_write}; |
133 | |
131 buffers->push_back(buffer); | 134 buffers->push_back(buffer); |
132 return; | 135 return; |
133 } | 136 } |
134 | 137 |
135 // TODO(vtl): We could actually send out buffers from multiple messages, with | 138 // TODO(vtl): We could actually send out buffers from multiple messages, with |
136 // the "stopping" condition being reaching a message with platform handles | 139 // the "stopping" condition being reaching a message with platform handles |
137 // attached. | 140 // attached. |
138 | 141 |
139 // Write from both buffers. | 142 // Write from both buffers. |
140 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + | 143 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + |
141 transport_data_buffer_size); | 144 transport_data_buffer_size); |
142 Buffer buffer1 = { | 145 Buffer buffer1 = { |
143 static_cast<const char*>(message->main_buffer()) + data_offset_, | 146 static_cast<const char*>(message->main_buffer()) + data_offset_, |
144 message->main_buffer_size() - data_offset_}; | 147 message->main_buffer_size() - data_offset_}; |
145 buffers->push_back(buffer1); | 148 buffers->push_back(buffer1); |
146 Buffer buffer2 = { | 149 Buffer buffer2 = { |
147 static_cast<const char*>(message->transport_data()->buffer()), | 150 static_cast<const char*>(message->transport_data()->buffer()), |
148 transport_data_buffer_size}; | 151 transport_data_buffer_size}; |
149 buffers->push_back(buffer2); | 152 buffers->push_back(buffer2); |
150 } | 153 } |
151 | 154 |
152 // RawChannel ------------------------------------------------------------------ | 155 // RawChannel ------------------------------------------------------------------ |
153 | 156 |
154 RawChannel::RawChannel() | 157 RawChannel::RawChannel() |
155 : message_loop_for_io_(nullptr), | 158 : message_loop_for_io_(nullptr), |
159 set_on_shutdown_(nullptr), | |
156 delegate_(nullptr), | 160 delegate_(nullptr), |
157 set_on_shutdown_(nullptr), | 161 write_ready_(false), |
158 write_stopped_(false), | 162 write_stopped_(false), |
163 error_occurred_(false), | |
159 weak_ptr_factory_(this) { | 164 weak_ptr_factory_(this) { |
165 read_buffer_.reset(new ReadBuffer); | |
166 write_buffer_.reset(new WriteBuffer()); | |
160 } | 167 } |
161 | 168 |
162 RawChannel::~RawChannel() { | 169 RawChannel::~RawChannel() { |
163 DCHECK(!read_buffer_); | 170 DCHECK(!read_buffer_); |
164 DCHECK(!write_buffer_); | 171 DCHECK(!write_buffer_); |
165 | 172 |
166 // No need to take the |write_lock_| here -- if there are still weak pointers | 173 // Only want to decrement counter if Init was called. |
167 // outstanding, then we're hosed anyway (since we wouldn't be able to | 174 if (message_loop_for_io_) { |
168 // invalidate them cleanly, since we might not be on the I/O thread). | 175 // No need to take the |write_lock_| here -- if there are still weak |
169 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | 176 // pointers outstanding, then we're hosed anyway (since we wouldn't be able |
177 // to invalidate them cleanly, since we might not be on the I/O thread). | |
178 // DCHECK(!weak_ptr_factory_.HasWeakPtrs()); | |
179 embedder::internal::ChannelShutdown(); | |
180 } | |
170 } | 181 } |
171 | 182 |
172 void RawChannel::Init(Delegate* delegate) { | 183 void RawChannel::Init(Delegate* delegate) { |
184 embedder::internal::ChannelStarted(); | |
173 DCHECK(delegate); | 185 DCHECK(delegate); |
174 | 186 |
187 base::AutoLock read_locker(read_lock_); | |
188 // solves race where initialiing on io thread while main thread is serializing | |
189 // this channel and releases handle. | |
190 base::AutoLock locker(write_lock_); | |
191 | |
175 DCHECK(!delegate_); | 192 DCHECK(!delegate_); |
176 delegate_ = delegate; | 193 delegate_ = delegate; |
177 | 194 |
178 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); | 195 //CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); |
179 DCHECK(!message_loop_for_io_); | 196 DCHECK(!message_loop_for_io_); |
180 message_loop_for_io_ = | 197 message_loop_for_io_ = |
181 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); | 198 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); |
182 | 199 |
183 // No need to take the lock. No one should be using us yet. | 200 OnInit(); |
184 DCHECK(!read_buffer_); | |
185 read_buffer_.reset(new ReadBuffer); | |
186 DCHECK(!write_buffer_); | |
187 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize())); | |
188 | 201 |
189 OnInit(); | 202 // Although this means that we can call back sync into the caller, that's |
203 // easier than posting a task to do this, because there might also be pending | |
204 // read calls and we can't modify the buffer. | |
205 if (read_buffer_->num_valid_bytes()) { | |
206 // We had serialized read buffer data through SetInitialReadBufferData call. | |
207 // Make sure we read messages out of it now, otherwise the delegate won't | |
208 // get notified if no other data gets written to the pipe. | |
209 bool did_dispatch_message = false; | |
210 bool stop_dispatching = false; | |
211 DispatchMessages(&did_dispatch_message, &stop_dispatching); | |
212 } | |
190 | 213 |
191 IOResult io_result = ScheduleRead(); | 214 IOResult io_result = ScheduleRead(); |
192 if (io_result != IO_PENDING) { | 215 if (io_result != IO_PENDING) { |
193 // This will notify the delegate about the read failure. Although we're on | 216 // This will notify the delegate about the read failure. Although we're on |
194 // the I/O thread, don't call it in the nested context. | 217 // the I/O thread, don't call it in the nested context. |
195 message_loop_for_io_->PostTask( | 218 message_loop_for_io_->PostTask( |
196 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, | 219 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, |
197 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); | 220 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); |
198 } | 221 } |
199 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying | 222 // Note: |ScheduleRead()| failure is treated as a read failure (by notifying |
200 // the delegate), not an initialization failure. | 223 // the delegate), not an initialization failure. |
224 | |
225 write_ready_ = true; | |
226 write_buffer_->serialized_platform_handle_size_ = | |
227 GetSerializedPlatformHandleSize(); | |
228 if (!write_buffer_->message_queue_.IsEmpty()) | |
229 SendQueuedMessagesNoLock(); | |
201 } | 230 } |
202 | 231 |
203 void RawChannel::Shutdown() { | 232 void RawChannel::Shutdown() { |
204 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 233 weak_ptr_factory_.InvalidateWeakPtrs(); |
205 | 234 |
235 // Normally, we want to flush any pending writes before shutting down. This | |
236 // doesn't apply when 1) we don't have a handle (for obvious reasons) or | |
237 // 2) when the other side already quit and asked us to close the handle to | |
238 // ensure that we read everything out of the pipe first. | |
239 if (!HandleForDebuggingNoLock().is_valid() || error_occurred_) { | |
240 { | |
241 base::AutoLock read_locker(read_lock_); | |
242 base::AutoLock locker(write_lock_); | |
243 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); | |
244 } | |
245 delete this; | |
246 return; | |
247 } | |
248 | |
249 base::AutoLock read_locker(read_lock_); | |
206 base::AutoLock locker(write_lock_); | 250 base::AutoLock locker(write_lock_); |
251 DCHECK(read_buffer_->num_valid_bytes() == 0) << | |
252 "RawChannel::Shutdown called but there is pending data to be read"; | |
207 | 253 |
208 LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty()) | 254 // happens on shutdown if didn't call init when doing createduplicate |
209 << "Shutting down RawChannel with write buffer nonempty"; | 255 if (message_loop_for_io()) { |
256 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | |
257 } | |
210 | 258 |
211 // Reset the delegate so that it won't receive further calls. | 259 // Reset the delegate so that it won't receive further calls. |
212 delegate_ = nullptr; | 260 delegate_ = nullptr; |
213 if (set_on_shutdown_) { | 261 if (set_on_shutdown_) { |
214 *set_on_shutdown_ = true; | 262 *set_on_shutdown_ = true; |
215 set_on_shutdown_ = nullptr; | 263 set_on_shutdown_ = nullptr; |
216 } | 264 } |
217 write_stopped_ = true; | |
218 weak_ptr_factory_.InvalidateWeakPtrs(); | |
219 | 265 |
220 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); | 266 // TODO(jam): probably remove this since it doesn't make sense now that we |
267 // wait and flush pending messages. | |
268 // write_stopped_ = true; | |
269 | |
270 | |
271 bool empty = write_buffer_->message_queue_.IsEmpty(); | |
272 | |
273 // We may have no messages to write. However just because our end of the pipe | |
274 // wrote everything doesn't mean that the other end read it. We don't want to | |
275 // call FlushFileBuffers since a) that only works for server end of the pipe, | |
276 // and b) it pauses this thread (which can block a process on another, or | |
277 // worse hang if both pipes are in the same process). | |
278 scoped_ptr<MessageInTransit> quit_message(new MessageInTransit( | |
279 MessageInTransit::Type::RAW_CHANNEL_QUIT, 0, nullptr)); | |
280 EnqueueMessageNoLock(quit_message.Pass()); | |
281 | |
282 if (empty) | |
283 SendQueuedMessagesNoLock(); | |
284 } | |
285 | |
286 embedder::ScopedPlatformHandle RawChannel::ReleaseHandle( | |
287 std::vector<char>* read_buffer) { | |
288 //LOG(ERROR) << "RawChannel::ReleaseHandle( " << this; | |
289 | |
290 embedder::ScopedPlatformHandle rv; | |
291 { | |
292 base::AutoLock read_locker(read_lock_); | |
293 base::AutoLock locker(write_lock_); | |
294 rv = ReleaseHandleNoLock(read_buffer); | |
295 | |
296 // TODO(jam); if we use these, use nolock versions of these methods that are | |
297 // copied. | |
298 if (write_buffer_.get() && !write_buffer_->message_queue_.IsEmpty()) { | |
299 NOTREACHED() << "TODO(JAM)"; | |
300 } | |
301 | |
302 delegate_ = nullptr; | |
303 | |
304 // The Unretained is safe because above cancelled IO so we shouldn't get any | |
305 // channel errors. | |
306 // |message_loop_for_io_| might not be set yet | |
307 embedder::internal::g_io_thread_task_runner->PostTask( | |
308 FROM_HERE, | |
309 base::Bind(&RawChannel::Shutdown, base::Unretained(this))); | |
310 } | |
311 | |
312 return rv; | |
221 } | 313 } |
222 | 314 |
223 // Reminder: This must be thread-safe. | 315 // Reminder: This must be thread-safe. |
224 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { | 316 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
225 DCHECK(message); | 317 DCHECK(message); |
226 | |
227 base::AutoLock locker(write_lock_); | 318 base::AutoLock locker(write_lock_); |
228 if (write_stopped_) | 319 if (write_stopped_) |
229 return false; | 320 return false; |
230 | 321 |
231 if (!write_buffer_->message_queue_.IsEmpty()) { | 322 bool queue_was_empty = write_buffer_->message_queue_.IsEmpty(); |
232 EnqueueMessageNoLock(message.Pass()); | 323 EnqueueMessageNoLock(message.Pass()); |
233 return true; | 324 if (queue_was_empty && write_ready_) |
234 } | 325 SendQueuedMessagesNoLock(); |
235 | 326 |
236 EnqueueMessageNoLock(message.Pass()); | 327 return true; |
328 } | |
329 | |
330 void RawChannel::SendQueuedMessagesNoLock() { | |
237 DCHECK_EQ(write_buffer_->data_offset_, 0u); | 331 DCHECK_EQ(write_buffer_->data_offset_, 0u); |
238 | 332 |
239 size_t platform_handles_written = 0; | 333 size_t platform_handles_written = 0; |
240 size_t bytes_written = 0; | 334 size_t bytes_written = 0; |
241 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | 335 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
242 if (io_result == IO_PENDING) | 336 if (io_result == IO_PENDING) |
243 return true; | 337 return; |
244 | 338 |
245 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, | 339 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, |
246 bytes_written); | 340 bytes_written); |
247 if (!result) { | 341 if (!result) { |
248 // Even if we're on the I/O thread, don't call |OnError()| in the nested | 342 // Even if we're on the I/O thread, don't call |OnError()| in the nested |
249 // context. | 343 // context. |
250 message_loop_for_io_->PostTask( | 344 message_loop_for_io_->PostTask( |
251 FROM_HERE, | 345 FROM_HERE, |
252 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), | 346 base::Bind(&RawChannel::LockAndCallOnError, |
347 weak_ptr_factory_.GetWeakPtr(), | |
253 Delegate::ERROR_WRITE)); | 348 Delegate::ERROR_WRITE)); |
254 } | 349 } |
255 | |
256 return result; | |
257 } | 350 } |
258 | 351 |
259 // Reminder: This must be thread-safe. | 352 // Reminder: This must be thread-safe. |
260 bool RawChannel::IsWriteBufferEmpty() { | 353 bool RawChannel::IsWriteBufferEmpty() { |
261 base::AutoLock locker(write_lock_); | 354 base::AutoLock locker(write_lock_); |
262 return write_buffer_->message_queue_.IsEmpty(); | 355 return write_buffer_->message_queue_.IsEmpty(); |
263 } | 356 } |
264 | 357 |
358 bool RawChannel::IsReadBufferEmpty() { | |
359 base::AutoLock locker(read_lock_); | |
360 return read_buffer_->num_valid_bytes_ != 0; | |
361 } | |
362 | |
363 void RawChannel::SetInitialReadBufferData(char* data, size_t size) { | |
364 base::AutoLock locker(read_lock_); | |
365 // TODO(jam): copy power of 2 algorithm below? or share. | |
366 read_buffer_->buffer_.resize(size+kReadSize); | |
367 memcpy(&read_buffer_->buffer_[0], data, size); | |
368 read_buffer_->num_valid_bytes_ = size; | |
369 } | |
370 | |
265 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { | 371 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) { |
266 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 372 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
267 | 373 |
374 base::AutoLock locker(read_lock_); | |
375 | |
268 // Keep reading data in a loop, and dispatch messages if enough data is | 376 // Keep reading data in a loop, and dispatch messages if enough data is |
269 // received. Exit the loop if any of the following happens: | 377 // received. Exit the loop if any of the following happens: |
270 // - one or more messages were dispatched; | 378 // - one or more messages were dispatched; |
271 // - the last read failed, was a partial read or would block; | 379 // - the last read failed, was a partial read or would block; |
272 // - |Shutdown()| was called. | 380 // - |Shutdown()| was called. |
273 do { | 381 do { |
274 switch (io_result) { | 382 switch (io_result) { |
275 case IO_SUCCEEDED: | 383 case IO_SUCCEEDED: |
276 break; | 384 break; |
277 case IO_FAILED_SHUTDOWN: | 385 case IO_FAILED_SHUTDOWN: |
278 case IO_FAILED_BROKEN: | 386 case IO_FAILED_BROKEN: |
279 case IO_FAILED_UNKNOWN: | 387 case IO_FAILED_UNKNOWN: |
280 CallOnError(ReadIOResultToError(io_result)); | 388 CallOnError(ReadIOResultToError(io_result)); |
281 return; // |this| may have been destroyed in |CallOnError()|. | 389 return; // |this| may have been destroyed in |CallOnError()|. |
282 case IO_PENDING: | 390 case IO_PENDING: |
283 NOTREACHED(); | 391 NOTREACHED(); |
284 return; | 392 return; |
285 } | 393 } |
286 | 394 |
287 read_buffer_->num_valid_bytes_ += bytes_read; | 395 read_buffer_->num_valid_bytes_ += bytes_read; |
288 | 396 |
289 // Dispatch all the messages that we can. | 397 // Dispatch all the messages that we can. |
290 bool did_dispatch_message = false; | 398 bool did_dispatch_message = false; |
291 // Tracks the offset of the first undispatched message in |read_buffer_|. | 399 bool stop_dispatching = false; |
292 // Currently, we copy data to ensure that this is zero at the beginning. | 400 DispatchMessages(&did_dispatch_message, &stop_dispatching); |
293 size_t read_buffer_start = 0; | 401 if (stop_dispatching) |
294 size_t remaining_bytes = read_buffer_->num_valid_bytes_; | 402 return; |
295 size_t message_size; | |
296 // Note that we rely on short-circuit evaluation here: | |
297 // - |read_buffer_start| may be an invalid index into | |
298 // |read_buffer_->buffer_| if |remaining_bytes| is zero. | |
299 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | |
300 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | |
301 // next read). | |
302 // TODO(vtl): Validate that |message_size| is sane. | |
303 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( | |
304 &read_buffer_->buffer_[read_buffer_start], | |
305 remaining_bytes, &message_size) && | |
306 remaining_bytes >= message_size) { | |
307 MessageInTransit::View message_view( | |
308 message_size, &read_buffer_->buffer_[read_buffer_start]); | |
309 DCHECK_EQ(message_view.total_size(), message_size); | |
310 | |
311 const char* error_message = nullptr; | |
312 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), | |
313 &error_message)) { | |
314 DCHECK(error_message); | |
315 LOG(ERROR) << "Received invalid message: " << error_message; | |
316 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
317 return; // |this| may have been destroyed in |CallOnError()|. | |
318 } | |
319 | |
320 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL) { | |
321 if (!OnReadMessageForRawChannel(message_view)) { | |
322 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
323 return; // |this| may have been destroyed in |CallOnError()|. | |
324 } | |
325 } else { | |
326 embedder::ScopedPlatformHandleVectorPtr platform_handles; | |
327 if (message_view.transport_data_buffer()) { | |
328 size_t num_platform_handles; | |
329 const void* platform_handle_table; | |
330 TransportData::GetPlatformHandleTable( | |
331 message_view.transport_data_buffer(), &num_platform_handles, | |
332 &platform_handle_table); | |
333 | |
334 if (num_platform_handles > 0) { | |
335 platform_handles = | |
336 GetReadPlatformHandles(num_platform_handles, | |
337 platform_handle_table).Pass(); | |
338 if (!platform_handles) { | |
339 LOG(ERROR) << "Invalid number of platform handles received"; | |
340 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
341 return; // |this| may have been destroyed in |CallOnError()|. | |
342 } | |
343 } | |
344 } | |
345 | |
346 // TODO(vtl): In the case that we aren't expecting any platform handles, | |
347 // for the POSIX implementation, we should confirm that none are stored. | |
348 | |
349 // Dispatch the message. | |
350 // Detect the case when |Shutdown()| is called; subsequent destruction | |
351 // is also permitted then. | |
352 bool shutdown_called = false; | |
353 DCHECK(!set_on_shutdown_); | |
354 set_on_shutdown_ = &shutdown_called; | |
355 DCHECK(delegate_); | |
356 delegate_->OnReadMessage(message_view, platform_handles.Pass()); | |
357 if (shutdown_called) | |
358 return; | |
359 set_on_shutdown_ = nullptr; | |
360 } | |
361 | |
362 did_dispatch_message = true; | |
363 | |
364 // Update our state. | |
365 read_buffer_start += message_size; | |
366 remaining_bytes -= message_size; | |
367 } | |
368 | |
369 if (read_buffer_start > 0) { | |
370 // Move data back to start. | |
371 read_buffer_->num_valid_bytes_ = remaining_bytes; | |
372 if (read_buffer_->num_valid_bytes_ > 0) { | |
373 memmove(&read_buffer_->buffer_[0], | |
374 &read_buffer_->buffer_[read_buffer_start], remaining_bytes); | |
375 } | |
376 read_buffer_start = 0; | |
377 } | |
378 | 403 |
379 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < | 404 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < |
380 kReadSize) { | 405 kReadSize) { |
381 // Use power-of-2 buffer sizes. | 406 // Use power-of-2 buffer sizes. |
382 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | 407 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
383 // maximum message size to whatever extent necessary). | 408 // maximum message size to whatever extent necessary). |
384 // TODO(vtl): We may often be able to peek at the header and get the real | 409 // TODO(vtl): We may often be able to peek at the header and get the real |
385 // required extra space (which may be much bigger than |kReadSize|). | 410 // required extra space (which may be much bigger than |kReadSize|). |
386 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); | 411 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); |
387 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) | 412 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) |
(...skipping 30 matching lines...) Expand all Loading... | |
418 if (write_stopped_) { | 443 if (write_stopped_) { |
419 NOTREACHED(); | 444 NOTREACHED(); |
420 return; | 445 return; |
421 } | 446 } |
422 | 447 |
423 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, | 448 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, |
424 bytes_written); | 449 bytes_written); |
425 } | 450 } |
426 | 451 |
427 if (did_fail) { | 452 if (did_fail) { |
453 base::AutoLock locker(read_lock_); | |
yzshen1
2015/09/23 22:47:09
nit: use LoadAndCallOnError instead?
| |
428 CallOnError(Delegate::ERROR_WRITE); | 454 CallOnError(Delegate::ERROR_WRITE); |
429 return; // |this| may have been destroyed in |CallOnError()|. | 455 return; // |this| may have been destroyed in |CallOnError()|. |
430 } | 456 } |
431 } | 457 } |
432 | 458 |
433 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | 459 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
434 write_lock_.AssertAcquired(); | 460 write_lock_.AssertAcquired(); |
461 DCHECK(HandleForDebuggingNoLock().is_valid()); | |
435 write_buffer_->message_queue_.AddMessage(message.Pass()); | 462 write_buffer_->message_queue_.AddMessage(message.Pass()); |
436 } | 463 } |
437 | 464 |
438 bool RawChannel::OnReadMessageForRawChannel( | 465 bool RawChannel::OnReadMessageForRawChannel( |
439 const MessageInTransit::View& message_view) { | 466 const MessageInTransit::View& message_view) { |
467 if (message_view.type() == MessageInTransit::Type::RAW_CHANNEL_QUIT) { | |
468 message_loop_for_io_->PostTask( | |
469 FROM_HERE, base::Bind(&RawChannel::LockAndCallOnError, | |
470 weak_ptr_factory_.GetWeakPtr(), | |
471 Delegate::ERROR_READ_SHUTDOWN)); | |
472 return true; | |
473 } | |
474 | |
440 // No non-implementation specific |RawChannel| control messages. | 475 // No non-implementation specific |RawChannel| control messages. |
441 LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype() | 476 LOG(ERROR) << "Invalid control message (type " << message_view.type() |
442 << ")"; | 477 << ")"; |
443 return false; | 478 return false; |
444 } | 479 } |
445 | 480 |
446 // static | |
447 RawChannel::Delegate::Error RawChannel::ReadIOResultToError( | 481 RawChannel::Delegate::Error RawChannel::ReadIOResultToError( |
448 IOResult io_result) { | 482 IOResult io_result) { |
449 switch (io_result) { | 483 switch (io_result) { |
450 case IO_FAILED_SHUTDOWN: | 484 case IO_FAILED_SHUTDOWN: |
451 return Delegate::ERROR_READ_SHUTDOWN; | 485 return Delegate::ERROR_READ_SHUTDOWN; |
452 case IO_FAILED_BROKEN: | 486 case IO_FAILED_BROKEN: |
453 return Delegate::ERROR_READ_BROKEN; | 487 return Delegate::ERROR_READ_BROKEN; |
454 case IO_FAILED_UNKNOWN: | 488 case IO_FAILED_UNKNOWN: |
455 return Delegate::ERROR_READ_UNKNOWN; | 489 return Delegate::ERROR_READ_UNKNOWN; |
456 case IO_SUCCEEDED: | 490 case IO_SUCCEEDED: |
457 case IO_PENDING: | 491 case IO_PENDING: |
458 NOTREACHED(); | 492 NOTREACHED(); |
459 break; | 493 break; |
460 } | 494 } |
461 return Delegate::ERROR_READ_UNKNOWN; | 495 return Delegate::ERROR_READ_UNKNOWN; |
462 } | 496 } |
463 | 497 |
464 void RawChannel::CallOnError(Delegate::Error error) { | 498 void RawChannel::CallOnError(Delegate::Error error) { |
465 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 499 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
466 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? | 500 read_lock_.AssertAcquired(); |
501 error_occurred_ = true; | |
467 if (delegate_) { | 502 if (delegate_) { |
468 delegate_->OnError(error); | 503 delegate_->OnError(error); |
469 return; // |this| may have been destroyed in |OnError()|. | 504 } else { |
505 // We depend on delegate to delete since it could be waiting to call | |
506 // ReleaseHandle. | |
507 base::MessageLoop::current()->PostTask( | |
508 FROM_HERE, | |
509 base::Bind(&RawChannel::Shutdown, weak_ptr_factory_.GetWeakPtr())); | |
470 } | 510 } |
471 } | 511 } |
472 | 512 |
513 void RawChannel::LockAndCallOnError(Delegate::Error error) { | |
514 base::AutoLock locker(read_lock_); | |
515 CallOnError(error); | |
516 } | |
517 | |
473 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, | 518 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result, |
474 size_t platform_handles_written, | 519 size_t platform_handles_written, |
475 size_t bytes_written) { | 520 size_t bytes_written) { |
476 write_lock_.AssertAcquired(); | 521 write_lock_.AssertAcquired(); |
477 | 522 |
478 DCHECK(!write_stopped_); | 523 DCHECK(!write_stopped_); |
479 DCHECK(!write_buffer_->message_queue_.IsEmpty()); | 524 DCHECK(!write_buffer_->message_queue_.IsEmpty()); |
480 | 525 |
481 if (io_result == IO_SUCCEEDED) { | 526 if (io_result == IO_SUCCEEDED) { |
482 write_buffer_->platform_handles_offset_ += platform_handles_written; | 527 write_buffer_->platform_handles_offset_ += platform_handles_written; |
(...skipping 18 matching lines...) Expand all Loading... | |
501 DCHECK_NE(io_result, IO_SUCCEEDED); | 546 DCHECK_NE(io_result, IO_SUCCEEDED); |
502 } | 547 } |
503 | 548 |
504 write_stopped_ = true; | 549 write_stopped_ = true; |
505 write_buffer_->message_queue_.Clear(); | 550 write_buffer_->message_queue_.Clear(); |
506 write_buffer_->platform_handles_offset_ = 0; | 551 write_buffer_->platform_handles_offset_ = 0; |
507 write_buffer_->data_offset_ = 0; | 552 write_buffer_->data_offset_ = 0; |
508 return false; | 553 return false; |
509 } | 554 } |
510 | 555 |
556 void RawChannel::DispatchMessages(bool* did_dispatch_message, | |
557 bool* stop_dispatching) { | |
558 *did_dispatch_message = false; | |
559 *stop_dispatching = false; | |
560 // Tracks the offset of the first undispatched message in |read_buffer_|. | |
561 // Currently, we copy data to ensure that this is zero at the beginning. | |
562 size_t read_buffer_start = 0; | |
563 size_t remaining_bytes = read_buffer_->num_valid_bytes_; | |
564 size_t message_size; | |
565 // Note that we rely on short-circuit evaluation here: | |
566 // - |read_buffer_start| may be an invalid index into | |
567 // |read_buffer_->buffer_| if |remaining_bytes| is zero. | |
568 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | |
569 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | |
570 // next read). | |
571 // TODO(vtl): Validate that |message_size| is sane. | |
572 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( | |
573 &read_buffer_->buffer_[read_buffer_start], | |
574 remaining_bytes, &message_size) && | |
575 remaining_bytes >= message_size) { | |
576 MessageInTransit::View message_view( | |
577 message_size, &read_buffer_->buffer_[read_buffer_start]); | |
578 DCHECK_EQ(message_view.total_size(), message_size); | |
579 | |
580 const char* error_message = nullptr; | |
581 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), | |
582 &error_message)) { | |
583 DCHECK(error_message); | |
584 LOG(ERROR) << "Received invalid message: " << error_message; | |
585 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
586 *stop_dispatching = true; | |
587 return; // |this| may have been destroyed in |CallOnError()|. | |
588 } | |
589 | |
590 if (message_view.type() != MessageInTransit::Type::MESSAGE) { | |
591 if (!OnReadMessageForRawChannel(message_view)) { | |
592 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
593 *stop_dispatching = true; | |
594 return; // |this| may have been destroyed in |CallOnError()|. | |
595 } | |
596 } else { | |
597 embedder::ScopedPlatformHandleVectorPtr platform_handles; | |
598 if (message_view.transport_data_buffer()) { | |
599 size_t num_platform_handles; | |
600 const void* platform_handle_table; | |
601 TransportData::GetPlatformHandleTable( | |
602 message_view.transport_data_buffer(), &num_platform_handles, | |
603 &platform_handle_table); | |
604 | |
605 if (num_platform_handles > 0) { | |
606 platform_handles = | |
607 GetReadPlatformHandles(num_platform_handles, | |
608 platform_handle_table).Pass(); | |
609 if (!platform_handles) { | |
610 LOG(ERROR) << "Invalid number of platform handles received"; | |
611 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | |
612 *stop_dispatching = true; | |
613 return; // |this| may have been destroyed in |CallOnError()|. | |
614 } | |
615 } | |
616 } | |
617 | |
618 // TODO(vtl): In the case that we aren't expecting any platform handles, | |
619 // for the POSIX implementation, we should confirm that none are stored. | |
620 | |
621 // Dispatch the message. | |
622 // Detect the case when |Shutdown()| is called; subsequent destruction | |
623 // is also permitted then. | |
624 bool shutdown_called = false; | |
yzshen1
2015/09/23 22:47:09
Shutdown() is no longer possible to be called from
| |
625 DCHECK(!set_on_shutdown_); | |
626 set_on_shutdown_ = &shutdown_called; | |
627 // Note: it's valid to get here without a delegate. i.e. after Shutdown | |
628 // is called, if this object still has a valid handle we keep it alive | |
629 // until the other side closes it in response to the RAW_CHANNEL_QUIT | |
630 // message. In the meantime the sender could have sent us a message. | |
631 if (delegate_) | |
632 delegate_->OnReadMessage(message_view, platform_handles.Pass()); | |
633 if (shutdown_called) { | |
634 *stop_dispatching = true; | |
635 return; | |
636 } | |
637 set_on_shutdown_ = nullptr; | |
638 } | |
639 | |
640 *did_dispatch_message = true; | |
641 | |
642 // Update our state. | |
643 read_buffer_start += message_size; | |
644 remaining_bytes -= message_size; | |
645 } | |
646 | |
647 if (read_buffer_start > 0) { | |
648 // Move data back to start. | |
649 read_buffer_->num_valid_bytes_ = remaining_bytes; | |
650 if (read_buffer_->num_valid_bytes_ > 0) { | |
651 memmove(&read_buffer_->buffer_[0], | |
652 &read_buffer_->buffer_[read_buffer_start], remaining_bytes); | |
653 } | |
654 read_buffer_start = 0; | |
655 } | |
656 } | |
657 | |
511 } // namespace system | 658 } // namespace system |
512 } // namespace mojo | 659 } // namespace mojo |
OLD | NEW |