Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(288)

Side by Side Diff: mojo/services/public/cpp/network/udp_socket_wrapper.cc

Issue 789243002: Restructure public side of network service. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Rebase Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/services/public/cpp/network/udp_socket_wrapper.h"
6
7 #include <assert.h>
8
9 #include "mojo/public/cpp/environment/logging.h"
10
11 namespace mojo {
12 namespace {
13
14 const uint32_t kDefaultReceiveQueueSlots = 32;
15
16 } // namespace
17
18 UDPSocketWrapper::NegotiateCallbackHandler::NegotiateCallbackHandler(
19 UDPSocketWrapper* delegate)
20 : delegate_(delegate) {
21 }
22
23 UDPSocketWrapper::NegotiateCallbackHandler::~NegotiateCallbackHandler() {}
24
25 void UDPSocketWrapper::NegotiateCallbackHandler::Run(
26 uint32_t actual_size) const {
27 delegate_->OnNegotiateMaxPendingSendRequestsCompleted(actual_size);
28 }
29
30 UDPSocketWrapper::SendCallbackHandler::SendCallbackHandler(
31 UDPSocketWrapper* delegate,
32 const ErrorCallback& forward_callback)
33 : delegate_(delegate),
34 forward_callback_(forward_callback) {
35 }
36
37 UDPSocketWrapper::SendCallbackHandler::~SendCallbackHandler() {}
38
39 void UDPSocketWrapper::SendCallbackHandler::Run(NetworkErrorPtr result) const {
40 delegate_->OnSendToCompleted(result.Pass(), forward_callback_);
41 }
42
43 UDPSocketWrapper::ReceivedData::ReceivedData() {}
44 UDPSocketWrapper::ReceivedData::~ReceivedData() {}
45
46 UDPSocketWrapper::SendRequest::SendRequest() {}
47 UDPSocketWrapper::SendRequest::~SendRequest() {}
48
49 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket)
50 : socket_(socket.Pass()),
51 max_receive_queue_size_(kDefaultReceiveQueueSlots),
52 max_pending_sends_(1),
53 current_pending_sends_(0) {
54 Initialize(0);
55 }
56
57 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket,
58 uint32_t receive_queue_slots,
59 uint32_t requested_max_pending_sends)
60 : socket_(socket.Pass()),
61 max_receive_queue_size_(receive_queue_slots),
62 max_pending_sends_(1),
63 current_pending_sends_(0) {
64 Initialize(requested_max_pending_sends);
65 }
66
67 UDPSocketWrapper::~UDPSocketWrapper() {
68 while (!receive_queue_.empty()) {
69 delete receive_queue_.front();
70 receive_queue_.pop();
71 }
72 while (!send_requests_.empty()) {
73 delete send_requests_.front();
74 send_requests_.pop();
75 }
76 }
77
78 void UDPSocketWrapper::AllowAddressReuse(const ErrorCallback& callback) {
79 socket_->AllowAddressReuse(callback);
80 }
81
82 void UDPSocketWrapper::Bind(
83 NetAddressPtr addr,
84 const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) {
85 socket_->Bind(addr.Pass(), callback);
86 }
87
88 void UDPSocketWrapper::SetSendBufferSize(uint32_t size,
89 const ErrorCallback& callback) {
90 socket_->SetSendBufferSize(size, callback);
91 }
92
93 void UDPSocketWrapper::SetReceiveBufferSize(uint32_t size,
94 const ErrorCallback& callback) {
95 socket_->SetReceiveBufferSize(size, callback);
96 }
97
98 bool UDPSocketWrapper::ReceiveFrom(const ReceiveCallback& callback) {
99 if (receive_queue_.empty()) {
100 receive_requests_.push(callback);
101 return false;
102 }
103
104 ReceivedData* data = receive_queue_.front();
105 receive_queue_.pop();
106 socket_->ReceiveMore(1);
107 callback.Run(data->result.Pass(), data->src_addr.Pass(), data->data.Pass());
108 delete data;
109 return true;
110 }
111
112 void UDPSocketWrapper::SendTo(NetAddressPtr dest_addr,
113 Array<uint8_t> data,
114 const ErrorCallback& callback) {
115 if (current_pending_sends_ >= max_pending_sends_) {
116 SendRequest* request = new SendRequest();
117 request->dest_addr = dest_addr.Pass();
118 request->data = data.Pass();
119 request->callback = callback;
120 send_requests_.push(request);
121 return;
122 }
123
124 MOJO_DCHECK(send_requests_.empty());
125 current_pending_sends_++;
126 socket_->SendTo(dest_addr.Pass(), data.Pass(),
127 ErrorCallback(static_cast<ErrorCallback::Runnable*>(
128 new SendCallbackHandler(this, callback))));
129 }
130
131 void UDPSocketWrapper::OnReceived(NetworkErrorPtr result,
132 NetAddressPtr src_addr,
133 Array<uint8_t> data) {
134 if (!receive_requests_.empty()) {
135 // The cache should be empty if there are user requests waiting for data.
136 MOJO_DCHECK(receive_queue_.empty());
137
138 socket_->ReceiveMore(1);
139
140 ReceiveCallback callback = receive_requests_.front();
141 receive_requests_.pop();
142
143 callback.Run(result.Pass(), src_addr.Pass(), data.Pass());
144 return;
145 }
146
147 MOJO_DCHECK(receive_queue_.size() < max_receive_queue_size_);
148 ReceivedData* received_data = new ReceivedData();
149 received_data->result = result.Pass();
150 received_data->src_addr = src_addr.Pass();
151 received_data->data = data.Pass();
152 receive_queue_.push(received_data);
153 }
154
155 void UDPSocketWrapper::Initialize(uint32_t requested_max_pending_sends) {
156 socket_.set_client(this);
157 socket_->NegotiateMaxPendingSendRequests(
158 requested_max_pending_sends,
159 Callback<void(uint32_t)>(
160 static_cast< Callback<void(uint32_t)>::Runnable*>(
161 new NegotiateCallbackHandler(this))));
162 socket_->ReceiveMore(max_receive_queue_size_);
163 }
164
165 void UDPSocketWrapper::OnNegotiateMaxPendingSendRequestsCompleted(
166 uint32_t actual_size) {
167 MOJO_DCHECK(max_pending_sends_ == 1);
168
169 if (actual_size == 0) {
170 assert(false);
171 return;
172 }
173
174 max_pending_sends_ = actual_size;
175
176 while (ProcessNextSendRequest());
177 }
178
179 void UDPSocketWrapper::OnSendToCompleted(
180 NetworkErrorPtr result,
181 const ErrorCallback& forward_callback) {
182 current_pending_sends_--;
183 ProcessNextSendRequest();
184
185 forward_callback.Run(result.Pass());
186 }
187
188 bool UDPSocketWrapper::ProcessNextSendRequest() {
189 if (current_pending_sends_ >= max_pending_sends_ || send_requests_.empty())
190 return false;
191
192 SendRequest* request = send_requests_.front();
193 send_requests_.pop();
194
195 current_pending_sends_++;
196
197 socket_->SendTo(
198 request->dest_addr.Pass(), request->data.Pass(),
199 ErrorCallback(static_cast<ErrorCallback::Runnable*>(
200 new SendCallbackHandler(this, request->callback))));
201
202 delete request;
203
204 return true;
205 }
206
207 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/services/public/cpp/network/udp_socket_wrapper.h ('k') | mojo/services/public/cpp/network/web_socket_read_queue.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698