OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include <assert.h> | |
6 #include <stdint.h> | |
7 #include <utility> | |
8 | |
9 #include "base/logging.h" | |
10 #include "network/public/cpp/udp_socket_wrapper.h" | |
11 | |
12 namespace mojo { | |
13 namespace { | |
14 | |
15 const uint32_t kDefaultReceiveQueueSlots = 32; | |
16 | |
17 } // namespace | |
18 | |
19 UDPSocketWrapper::NegotiateCallbackHandler::NegotiateCallbackHandler( | |
20 UDPSocketWrapper* delegate) | |
21 : delegate_(delegate) { | |
22 } | |
23 | |
24 UDPSocketWrapper::NegotiateCallbackHandler::~NegotiateCallbackHandler() {} | |
25 | |
26 void UDPSocketWrapper::NegotiateCallbackHandler::Run( | |
27 uint32_t actual_size) const { | |
28 delegate_->OnNegotiateMaxPendingSendRequestsCompleted(actual_size); | |
29 } | |
30 | |
31 UDPSocketWrapper::SendCallbackHandler::SendCallbackHandler( | |
32 UDPSocketWrapper* delegate, | |
33 const ErrorCallback& forward_callback) | |
34 : delegate_(delegate), | |
35 forward_callback_(forward_callback) { | |
36 } | |
37 | |
38 UDPSocketWrapper::SendCallbackHandler::~SendCallbackHandler() {} | |
39 | |
40 void UDPSocketWrapper::SendCallbackHandler::Run(NetworkErrorPtr result) const { | |
41 delegate_->OnSendToCompleted(std::move(result), forward_callback_); | |
42 } | |
43 | |
44 UDPSocketWrapper::ReceiverBindingCallback::ReceiverBindingCallback( | |
45 UDPSocketWrapper* delegate, | |
46 const Callback<void(NetworkErrorPtr, NetAddressPtr)>& wrapper_callback) | |
47 : delegate_(delegate), wrapper_callback_(wrapper_callback) { | |
48 } | |
49 | |
50 UDPSocketWrapper::ReceiverBindingCallback::~ReceiverBindingCallback() { | |
51 } | |
52 | |
53 void UDPSocketWrapper::ReceiverBindingCallback::Run( | |
54 NetworkErrorPtr result, | |
55 NetAddressPtr addr, | |
56 InterfaceRequest<UDPSocketReceiver> request) const { | |
57 delegate_->StartReceivingData(std::move(request)); | |
58 wrapper_callback_.Run(std::move(result), std::move(addr)); | |
59 } | |
60 | |
61 UDPSocketWrapper::ReceivedData::ReceivedData() {} | |
62 UDPSocketWrapper::ReceivedData::~ReceivedData() {} | |
63 | |
64 UDPSocketWrapper::SendRequest::SendRequest() {} | |
65 UDPSocketWrapper::SendRequest::~SendRequest() {} | |
66 | |
67 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket) | |
68 : binding_(this), | |
69 socket_(std::move(socket)), | |
70 max_receive_queue_size_(kDefaultReceiveQueueSlots), | |
71 max_pending_sends_(1), | |
72 current_pending_sends_(0) { | |
73 Initialize(0); | |
74 } | |
75 | |
76 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket, | |
77 uint32_t receive_queue_slots, | |
78 uint32_t requested_max_pending_sends) | |
79 : binding_(this), | |
80 socket_(std::move(socket)), | |
81 max_receive_queue_size_(receive_queue_slots), | |
82 max_pending_sends_(1), | |
83 current_pending_sends_(0) { | |
84 Initialize(requested_max_pending_sends); | |
85 } | |
86 | |
87 UDPSocketWrapper::~UDPSocketWrapper() { | |
88 while (!receive_queue_.empty()) { | |
89 delete receive_queue_.front(); | |
90 receive_queue_.pop(); | |
91 } | |
92 while (!send_requests_.empty()) { | |
93 delete send_requests_.front(); | |
94 send_requests_.pop(); | |
95 } | |
96 } | |
97 | |
98 void UDPSocketWrapper::AllowAddressReuse(const ErrorCallback& callback) { | |
99 socket_->AllowAddressReuse(callback); | |
100 } | |
101 | |
102 void UDPSocketWrapper::Bind( | |
103 NetAddressPtr addr, | |
104 const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) { | |
105 socket_->Bind( | |
106 std::move(addr), | |
107 BindOrConnectCallback(static_cast<BindOrConnectCallback::Runnable*>( | |
108 new ReceiverBindingCallback(this, callback)))); | |
109 } | |
110 | |
111 void UDPSocketWrapper::Connect( | |
112 NetAddressPtr remote_addr, | |
113 const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) { | |
114 socket_->Connect( | |
115 std::move(remote_addr), | |
116 BindOrConnectCallback(static_cast<BindOrConnectCallback::Runnable*>( | |
117 new ReceiverBindingCallback(this, callback)))); | |
118 } | |
119 | |
120 void UDPSocketWrapper::SetSendBufferSize(uint32_t size, | |
121 const ErrorCallback& callback) { | |
122 socket_->SetSendBufferSize(size, callback); | |
123 } | |
124 | |
125 void UDPSocketWrapper::SetReceiveBufferSize(uint32_t size, | |
126 const ErrorCallback& callback) { | |
127 socket_->SetReceiveBufferSize(size, callback); | |
128 } | |
129 | |
130 bool UDPSocketWrapper::ReceiveFrom(const ReceiveCallback& callback) { | |
131 if (receive_queue_.empty()) { | |
132 receive_requests_.push(callback); | |
133 return false; | |
134 } | |
135 | |
136 ReceivedData* data = receive_queue_.front(); | |
137 receive_queue_.pop(); | |
138 socket_->ReceiveMore(1); | |
139 callback.Run(std::move(data->result), std::move(data->src_addr), | |
140 std::move(data->data)); | |
141 delete data; | |
142 return true; | |
143 } | |
144 | |
145 void UDPSocketWrapper::SendTo(NetAddressPtr dest_addr, | |
146 Array<uint8_t> data, | |
147 const ErrorCallback& callback) { | |
148 if (current_pending_sends_ >= max_pending_sends_) { | |
149 SendRequest* request = new SendRequest(); | |
150 request->dest_addr = std::move(dest_addr); | |
151 request->data = std::move(data); | |
152 request->callback = callback; | |
153 send_requests_.push(request); | |
154 return; | |
155 } | |
156 | |
157 DCHECK(send_requests_.empty()); | |
158 current_pending_sends_++; | |
159 socket_->SendTo(std::move(dest_addr), std::move(data), | |
160 ErrorCallback(static_cast<ErrorCallback::Runnable*>( | |
161 new SendCallbackHandler(this, callback)))); | |
162 } | |
163 | |
164 void UDPSocketWrapper::OnReceived(NetworkErrorPtr result, | |
165 NetAddressPtr src_addr, | |
166 Array<uint8_t> data) { | |
167 if (!receive_requests_.empty()) { | |
168 // The cache should be empty if there are user requests waiting for data. | |
169 DCHECK(receive_queue_.empty()); | |
170 | |
171 socket_->ReceiveMore(1); | |
172 | |
173 ReceiveCallback callback = receive_requests_.front(); | |
174 receive_requests_.pop(); | |
175 | |
176 callback.Run(std::move(result), std::move(src_addr), std::move(data)); | |
177 return; | |
178 } | |
179 | |
180 DCHECK(receive_queue_.size() < max_receive_queue_size_); | |
181 ReceivedData* received_data = new ReceivedData(); | |
182 received_data->result = std::move(result); | |
183 received_data->src_addr = std::move(src_addr); | |
184 received_data->data = std::move(data); | |
185 receive_queue_.push(received_data); | |
186 } | |
187 | |
188 void UDPSocketWrapper::Initialize(uint32_t requested_max_pending_sends) { | |
189 socket_->NegotiateMaxPendingSendRequests( | |
190 requested_max_pending_sends, | |
191 Callback<void(uint32_t)>( | |
192 static_cast< Callback<void(uint32_t)>::Runnable*>( | |
193 new NegotiateCallbackHandler(this)))); | |
194 } | |
195 | |
196 void UDPSocketWrapper::OnNegotiateMaxPendingSendRequestsCompleted( | |
197 uint32_t actual_size) { | |
198 DCHECK(max_pending_sends_ == 1); | |
199 | |
200 if (actual_size == 0) { | |
201 assert(false); | |
202 return; | |
203 } | |
204 | |
205 max_pending_sends_ = actual_size; | |
206 | |
207 while (ProcessNextSendRequest()); | |
208 } | |
209 | |
210 void UDPSocketWrapper::OnSendToCompleted( | |
211 NetworkErrorPtr result, | |
212 const ErrorCallback& forward_callback) { | |
213 current_pending_sends_--; | |
214 ProcessNextSendRequest(); | |
215 | |
216 forward_callback.Run(std::move(result)); | |
217 } | |
218 | |
219 bool UDPSocketWrapper::ProcessNextSendRequest() { | |
220 if (current_pending_sends_ >= max_pending_sends_ || send_requests_.empty()) | |
221 return false; | |
222 | |
223 SendRequest* request = send_requests_.front(); | |
224 send_requests_.pop(); | |
225 | |
226 current_pending_sends_++; | |
227 | |
228 socket_->SendTo(std::move(request->dest_addr), std::move(request->data), | |
229 ErrorCallback(static_cast<ErrorCallback::Runnable*>( | |
230 new SendCallbackHandler(this, request->callback)))); | |
231 | |
232 delete request; | |
233 | |
234 return true; | |
235 } | |
236 | |
237 void UDPSocketWrapper::StartReceivingData( | |
238 InterfaceRequest<UDPSocketReceiver> request) { | |
239 binding_.Bind(std::move(request)); | |
240 socket_->ReceiveMore(max_receive_queue_size_); | |
241 } | |
242 | |
243 } // namespace mojo | |
OLD | NEW |