OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "network/public/cpp/web_socket_read_queue.h" | 5 #include "network/public/cpp/web_socket_read_queue.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/logging.h" | 8 #include "base/logging.h" |
| 9 #include "base/memory/scoped_ptr.h" |
9 | 10 |
10 namespace mojo { | 11 namespace mojo { |
11 | 12 |
12 struct WebSocketReadQueue::Operation { | 13 struct WebSocketReadQueue::Operation { |
| 14 Operation(uint32_t num_bytes, |
| 15 const base::Callback<void(const char*)>& callback) |
| 16 : num_bytes_(num_bytes), callback_(callback), current_num_bytes_(0) {} |
| 17 |
13 uint32_t num_bytes_; | 18 uint32_t num_bytes_; |
14 base::Callback<void(const char*)> callback_; | 19 base::Callback<void(const char*)> callback_; |
| 20 |
| 21 // If the initial read doesn't return enough data, this array is used to |
| 22 // accumulate data from multiple reads. |
| 23 scoped_ptr<char[]> data_buffer_; |
| 24 // The number of bytes accumulated so far. |
| 25 uint32_t current_num_bytes_; |
15 }; | 26 }; |
16 | 27 |
17 WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle) | 28 WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle) |
18 : handle_(handle), is_busy_(false), weak_factory_(this) { | 29 : handle_(handle), is_busy_(false), weak_factory_(this) { |
19 } | 30 } |
20 | 31 |
21 WebSocketReadQueue::~WebSocketReadQueue() { | 32 WebSocketReadQueue::~WebSocketReadQueue() { |
22 } | 33 } |
23 | 34 |
24 void WebSocketReadQueue::Read(uint32_t num_bytes, | 35 void WebSocketReadQueue::Read( |
25 base::Callback<void(const char*)> callback) { | 36 uint32_t num_bytes, |
26 Operation* op = new Operation; | 37 const base::Callback<void(const char*)>& callback) { |
27 op->num_bytes_ = num_bytes; | 38 Operation* op = new Operation(num_bytes, callback); |
28 op->callback_ = callback; | |
29 queue_.push_back(op); | 39 queue_.push_back(op); |
30 | 40 |
31 if (is_busy_) | 41 if (is_busy_) |
32 return; | 42 return; |
33 | 43 |
34 is_busy_ = true; | 44 is_busy_ = true; |
35 TryToRead(); | 45 TryToRead(); |
36 } | 46 } |
37 | 47 |
38 void WebSocketReadQueue::TryToRead() { | 48 void WebSocketReadQueue::TryToRead() { |
39 DCHECK(is_busy_); | 49 DCHECK(is_busy_); |
40 DCHECK(!queue_.empty()); | 50 DCHECK(!queue_.empty()); |
41 do { | 51 do { |
42 Operation* op = queue_[0]; | 52 Operation* op = queue_[0]; |
43 const void* buffer = NULL; | 53 const void* buffer = nullptr; |
44 uint32_t bytes_read = op->num_bytes_; | 54 uint32_t buffer_size = 0; |
45 MojoResult result = BeginReadDataRaw( | 55 MojoResult result = BeginReadDataRaw(handle_, &buffer, &buffer_size, |
46 handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE); | 56 MOJO_READ_DATA_FLAG_NONE); |
47 if (result == MOJO_RESULT_SHOULD_WAIT) { | 57 if (result == MOJO_RESULT_SHOULD_WAIT) { |
48 Wait(); | 58 Wait(); |
49 return; | 59 return; |
50 } | 60 } |
51 | 61 |
52 // Ensure |op| is deleted, whether or not |this| goes away. | |
53 scoped_ptr<Operation> op_deleter(op); | |
54 queue_.weak_erase(queue_.begin()); | |
55 | |
56 // http://crbug.com/490193 This should run callback as well. May need to | 62 // http://crbug.com/490193 This should run callback as well. May need to |
57 // change the callback signature. | 63 // change the callback signature. |
58 if (result != MOJO_RESULT_OK) | 64 if (result != MOJO_RESULT_OK) |
59 return; | 65 return; |
60 | 66 |
61 uint32_t num_bytes = op_deleter->num_bytes_; | 67 uint32_t bytes_read = buffer_size < op->num_bytes_ - op->current_num_bytes_ |
62 DCHECK_LE(num_bytes, bytes_read); | 68 ? buffer_size |
| 69 : op->num_bytes_ - op->current_num_bytes_; |
| 70 |
| 71 // If this is not the initial read, or this is the initial read but doesn't |
| 72 // return enough data, copy the data into |op->data_buffer_|. |
| 73 if (op->data_buffer_ || |
| 74 bytes_read < op->num_bytes_ - op->current_num_bytes_) { |
| 75 if (!op->data_buffer_) { |
| 76 DCHECK_EQ(0u, op->current_num_bytes_); |
| 77 op->data_buffer_.reset(new char[op->num_bytes_]); |
| 78 } |
| 79 |
| 80 memcpy(op->data_buffer_.get() + op->current_num_bytes_, buffer, |
| 81 bytes_read); |
| 82 } |
| 83 op->current_num_bytes_ += bytes_read; |
63 DataPipeConsumerHandle handle = handle_; | 84 DataPipeConsumerHandle handle = handle_; |
64 | |
65 base::WeakPtr<WebSocketReadQueue> self(weak_factory_.GetWeakPtr()); | 85 base::WeakPtr<WebSocketReadQueue> self(weak_factory_.GetWeakPtr()); |
66 | 86 |
67 // This call may delete |this|. In that case, |self| will be invalidated. | 87 if (op->current_num_bytes_ >= op->num_bytes_) { |
68 // It may re-enter Read() too. Because |is_busy_| is true during the whole | 88 DCHECK_EQ(op->current_num_bytes_, op->num_bytes_); |
69 // process, TryToRead() won't be re-entered. | 89 const char* returned_buffer = op->data_buffer_ |
70 op->callback_.Run(static_cast<const char*>(buffer)); | 90 ? op->data_buffer_.get() |
| 91 : static_cast<const char*>(buffer); |
71 | 92 |
72 EndReadDataRaw(handle, num_bytes); | 93 // Ensure |op| is deleted, whether or not |this| goes away. |
| 94 scoped_ptr<Operation> op_deleter(op); |
| 95 queue_.weak_erase(queue_.begin()); |
| 96 |
| 97 // This call may delete |this|. In that case, |self| will be invalidated. |
| 98 // It may re-enter Read() too. Because |is_busy_| is true during the whole |
| 99 // process, TryToRead() won't be re-entered. |
| 100 op->callback_.Run(returned_buffer); |
| 101 } |
| 102 |
| 103 EndReadDataRaw(handle, bytes_read); |
73 | 104 |
74 if (!self) | 105 if (!self) |
75 return; | 106 return; |
76 } while (!queue_.empty()); | 107 } while (!queue_.empty()); |
77 is_busy_ = false; | 108 is_busy_ = false; |
78 } | 109 } |
79 | 110 |
80 void WebSocketReadQueue::Wait() { | 111 void WebSocketReadQueue::Wait() { |
81 DCHECK(is_busy_); | 112 DCHECK(is_busy_); |
82 handle_watcher_.Start( | 113 handle_watcher_.Start( |
83 handle_, | 114 handle_, |
84 MOJO_HANDLE_SIGNAL_READABLE, | 115 MOJO_HANDLE_SIGNAL_READABLE, |
85 MOJO_DEADLINE_INDEFINITE, | 116 MOJO_DEADLINE_INDEFINITE, |
86 base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this))); | 117 base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this))); |
87 } | 118 } |
88 | 119 |
89 void WebSocketReadQueue::OnHandleReady(MojoResult result) { | 120 void WebSocketReadQueue::OnHandleReady(MojoResult result) { |
90 DCHECK(is_busy_); | 121 DCHECK(is_busy_); |
91 TryToRead(); | 122 TryToRead(); |
92 } | 123 } |
93 | 124 |
94 } // namespace mojo | 125 } // namespace mojo |
OLD | NEW |