OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/services/public/cpp/network/udp_socket_wrapper.h" | |
6 | |
7 #include <assert.h> | |
8 | |
9 #include "mojo/public/cpp/environment/logging.h" | |
10 | |
11 namespace mojo { | |
12 namespace { | |
13 | |
14 const uint32_t kDefaultReceiveQueueSlots = 32; | |
15 | |
16 } // namespace | |
17 | |
18 UDPSocketWrapper::NegotiateCallbackHandler::NegotiateCallbackHandler( | |
19 UDPSocketWrapper* delegate) | |
20 : delegate_(delegate) { | |
21 } | |
22 | |
23 UDPSocketWrapper::NegotiateCallbackHandler::~NegotiateCallbackHandler() {} | |
24 | |
25 void UDPSocketWrapper::NegotiateCallbackHandler::Run( | |
26 uint32_t actual_size) const { | |
27 delegate_->OnNegotiateMaxPendingSendRequestsCompleted(actual_size); | |
28 } | |
29 | |
30 UDPSocketWrapper::SendCallbackHandler::SendCallbackHandler( | |
31 UDPSocketWrapper* delegate, | |
32 const ErrorCallback& forward_callback) | |
33 : delegate_(delegate), | |
34 forward_callback_(forward_callback) { | |
35 } | |
36 | |
37 UDPSocketWrapper::SendCallbackHandler::~SendCallbackHandler() {} | |
38 | |
39 void UDPSocketWrapper::SendCallbackHandler::Run(NetworkErrorPtr result) const { | |
40 delegate_->OnSendToCompleted(result.Pass(), forward_callback_); | |
41 } | |
42 | |
43 UDPSocketWrapper::ReceivedData::ReceivedData() {} | |
44 UDPSocketWrapper::ReceivedData::~ReceivedData() {} | |
45 | |
46 UDPSocketWrapper::SendRequest::SendRequest() {} | |
47 UDPSocketWrapper::SendRequest::~SendRequest() {} | |
48 | |
49 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket) | |
50 : socket_(socket.Pass()), | |
51 max_receive_queue_size_(kDefaultReceiveQueueSlots), | |
52 max_pending_sends_(1), | |
53 current_pending_sends_(0) { | |
54 Initialize(0); | |
55 } | |
56 | |
57 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket, | |
58 uint32_t receive_queue_slots, | |
59 uint32_t requested_max_pending_sends) | |
60 : socket_(socket.Pass()), | |
61 max_receive_queue_size_(receive_queue_slots), | |
62 max_pending_sends_(1), | |
63 current_pending_sends_(0) { | |
64 Initialize(requested_max_pending_sends); | |
65 } | |
66 | |
67 UDPSocketWrapper::~UDPSocketWrapper() { | |
68 while (!receive_queue_.empty()) { | |
69 delete receive_queue_.front(); | |
70 receive_queue_.pop(); | |
71 } | |
72 while (!send_requests_.empty()) { | |
73 delete send_requests_.front(); | |
74 send_requests_.pop(); | |
75 } | |
76 } | |
77 | |
78 void UDPSocketWrapper::AllowAddressReuse(const ErrorCallback& callback) { | |
79 socket_->AllowAddressReuse(callback); | |
80 } | |
81 | |
82 void UDPSocketWrapper::Bind( | |
83 NetAddressPtr addr, | |
84 const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) { | |
85 socket_->Bind(addr.Pass(), callback); | |
86 } | |
87 | |
88 void UDPSocketWrapper::SetSendBufferSize(uint32_t size, | |
89 const ErrorCallback& callback) { | |
90 socket_->SetSendBufferSize(size, callback); | |
91 } | |
92 | |
93 void UDPSocketWrapper::SetReceiveBufferSize(uint32_t size, | |
94 const ErrorCallback& callback) { | |
95 socket_->SetReceiveBufferSize(size, callback); | |
96 } | |
97 | |
98 bool UDPSocketWrapper::ReceiveFrom(const ReceiveCallback& callback) { | |
99 if (receive_queue_.empty()) { | |
100 receive_requests_.push(callback); | |
101 return false; | |
102 } | |
103 | |
104 ReceivedData* data = receive_queue_.front(); | |
105 receive_queue_.pop(); | |
106 socket_->ReceiveMore(1); | |
107 callback.Run(data->result.Pass(), data->src_addr.Pass(), data->data.Pass()); | |
108 delete data; | |
109 return true; | |
110 } | |
111 | |
112 void UDPSocketWrapper::SendTo(NetAddressPtr dest_addr, | |
113 Array<uint8_t> data, | |
114 const ErrorCallback& callback) { | |
115 if (current_pending_sends_ >= max_pending_sends_) { | |
116 SendRequest* request = new SendRequest(); | |
117 request->dest_addr = dest_addr.Pass(); | |
118 request->data = data.Pass(); | |
119 request->callback = callback; | |
120 send_requests_.push(request); | |
121 return; | |
122 } | |
123 | |
124 MOJO_DCHECK(send_requests_.empty()); | |
125 current_pending_sends_++; | |
126 socket_->SendTo(dest_addr.Pass(), data.Pass(), | |
127 ErrorCallback(static_cast<ErrorCallback::Runnable*>( | |
128 new SendCallbackHandler(this, callback)))); | |
129 } | |
130 | |
131 void UDPSocketWrapper::OnReceived(NetworkErrorPtr result, | |
132 NetAddressPtr src_addr, | |
133 Array<uint8_t> data) { | |
134 if (!receive_requests_.empty()) { | |
135 // The cache should be empty if there are user requests waiting for data. | |
136 MOJO_DCHECK(receive_queue_.empty()); | |
137 | |
138 socket_->ReceiveMore(1); | |
139 | |
140 ReceiveCallback callback = receive_requests_.front(); | |
141 receive_requests_.pop(); | |
142 | |
143 callback.Run(result.Pass(), src_addr.Pass(), data.Pass()); | |
144 return; | |
145 } | |
146 | |
147 MOJO_DCHECK(receive_queue_.size() < max_receive_queue_size_); | |
148 ReceivedData* received_data = new ReceivedData(); | |
149 received_data->result = result.Pass(); | |
150 received_data->src_addr = src_addr.Pass(); | |
151 received_data->data = data.Pass(); | |
152 receive_queue_.push(received_data); | |
153 } | |
154 | |
155 void UDPSocketWrapper::Initialize(uint32_t requested_max_pending_sends) { | |
156 socket_.set_client(this); | |
157 socket_->NegotiateMaxPendingSendRequests( | |
158 requested_max_pending_sends, | |
159 Callback<void(uint32_t)>( | |
160 static_cast< Callback<void(uint32_t)>::Runnable*>( | |
161 new NegotiateCallbackHandler(this)))); | |
162 socket_->ReceiveMore(max_receive_queue_size_); | |
163 } | |
164 | |
165 void UDPSocketWrapper::OnNegotiateMaxPendingSendRequestsCompleted( | |
166 uint32_t actual_size) { | |
167 MOJO_DCHECK(max_pending_sends_ == 1); | |
168 | |
169 if (actual_size == 0) { | |
170 assert(false); | |
171 return; | |
172 } | |
173 | |
174 max_pending_sends_ = actual_size; | |
175 | |
176 while (ProcessNextSendRequest()); | |
177 } | |
178 | |
179 void UDPSocketWrapper::OnSendToCompleted( | |
180 NetworkErrorPtr result, | |
181 const ErrorCallback& forward_callback) { | |
182 current_pending_sends_--; | |
183 ProcessNextSendRequest(); | |
184 | |
185 forward_callback.Run(result.Pass()); | |
186 } | |
187 | |
188 bool UDPSocketWrapper::ProcessNextSendRequest() { | |
189 if (current_pending_sends_ >= max_pending_sends_ || send_requests_.empty()) | |
190 return false; | |
191 | |
192 SendRequest* request = send_requests_.front(); | |
193 send_requests_.pop(); | |
194 | |
195 current_pending_sends_++; | |
196 | |
197 socket_->SendTo( | |
198 request->dest_addr.Pass(), request->data.Pass(), | |
199 ErrorCallback(static_cast<ErrorCallback::Runnable*>( | |
200 new SendCallbackHandler(this, request->callback)))); | |
201 | |
202 delete request; | |
203 | |
204 return true; | |
205 } | |
206 | |
207 } // namespace mojo | |
OLD | NEW |