OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "network/public/cpp/web_socket_read_queue.h" | |
6 | |
7 #include <stdint.h> | |
8 | |
9 #include "base/bind.h" | |
10 #include "base/logging.h" | |
11 #include "base/memory/scoped_ptr.h" | |
12 | |
13 namespace mojo { | |
14 | |
15 struct WebSocketReadQueue::Operation { | |
16 Operation(uint32_t num_bytes, | |
17 const base::Callback<void(const char*)>& callback) | |
18 : num_bytes_(num_bytes), callback_(callback), current_num_bytes_(0) {} | |
19 | |
20 uint32_t num_bytes_; | |
21 base::Callback<void(const char*)> callback_; | |
22 | |
23 // If the initial read doesn't return enough data, this array is used to | |
24 // accumulate data from multiple reads. | |
25 scoped_ptr<char[]> data_buffer_; | |
26 // The number of bytes accumulated so far. | |
27 uint32_t current_num_bytes_; | |
28 }; | |
29 | |
30 WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle) | |
31 : handle_(handle), is_busy_(false), weak_factory_(this) { | |
32 } | |
33 | |
34 WebSocketReadQueue::~WebSocketReadQueue() { | |
35 } | |
36 | |
37 void WebSocketReadQueue::Read( | |
38 uint32_t num_bytes, | |
39 const base::Callback<void(const char*)>& callback) { | |
40 Operation* op = new Operation(num_bytes, callback); | |
41 queue_.push_back(op); | |
42 | |
43 if (is_busy_) | |
44 return; | |
45 | |
46 is_busy_ = true; | |
47 TryToRead(); | |
48 } | |
49 | |
50 void WebSocketReadQueue::TryToRead() { | |
51 DCHECK(is_busy_); | |
52 DCHECK(!queue_.empty()); | |
53 do { | |
54 Operation* op = queue_[0]; | |
55 const void* buffer = nullptr; | |
56 uint32_t buffer_size = 0; | |
57 MojoResult result = BeginReadDataRaw(handle_, &buffer, &buffer_size, | |
58 MOJO_READ_DATA_FLAG_NONE); | |
59 if (result == MOJO_RESULT_SHOULD_WAIT) { | |
60 Wait(); | |
61 return; | |
62 } | |
63 | |
64 // http://crbug.com/490193 This should run callback as well. May need to | |
65 // change the callback signature. | |
66 if (result != MOJO_RESULT_OK) | |
67 return; | |
68 | |
69 uint32_t bytes_read = buffer_size < op->num_bytes_ - op->current_num_bytes_ | |
70 ? buffer_size | |
71 : op->num_bytes_ - op->current_num_bytes_; | |
72 | |
73 // If this is not the initial read, or this is the initial read but doesn't | |
74 // return enough data, copy the data into |op->data_buffer_|. | |
75 if (op->data_buffer_ || | |
76 bytes_read < op->num_bytes_ - op->current_num_bytes_) { | |
77 if (!op->data_buffer_) { | |
78 DCHECK_EQ(0u, op->current_num_bytes_); | |
79 op->data_buffer_.reset(new char[op->num_bytes_]); | |
80 } | |
81 | |
82 memcpy(op->data_buffer_.get() + op->current_num_bytes_, buffer, | |
83 bytes_read); | |
84 } | |
85 op->current_num_bytes_ += bytes_read; | |
86 DataPipeConsumerHandle handle = handle_; | |
87 base::WeakPtr<WebSocketReadQueue> self(weak_factory_.GetWeakPtr()); | |
88 | |
89 if (op->current_num_bytes_ >= op->num_bytes_) { | |
90 DCHECK_EQ(op->current_num_bytes_, op->num_bytes_); | |
91 const char* returned_buffer = op->data_buffer_ | |
92 ? op->data_buffer_.get() | |
93 : static_cast<const char*>(buffer); | |
94 | |
95 // Ensure |op| is deleted, whether or not |this| goes away. | |
96 scoped_ptr<Operation> op_deleter(op); | |
97 queue_.weak_erase(queue_.begin()); | |
98 | |
99 // This call may delete |this|. In that case, |self| will be invalidated. | |
100 // It may re-enter Read() too. Because |is_busy_| is true during the whole | |
101 // process, TryToRead() won't be re-entered. | |
102 op->callback_.Run(returned_buffer); | |
103 } | |
104 | |
105 EndReadDataRaw(handle, bytes_read); | |
106 | |
107 if (!self) | |
108 return; | |
109 } while (!queue_.empty()); | |
110 is_busy_ = false; | |
111 } | |
112 | |
113 void WebSocketReadQueue::Wait() { | |
114 DCHECK(is_busy_); | |
115 handle_watcher_.Start( | |
116 handle_, | |
117 MOJO_HANDLE_SIGNAL_READABLE, | |
118 MOJO_DEADLINE_INDEFINITE, | |
119 base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this))); | |
120 } | |
121 | |
122 void WebSocketReadQueue::OnHandleReady(MojoResult result) { | |
123 DCHECK(is_busy_); | |
124 TryToRead(); | |
125 } | |
126 | |
127 } // namespace mojo | |
OLD | NEW |