OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/raw_channel.h" | 5 #include "mojo/edk/system/raw_channel.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 | 10 |
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
126 bytes_to_write}; | 126 bytes_to_write}; |
127 buffers->push_back(buffer); | 127 buffers->push_back(buffer); |
128 return; | 128 return; |
129 } | 129 } |
130 | 130 |
131 // TODO(vtl): We could actually send out buffers from multiple messages, with | 131 // TODO(vtl): We could actually send out buffers from multiple messages, with |
132 // the "stopping" condition being reaching a message with platform handles | 132 // the "stopping" condition being reaching a message with platform handles |
133 // attached. | 133 // attached. |
134 | 134 |
135 // Write from both buffers. | 135 // Write from both buffers. |
136 DCHECK_EQ( | 136 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + |
137 bytes_to_write, | 137 transport_data_buffer_size); |
138 message->main_buffer_size() - data_offset_ + transport_data_buffer_size); | |
139 Buffer buffer1 = { | 138 Buffer buffer1 = { |
140 static_cast<const char*>(message->main_buffer()) + data_offset_, | 139 static_cast<const char*>(message->main_buffer()) + data_offset_, |
141 message->main_buffer_size() - data_offset_}; | 140 message->main_buffer_size() - data_offset_}; |
142 buffers->push_back(buffer1); | 141 buffers->push_back(buffer1); |
143 Buffer buffer2 = { | 142 Buffer buffer2 = { |
144 static_cast<const char*>(message->transport_data()->buffer()), | 143 static_cast<const char*>(message->transport_data()->buffer()), |
145 transport_data_buffer_size}; | 144 transport_data_buffer_size}; |
146 buffers->push_back(buffer2); | 145 buffers->push_back(buffer2); |
147 } | 146 } |
148 | 147 |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
188 message_loop_for_io_ = nullptr; | 187 message_loop_for_io_ = nullptr; |
189 read_buffer_.reset(); | 188 read_buffer_.reset(); |
190 write_buffer_.reset(); | 189 write_buffer_.reset(); |
191 return false; | 190 return false; |
192 } | 191 } |
193 | 192 |
194 IOResult io_result = ScheduleRead(); | 193 IOResult io_result = ScheduleRead(); |
195 if (io_result != IO_PENDING) { | 194 if (io_result != IO_PENDING) { |
196 // This will notify the delegate about the read failure. Although we're on | 195 // This will notify the delegate about the read failure. Although we're on |
197 // the I/O thread, don't call it in the nested context. | 196 // the I/O thread, don't call it in the nested context. |
198 message_loop_for_io_->PostTask(FROM_HERE, | 197 message_loop_for_io_->PostTask( |
199 base::Bind(&RawChannel::OnReadCompleted, | 198 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, |
200 weak_ptr_factory_.GetWeakPtr(), | 199 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); |
201 io_result, | |
202 0)); | |
203 } | 200 } |
204 | 201 |
205 // ScheduleRead() failure is treated as a read failure (by notifying the | 202 // ScheduleRead() failure is treated as a read failure (by notifying the |
206 // delegate), not as an init failure. | 203 // delegate), not as an init failure. |
207 return true; | 204 return true; |
208 } | 205 } |
209 | 206 |
210 void RawChannel::Shutdown() { | 207 void RawChannel::Shutdown() { |
211 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 208 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
212 | 209 |
(...skipping 26 matching lines...) Expand all Loading... |
239 | 236 |
240 EnqueueMessageNoLock(message.Pass()); | 237 EnqueueMessageNoLock(message.Pass()); |
241 DCHECK_EQ(write_buffer_->data_offset_, 0u); | 238 DCHECK_EQ(write_buffer_->data_offset_, 0u); |
242 | 239 |
243 size_t platform_handles_written = 0; | 240 size_t platform_handles_written = 0; |
244 size_t bytes_written = 0; | 241 size_t bytes_written = 0; |
245 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | 242 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
246 if (io_result == IO_PENDING) | 243 if (io_result == IO_PENDING) |
247 return true; | 244 return true; |
248 | 245 |
249 bool result = OnWriteCompletedNoLock( | 246 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, |
250 io_result, platform_handles_written, bytes_written); | 247 bytes_written); |
251 if (!result) { | 248 if (!result) { |
252 // Even if we're on the I/O thread, don't call |OnError()| in the nested | 249 // Even if we're on the I/O thread, don't call |OnError()| in the nested |
253 // context. | 250 // context. |
254 message_loop_for_io_->PostTask(FROM_HERE, | 251 message_loop_for_io_->PostTask( |
255 base::Bind(&RawChannel::CallOnError, | 252 FROM_HERE, |
256 weak_ptr_factory_.GetWeakPtr(), | 253 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), |
257 Delegate::ERROR_WRITE)); | 254 Delegate::ERROR_WRITE)); |
258 } | 255 } |
259 | 256 |
260 return result; | 257 return result; |
261 } | 258 } |
262 | 259 |
263 // Reminder: This must be thread-safe. | 260 // Reminder: This must be thread-safe. |
264 bool RawChannel::IsWriteBufferEmpty() { | 261 bool RawChannel::IsWriteBufferEmpty() { |
265 base::AutoLock locker(write_lock_); | 262 base::AutoLock locker(write_lock_); |
266 return write_buffer_->message_queue_.empty(); | 263 return write_buffer_->message_queue_.empty(); |
267 } | 264 } |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
305 size_t message_size; | 302 size_t message_size; |
306 // Note that we rely on short-circuit evaluation here: | 303 // Note that we rely on short-circuit evaluation here: |
307 // - |read_buffer_start| may be an invalid index into | 304 // - |read_buffer_start| may be an invalid index into |
308 // |read_buffer_->buffer_| if |remaining_bytes| is zero. | 305 // |read_buffer_->buffer_| if |remaining_bytes| is zero. |
309 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | 306 // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
310 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | 307 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
311 // next read). | 308 // next read). |
312 // TODO(vtl): Validate that |message_size| is sane. | 309 // TODO(vtl): Validate that |message_size| is sane. |
313 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( | 310 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( |
314 &read_buffer_->buffer_[read_buffer_start], | 311 &read_buffer_->buffer_[read_buffer_start], |
315 remaining_bytes, | 312 remaining_bytes, &message_size) && |
316 &message_size) && | |
317 remaining_bytes >= message_size) { | 313 remaining_bytes >= message_size) { |
318 MessageInTransit::View message_view( | 314 MessageInTransit::View message_view( |
319 message_size, &read_buffer_->buffer_[read_buffer_start]); | 315 message_size, &read_buffer_->buffer_[read_buffer_start]); |
320 DCHECK_EQ(message_view.total_size(), message_size); | 316 DCHECK_EQ(message_view.total_size(), message_size); |
321 | 317 |
322 const char* error_message = nullptr; | 318 const char* error_message = nullptr; |
323 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), | 319 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), |
324 &error_message)) { | 320 &error_message)) { |
325 DCHECK(error_message); | 321 DCHECK(error_message); |
326 LOG(ERROR) << "Received invalid message: " << error_message; | 322 LOG(ERROR) << "Received invalid message: " << error_message; |
327 read_stopped_ = true; | 323 read_stopped_ = true; |
328 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | 324 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
329 return; | 325 return; |
330 } | 326 } |
331 | 327 |
332 if (message_view.type() == MessageInTransit::kTypeRawChannel) { | 328 if (message_view.type() == MessageInTransit::kTypeRawChannel) { |
333 if (!OnReadMessageForRawChannel(message_view)) { | 329 if (!OnReadMessageForRawChannel(message_view)) { |
334 read_stopped_ = true; | 330 read_stopped_ = true; |
335 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | 331 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
336 return; | 332 return; |
337 } | 333 } |
338 } else { | 334 } else { |
339 embedder::ScopedPlatformHandleVectorPtr platform_handles; | 335 embedder::ScopedPlatformHandleVectorPtr platform_handles; |
340 if (message_view.transport_data_buffer()) { | 336 if (message_view.transport_data_buffer()) { |
341 size_t num_platform_handles; | 337 size_t num_platform_handles; |
342 const void* platform_handle_table; | 338 const void* platform_handle_table; |
343 TransportData::GetPlatformHandleTable( | 339 TransportData::GetPlatformHandleTable( |
344 message_view.transport_data_buffer(), | 340 message_view.transport_data_buffer(), &num_platform_handles, |
345 &num_platform_handles, | |
346 &platform_handle_table); | 341 &platform_handle_table); |
347 | 342 |
348 if (num_platform_handles > 0) { | 343 if (num_platform_handles > 0) { |
349 platform_handles = | 344 platform_handles = |
350 GetReadPlatformHandles(num_platform_handles, | 345 GetReadPlatformHandles(num_platform_handles, |
351 platform_handle_table).Pass(); | 346 platform_handle_table).Pass(); |
352 if (!platform_handles) { | 347 if (!platform_handles) { |
353 LOG(ERROR) << "Invalid number of platform handles received"; | 348 LOG(ERROR) << "Invalid number of platform handles received"; |
354 read_stopped_ = true; | 349 read_stopped_ = true; |
355 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | 350 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
(...skipping 20 matching lines...) Expand all Loading... |
376 // Update our state. | 371 // Update our state. |
377 read_buffer_start += message_size; | 372 read_buffer_start += message_size; |
378 remaining_bytes -= message_size; | 373 remaining_bytes -= message_size; |
379 } | 374 } |
380 | 375 |
381 if (read_buffer_start > 0) { | 376 if (read_buffer_start > 0) { |
382 // Move data back to start. | 377 // Move data back to start. |
383 read_buffer_->num_valid_bytes_ = remaining_bytes; | 378 read_buffer_->num_valid_bytes_ = remaining_bytes; |
384 if (read_buffer_->num_valid_bytes_ > 0) { | 379 if (read_buffer_->num_valid_bytes_ > 0) { |
385 memmove(&read_buffer_->buffer_[0], | 380 memmove(&read_buffer_->buffer_[0], |
386 &read_buffer_->buffer_[read_buffer_start], | 381 &read_buffer_->buffer_[read_buffer_start], remaining_bytes); |
387 remaining_bytes); | |
388 } | 382 } |
389 read_buffer_start = 0; | 383 read_buffer_start = 0; |
390 } | 384 } |
391 | 385 |
392 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < | 386 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < |
393 kReadSize) { | 387 kReadSize) { |
394 // Use power-of-2 buffer sizes. | 388 // Use power-of-2 buffer sizes. |
395 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | 389 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
396 // maximum message size to whatever extent necessary). | 390 // maximum message size to whatever extent necessary). |
397 // TODO(vtl): We may often be able to peek at the header and get the real | 391 // TODO(vtl): We may often be able to peek at the header and get the real |
(...skipping 28 matching lines...) Expand all Loading... |
426 bool did_fail = false; | 420 bool did_fail = false; |
427 { | 421 { |
428 base::AutoLock locker(write_lock_); | 422 base::AutoLock locker(write_lock_); |
429 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); | 423 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); |
430 | 424 |
431 if (write_stopped_) { | 425 if (write_stopped_) { |
432 NOTREACHED(); | 426 NOTREACHED(); |
433 return; | 427 return; |
434 } | 428 } |
435 | 429 |
436 did_fail = !OnWriteCompletedNoLock( | 430 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, |
437 io_result, platform_handles_written, bytes_written); | 431 bytes_written); |
438 } | 432 } |
439 | 433 |
440 if (did_fail) | 434 if (did_fail) |
441 CallOnError(Delegate::ERROR_WRITE); | 435 CallOnError(Delegate::ERROR_WRITE); |
442 } | 436 } |
443 | 437 |
444 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | 438 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
445 write_lock_.AssertAcquired(); | 439 write_lock_.AssertAcquired(); |
446 write_buffer_->message_queue_.push_back(message.release()); | 440 write_buffer_->message_queue_.push_back(message.release()); |
447 } | 441 } |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
513 | 507 |
514 write_stopped_ = true; | 508 write_stopped_ = true; |
515 STLDeleteElements(&write_buffer_->message_queue_); | 509 STLDeleteElements(&write_buffer_->message_queue_); |
516 write_buffer_->platform_handles_offset_ = 0; | 510 write_buffer_->platform_handles_offset_ = 0; |
517 write_buffer_->data_offset_ = 0; | 511 write_buffer_->data_offset_ = 0; |
518 return false; | 512 return false; |
519 } | 513 } |
520 | 514 |
521 } // namespace system | 515 } // namespace system |
522 } // namespace mojo | 516 } // namespace mojo |
OLD | NEW |