OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/raw_channel.h" | 5 #include "mojo/edk/system/raw_channel.h" |
6 | 6 |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 | 10 |
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
126 bytes_to_write}; | 126 bytes_to_write}; |
127 buffers->push_back(buffer); | 127 buffers->push_back(buffer); |
128 return; | 128 return; |
129 } | 129 } |
130 | 130 |
131 // TODO(vtl): We could actually send out buffers from multiple messages, with | 131 // TODO(vtl): We could actually send out buffers from multiple messages, with |
132 // the "stopping" condition being reaching a message with platform handles | 132 // the "stopping" condition being reaching a message with platform handles |
133 // attached. | 133 // attached. |
134 | 134 |
135 // Write from both buffers. | 135 // Write from both buffers. |
136 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ + | 136 DCHECK_EQ( |
137 transport_data_buffer_size); | 137 bytes_to_write, |
| 138 message->main_buffer_size() - data_offset_ + transport_data_buffer_size); |
138 Buffer buffer1 = { | 139 Buffer buffer1 = { |
139 static_cast<const char*>(message->main_buffer()) + data_offset_, | 140 static_cast<const char*>(message->main_buffer()) + data_offset_, |
140 message->main_buffer_size() - data_offset_}; | 141 message->main_buffer_size() - data_offset_}; |
141 buffers->push_back(buffer1); | 142 buffers->push_back(buffer1); |
142 Buffer buffer2 = { | 143 Buffer buffer2 = { |
143 static_cast<const char*>(message->transport_data()->buffer()), | 144 static_cast<const char*>(message->transport_data()->buffer()), |
144 transport_data_buffer_size}; | 145 transport_data_buffer_size}; |
145 buffers->push_back(buffer2); | 146 buffers->push_back(buffer2); |
146 } | 147 } |
147 | 148 |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
187 message_loop_for_io_ = nullptr; | 188 message_loop_for_io_ = nullptr; |
188 read_buffer_.reset(); | 189 read_buffer_.reset(); |
189 write_buffer_.reset(); | 190 write_buffer_.reset(); |
190 return false; | 191 return false; |
191 } | 192 } |
192 | 193 |
193 IOResult io_result = ScheduleRead(); | 194 IOResult io_result = ScheduleRead(); |
194 if (io_result != IO_PENDING) { | 195 if (io_result != IO_PENDING) { |
195 // This will notify the delegate about the read failure. Although we're on | 196 // This will notify the delegate about the read failure. Although we're on |
196 // the I/O thread, don't call it in the nested context. | 197 // the I/O thread, don't call it in the nested context. |
197 message_loop_for_io_->PostTask( | 198 message_loop_for_io_->PostTask(FROM_HERE, |
198 FROM_HERE, base::Bind(&RawChannel::OnReadCompleted, | 199 base::Bind(&RawChannel::OnReadCompleted, |
199 weak_ptr_factory_.GetWeakPtr(), io_result, 0)); | 200 weak_ptr_factory_.GetWeakPtr(), |
| 201 io_result, |
| 202 0)); |
200 } | 203 } |
201 | 204 |
202 // ScheduleRead() failure is treated as a read failure (by notifying the | 205 // ScheduleRead() failure is treated as a read failure (by notifying the |
203 // delegate), not as an init failure. | 206 // delegate), not as an init failure. |
204 return true; | 207 return true; |
205 } | 208 } |
206 | 209 |
207 void RawChannel::Shutdown() { | 210 void RawChannel::Shutdown() { |
208 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); | 211 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); |
209 | 212 |
(...skipping 26 matching lines...) Expand all Loading... |
236 | 239 |
237 EnqueueMessageNoLock(message.Pass()); | 240 EnqueueMessageNoLock(message.Pass()); |
238 DCHECK_EQ(write_buffer_->data_offset_, 0u); | 241 DCHECK_EQ(write_buffer_->data_offset_, 0u); |
239 | 242 |
240 size_t platform_handles_written = 0; | 243 size_t platform_handles_written = 0; |
241 size_t bytes_written = 0; | 244 size_t bytes_written = 0; |
242 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | 245 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); |
243 if (io_result == IO_PENDING) | 246 if (io_result == IO_PENDING) |
244 return true; | 247 return true; |
245 | 248 |
246 bool result = OnWriteCompletedNoLock(io_result, platform_handles_written, | 249 bool result = OnWriteCompletedNoLock( |
247 bytes_written); | 250 io_result, platform_handles_written, bytes_written); |
248 if (!result) { | 251 if (!result) { |
249 // Even if we're on the I/O thread, don't call |OnError()| in the nested | 252 // Even if we're on the I/O thread, don't call |OnError()| in the nested |
250 // context. | 253 // context. |
251 message_loop_for_io_->PostTask( | 254 message_loop_for_io_->PostTask(FROM_HERE, |
252 FROM_HERE, | 255 base::Bind(&RawChannel::CallOnError, |
253 base::Bind(&RawChannel::CallOnError, weak_ptr_factory_.GetWeakPtr(), | 256 weak_ptr_factory_.GetWeakPtr(), |
254 Delegate::ERROR_WRITE)); | 257 Delegate::ERROR_WRITE)); |
255 } | 258 } |
256 | 259 |
257 return result; | 260 return result; |
258 } | 261 } |
259 | 262 |
260 // Reminder: This must be thread-safe. | 263 // Reminder: This must be thread-safe. |
261 bool RawChannel::IsWriteBufferEmpty() { | 264 bool RawChannel::IsWriteBufferEmpty() { |
262 base::AutoLock locker(write_lock_); | 265 base::AutoLock locker(write_lock_); |
263 return write_buffer_->message_queue_.empty(); | 266 return write_buffer_->message_queue_.empty(); |
264 } | 267 } |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
302 size_t message_size; | 305 size_t message_size; |
303 // Note that we rely on short-circuit evaluation here: | 306 // Note that we rely on short-circuit evaluation here: |
304 // - |read_buffer_start| may be an invalid index into | 307 // - |read_buffer_start| may be an invalid index into |
305 // |read_buffer_->buffer_| if |remaining_bytes| is zero. | 308 // |read_buffer_->buffer_| if |remaining_bytes| is zero. |
306 // - |message_size| is only valid if |GetNextMessageSize()| returns true. | 309 // - |message_size| is only valid if |GetNextMessageSize()| returns true. |
307 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the | 310 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the |
308 // next read). | 311 // next read). |
309 // TODO(vtl): Validate that |message_size| is sane. | 312 // TODO(vtl): Validate that |message_size| is sane. |
310 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( | 313 while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize( |
311 &read_buffer_->buffer_[read_buffer_start], | 314 &read_buffer_->buffer_[read_buffer_start], |
312 remaining_bytes, &message_size) && | 315 remaining_bytes, |
| 316 &message_size) && |
313 remaining_bytes >= message_size) { | 317 remaining_bytes >= message_size) { |
314 MessageInTransit::View message_view( | 318 MessageInTransit::View message_view( |
315 message_size, &read_buffer_->buffer_[read_buffer_start]); | 319 message_size, &read_buffer_->buffer_[read_buffer_start]); |
316 DCHECK_EQ(message_view.total_size(), message_size); | 320 DCHECK_EQ(message_view.total_size(), message_size); |
317 | 321 |
318 const char* error_message = nullptr; | 322 const char* error_message = nullptr; |
319 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), | 323 if (!message_view.IsValid(GetSerializedPlatformHandleSize(), |
320 &error_message)) { | 324 &error_message)) { |
321 DCHECK(error_message); | 325 DCHECK(error_message); |
322 LOG(ERROR) << "Received invalid message: " << error_message; | 326 LOG(ERROR) << "Received invalid message: " << error_message; |
323 read_stopped_ = true; | 327 read_stopped_ = true; |
324 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | 328 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
325 return; | 329 return; |
326 } | 330 } |
327 | 331 |
328 if (message_view.type() == MessageInTransit::kTypeRawChannel) { | 332 if (message_view.type() == MessageInTransit::kTypeRawChannel) { |
329 if (!OnReadMessageForRawChannel(message_view)) { | 333 if (!OnReadMessageForRawChannel(message_view)) { |
330 read_stopped_ = true; | 334 read_stopped_ = true; |
331 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | 335 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
332 return; | 336 return; |
333 } | 337 } |
334 } else { | 338 } else { |
335 embedder::ScopedPlatformHandleVectorPtr platform_handles; | 339 embedder::ScopedPlatformHandleVectorPtr platform_handles; |
336 if (message_view.transport_data_buffer()) { | 340 if (message_view.transport_data_buffer()) { |
337 size_t num_platform_handles; | 341 size_t num_platform_handles; |
338 const void* platform_handle_table; | 342 const void* platform_handle_table; |
339 TransportData::GetPlatformHandleTable( | 343 TransportData::GetPlatformHandleTable( |
340 message_view.transport_data_buffer(), &num_platform_handles, | 344 message_view.transport_data_buffer(), |
| 345 &num_platform_handles, |
341 &platform_handle_table); | 346 &platform_handle_table); |
342 | 347 |
343 if (num_platform_handles > 0) { | 348 if (num_platform_handles > 0) { |
344 platform_handles = | 349 platform_handles = |
345 GetReadPlatformHandles(num_platform_handles, | 350 GetReadPlatformHandles(num_platform_handles, |
346 platform_handle_table).Pass(); | 351 platform_handle_table).Pass(); |
347 if (!platform_handles) { | 352 if (!platform_handles) { |
348 LOG(ERROR) << "Invalid number of platform handles received"; | 353 LOG(ERROR) << "Invalid number of platform handles received"; |
349 read_stopped_ = true; | 354 read_stopped_ = true; |
350 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); | 355 CallOnError(Delegate::ERROR_READ_BAD_MESSAGE); |
(...skipping 20 matching lines...) Expand all Loading... |
371 // Update our state. | 376 // Update our state. |
372 read_buffer_start += message_size; | 377 read_buffer_start += message_size; |
373 remaining_bytes -= message_size; | 378 remaining_bytes -= message_size; |
374 } | 379 } |
375 | 380 |
376 if (read_buffer_start > 0) { | 381 if (read_buffer_start > 0) { |
377 // Move data back to start. | 382 // Move data back to start. |
378 read_buffer_->num_valid_bytes_ = remaining_bytes; | 383 read_buffer_->num_valid_bytes_ = remaining_bytes; |
379 if (read_buffer_->num_valid_bytes_ > 0) { | 384 if (read_buffer_->num_valid_bytes_ > 0) { |
380 memmove(&read_buffer_->buffer_[0], | 385 memmove(&read_buffer_->buffer_[0], |
381 &read_buffer_->buffer_[read_buffer_start], remaining_bytes); | 386 &read_buffer_->buffer_[read_buffer_start], |
| 387 remaining_bytes); |
382 } | 388 } |
383 read_buffer_start = 0; | 389 read_buffer_start = 0; |
384 } | 390 } |
385 | 391 |
386 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < | 392 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < |
387 kReadSize) { | 393 kReadSize) { |
388 // Use power-of-2 buffer sizes. | 394 // Use power-of-2 buffer sizes. |
389 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the | 395 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the |
390 // maximum message size to whatever extent necessary). | 396 // maximum message size to whatever extent necessary). |
391 // TODO(vtl): We may often be able to peek at the header and get the real | 397 // TODO(vtl): We may often be able to peek at the header and get the real |
(...skipping 28 matching lines...) Expand all Loading... |
420 bool did_fail = false; | 426 bool did_fail = false; |
421 { | 427 { |
422 base::AutoLock locker(write_lock_); | 428 base::AutoLock locker(write_lock_); |
423 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); | 429 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); |
424 | 430 |
425 if (write_stopped_) { | 431 if (write_stopped_) { |
426 NOTREACHED(); | 432 NOTREACHED(); |
427 return; | 433 return; |
428 } | 434 } |
429 | 435 |
430 did_fail = !OnWriteCompletedNoLock(io_result, platform_handles_written, | 436 did_fail = !OnWriteCompletedNoLock( |
431 bytes_written); | 437 io_result, platform_handles_written, bytes_written); |
432 } | 438 } |
433 | 439 |
434 if (did_fail) | 440 if (did_fail) |
435 CallOnError(Delegate::ERROR_WRITE); | 441 CallOnError(Delegate::ERROR_WRITE); |
436 } | 442 } |
437 | 443 |
438 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { | 444 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) { |
439 write_lock_.AssertAcquired(); | 445 write_lock_.AssertAcquired(); |
440 write_buffer_->message_queue_.push_back(message.release()); | 446 write_buffer_->message_queue_.push_back(message.release()); |
441 } | 447 } |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
507 | 513 |
508 write_stopped_ = true; | 514 write_stopped_ = true; |
509 STLDeleteElements(&write_buffer_->message_queue_); | 515 STLDeleteElements(&write_buffer_->message_queue_); |
510 write_buffer_->platform_handles_offset_ = 0; | 516 write_buffer_->platform_handles_offset_ = 0; |
511 write_buffer_->data_offset_ = 0; | 517 write_buffer_->data_offset_ = 0; |
512 return false; | 518 return false; |
513 } | 519 } |
514 | 520 |
515 } // namespace system | 521 } // namespace system |
516 } // namespace mojo | 522 } // namespace mojo |
OLD | NEW |