OLD | NEW |
(Empty) | |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/services/public/cpp/network/udp_socket_wrapper.h" |
| 6 |
| 7 #include <assert.h> |
| 8 |
| 9 #include "mojo/public/cpp/environment/logging.h" |
| 10 |
| 11 namespace mojo { |
| 12 namespace { |
| 13 |
| 14 const uint32_t kDefaultReceiveQueueSlots = 32; |
| 15 |
| 16 } // namespace |
| 17 |
| 18 UDPSocketWrapper::NegotiateCallbackHandler::NegotiateCallbackHandler( |
| 19 UDPSocketWrapper* delegate) |
| 20 : delegate_(delegate) { |
| 21 } |
| 22 |
| 23 UDPSocketWrapper::NegotiateCallbackHandler::~NegotiateCallbackHandler() {} |
| 24 |
| 25 void UDPSocketWrapper::NegotiateCallbackHandler::Run( |
| 26 uint32_t actual_size) const { |
| 27 delegate_->OnNegotiateMaxPendingSendRequestsCompleted(actual_size); |
| 28 } |
| 29 |
| 30 UDPSocketWrapper::SendCallbackHandler::SendCallbackHandler( |
| 31 UDPSocketWrapper* delegate, |
| 32 const ErrorCallback& forward_callback) |
| 33 : delegate_(delegate), |
| 34 forward_callback_(forward_callback) { |
| 35 } |
| 36 |
| 37 UDPSocketWrapper::SendCallbackHandler::~SendCallbackHandler() {} |
| 38 |
| 39 void UDPSocketWrapper::SendCallbackHandler::Run(NetworkErrorPtr result) const { |
| 40 delegate_->OnSendToCompleted(result.Pass(), forward_callback_); |
| 41 } |
| 42 |
| 43 UDPSocketWrapper::ReceivedData::ReceivedData() {} |
| 44 UDPSocketWrapper::ReceivedData::~ReceivedData() {} |
| 45 |
| 46 UDPSocketWrapper::SendRequest::SendRequest() {} |
| 47 UDPSocketWrapper::SendRequest::~SendRequest() {} |
| 48 |
| 49 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket) |
| 50 : socket_(socket.Pass()), |
| 51 max_receive_queue_size_(kDefaultReceiveQueueSlots), |
| 52 max_pending_sends_(1), |
| 53 current_pending_sends_(0) { |
| 54 Initialize(0); |
| 55 } |
| 56 |
| 57 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket, |
| 58 uint32_t receive_queue_slots, |
| 59 uint32_t requested_max_pending_sends) |
| 60 : socket_(socket.Pass()), |
| 61 max_receive_queue_size_(receive_queue_slots), |
| 62 max_pending_sends_(1), |
| 63 current_pending_sends_(0) { |
| 64 Initialize(requested_max_pending_sends); |
| 65 } |
| 66 |
| 67 UDPSocketWrapper::~UDPSocketWrapper() { |
| 68 while (!receive_queue_.empty()) { |
| 69 delete receive_queue_.front(); |
| 70 receive_queue_.pop(); |
| 71 } |
| 72 while (!send_requests_.empty()) { |
| 73 delete send_requests_.front(); |
| 74 send_requests_.pop(); |
| 75 } |
| 76 } |
| 77 |
| 78 void UDPSocketWrapper::AllowAddressReuse(const ErrorCallback& callback) { |
| 79 socket_->AllowAddressReuse(callback); |
| 80 } |
| 81 |
| 82 void UDPSocketWrapper::Bind( |
| 83 NetAddressPtr addr, |
| 84 const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) { |
| 85 socket_->Bind(addr.Pass(), callback); |
| 86 } |
| 87 |
| 88 void UDPSocketWrapper::SetSendBufferSize(uint32_t size, |
| 89 const ErrorCallback& callback) { |
| 90 socket_->SetSendBufferSize(size, callback); |
| 91 } |
| 92 |
| 93 void UDPSocketWrapper::SetReceiveBufferSize(uint32_t size, |
| 94 const ErrorCallback& callback) { |
| 95 socket_->SetReceiveBufferSize(size, callback); |
| 96 } |
| 97 |
| 98 bool UDPSocketWrapper::ReceiveFrom(const ReceiveCallback& callback) { |
| 99 if (receive_queue_.empty()) { |
| 100 receive_requests_.push(callback); |
| 101 return false; |
| 102 } |
| 103 |
| 104 ReceivedData* data = receive_queue_.front(); |
| 105 receive_queue_.pop(); |
| 106 socket_->ReceiveMore(1); |
| 107 callback.Run(data->result.Pass(), data->src_addr.Pass(), data->data.Pass()); |
| 108 delete data; |
| 109 return true; |
| 110 } |
| 111 |
| 112 void UDPSocketWrapper::SendTo(NetAddressPtr dest_addr, |
| 113 Array<uint8_t> data, |
| 114 const ErrorCallback& callback) { |
| 115 if (current_pending_sends_ >= max_pending_sends_) { |
| 116 SendRequest* request = new SendRequest(); |
| 117 request->dest_addr = dest_addr.Pass(); |
| 118 request->data = data.Pass(); |
| 119 request->callback = callback; |
| 120 send_requests_.push(request); |
| 121 return; |
| 122 } |
| 123 |
| 124 MOJO_DCHECK(send_requests_.empty()); |
| 125 current_pending_sends_++; |
| 126 socket_->SendTo(dest_addr.Pass(), data.Pass(), |
| 127 ErrorCallback(static_cast<typename ErrorCallback::Runnable*>( |
| 128 new SendCallbackHandler(this, callback)))); |
| 129 } |
| 130 |
| 131 void UDPSocketWrapper::OnReceived(NetworkErrorPtr result, |
| 132 NetAddressPtr src_addr, |
| 133 Array<uint8_t> data) { |
| 134 if (!receive_requests_.empty()) { |
| 135 // The cache should be empty if there are user requests waiting for data. |
| 136 MOJO_DCHECK(receive_queue_.empty()); |
| 137 |
| 138 socket_->ReceiveMore(1); |
| 139 |
| 140 ReceiveCallback callback = receive_requests_.front(); |
| 141 receive_requests_.pop(); |
| 142 |
| 143 callback.Run(result.Pass(), src_addr.Pass(), data.Pass()); |
| 144 return; |
| 145 } |
| 146 |
| 147 MOJO_DCHECK(receive_queue_.size() < max_receive_queue_size_); |
| 148 ReceivedData* received_data = new ReceivedData(); |
| 149 received_data->result = result.Pass(); |
| 150 received_data->src_addr = src_addr.Pass(); |
| 151 received_data->data = data.Pass(); |
| 152 receive_queue_.push(received_data); |
| 153 } |
| 154 |
| 155 void UDPSocketWrapper::Initialize(uint32_t requested_max_pending_sends) { |
| 156 socket_.set_client(this); |
| 157 socket_->NegotiateMaxPendingSendRequests( |
| 158 requested_max_pending_sends, |
| 159 Callback<void(uint32_t)>( |
| 160 static_cast<typename Callback<void(uint32_t)>::Runnable*>( |
| 161 new NegotiateCallbackHandler(this)))); |
| 162 socket_->ReceiveMore(max_receive_queue_size_); |
| 163 } |
| 164 |
| 165 void UDPSocketWrapper::OnNegotiateMaxPendingSendRequestsCompleted( |
| 166 uint32_t actual_size) { |
| 167 MOJO_DCHECK(max_pending_sends_ == 1); |
| 168 |
| 169 if (actual_size == 0) { |
| 170 assert(false); |
| 171 return; |
| 172 } |
| 173 |
| 174 max_pending_sends_ = actual_size; |
| 175 |
| 176 while (ProcessNextSendRequest()); |
| 177 } |
| 178 |
| 179 void UDPSocketWrapper::OnSendToCompleted( |
| 180 NetworkErrorPtr result, |
| 181 const ErrorCallback& forward_callback) { |
| 182 current_pending_sends_--; |
| 183 ProcessNextSendRequest(); |
| 184 |
| 185 forward_callback.Run(result.Pass()); |
| 186 } |
| 187 |
| 188 bool UDPSocketWrapper::ProcessNextSendRequest() { |
| 189 if (current_pending_sends_ >= max_pending_sends_ || send_requests_.empty()) |
| 190 return false; |
| 191 |
| 192 SendRequest* request = send_requests_.front(); |
| 193 send_requests_.pop(); |
| 194 |
| 195 current_pending_sends_++; |
| 196 |
| 197 socket_->SendTo( |
| 198 request->dest_addr.Pass(), request->data.Pass(), |
| 199 ErrorCallback(static_cast<typename ErrorCallback::Runnable*>( |
| 200 new SendCallbackHandler(this, request->callback)))); |
| 201 |
| 202 delete request; |
| 203 |
| 204 return true; |
| 205 } |
| 206 |
| 207 } // namespace mojo |
OLD | NEW |