Index: mojo/services/public/cpp/network/udp_socket_wrapper.cc |
diff --git a/mojo/services/public/cpp/network/udp_socket_wrapper.cc b/mojo/services/public/cpp/network/udp_socket_wrapper.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..3a7f39b518c670f2ab80305a1ee57acab0add195 |
--- /dev/null |
+++ b/mojo/services/public/cpp/network/udp_socket_wrapper.cc |
@@ -0,0 +1,207 @@ |
+// Copyright 2014 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/services/public/cpp/network/udp_socket_wrapper.h" |
+ |
+#include <assert.h> |
+ |
+#include "mojo/public/cpp/environment/logging.h" |
+ |
+namespace mojo { |
+namespace { |
+ |
+const uint32_t kDefaultReceiveQueueSlots = 32; |
+ |
+} // namespace |
+ |
+UDPSocketWrapper::NegotiateCallbackHandler::NegotiateCallbackHandler( |
+ UDPSocketWrapper* delegate) |
+ : delegate_(delegate) { |
+} |
+ |
+UDPSocketWrapper::NegotiateCallbackHandler::~NegotiateCallbackHandler() {} |
+ |
+void UDPSocketWrapper::NegotiateCallbackHandler::Run( |
+ uint32_t actual_size) const { |
+ delegate_->OnNegotiateMaxPendingSendRequestsCompleted(actual_size); |
+} |
+ |
+UDPSocketWrapper::SendCallbackHandler::SendCallbackHandler( |
+ UDPSocketWrapper* delegate, |
+ const ErrorCallback& forward_callback) |
+ : delegate_(delegate), |
+ forward_callback_(forward_callback) { |
+} |
+ |
+UDPSocketWrapper::SendCallbackHandler::~SendCallbackHandler() {} |
+ |
+void UDPSocketWrapper::SendCallbackHandler::Run(NetworkErrorPtr result) const { |
+ delegate_->OnSendToCompleted(result.Pass(), forward_callback_); |
+} |
+ |
+UDPSocketWrapper::ReceivedData::ReceivedData() {} |
+UDPSocketWrapper::ReceivedData::~ReceivedData() {} |
+ |
+UDPSocketWrapper::SendRequest::SendRequest() {} |
+UDPSocketWrapper::SendRequest::~SendRequest() {} |
+ |
+UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket) |
+ : socket_(socket.Pass()), |
+ max_receive_queue_size_(kDefaultReceiveQueueSlots), |
+ max_pending_sends_(1), |
+ current_pending_sends_(0) { |
+ Initialize(0); |
+} |
+ |
+UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket, |
+ uint32_t receive_queue_slots, |
+ uint32_t requested_max_pending_sends) |
+ : socket_(socket.Pass()), |
+ max_receive_queue_size_(receive_queue_slots), |
+ max_pending_sends_(1), |
+ current_pending_sends_(0) { |
+ Initialize(requested_max_pending_sends); |
+} |
+ |
+UDPSocketWrapper::~UDPSocketWrapper() { |
+ while (!receive_queue_.empty()) { |
+ delete receive_queue_.front(); |
+ receive_queue_.pop(); |
+ } |
+ while (!send_requests_.empty()) { |
+ delete send_requests_.front(); |
+ send_requests_.pop(); |
+ } |
+} |
+ |
+void UDPSocketWrapper::AllowAddressReuse(const ErrorCallback& callback) { |
+ socket_->AllowAddressReuse(callback); |
+} |
+ |
+void UDPSocketWrapper::Bind( |
+ NetAddressPtr addr, |
+ const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) { |
+ socket_->Bind(addr.Pass(), callback); |
+} |
+ |
+void UDPSocketWrapper::SetSendBufferSize(uint32_t size, |
+ const ErrorCallback& callback) { |
+ socket_->SetSendBufferSize(size, callback); |
+} |
+ |
+void UDPSocketWrapper::SetReceiveBufferSize(uint32_t size, |
+ const ErrorCallback& callback) { |
+ socket_->SetReceiveBufferSize(size, callback); |
+} |
+ |
+bool UDPSocketWrapper::ReceiveFrom(const ReceiveCallback& callback) { |
+ if (receive_queue_.empty()) { |
+ receive_requests_.push(callback); |
+ return false; |
+ } |
+ |
+ ReceivedData* data = receive_queue_.front(); |
+ receive_queue_.pop(); |
+ socket_->ReceiveMore(1); |
+ callback.Run(data->result.Pass(), data->src_addr.Pass(), data->data.Pass()); |
+ delete data; |
+ return true; |
+} |
+ |
+void UDPSocketWrapper::SendTo(NetAddressPtr dest_addr, |
+ Array<uint8_t> data, |
+ const ErrorCallback& callback) { |
+ if (current_pending_sends_ >= max_pending_sends_) { |
+ SendRequest* request = new SendRequest(); |
+ request->dest_addr = dest_addr.Pass(); |
+ request->data = data.Pass(); |
+ request->callback = callback; |
+ send_requests_.push(request); |
+ return; |
+ } |
+ |
+ MOJO_DCHECK(send_requests_.empty()); |
+ current_pending_sends_++; |
+ socket_->SendTo(dest_addr.Pass(), data.Pass(), |
+ ErrorCallback(static_cast<typename ErrorCallback::Runnable*>( |
+ new SendCallbackHandler(this, callback)))); |
+} |
+ |
+void UDPSocketWrapper::OnReceived(NetworkErrorPtr result, |
+ NetAddressPtr src_addr, |
+ Array<uint8_t> data) { |
+ if (!receive_requests_.empty()) { |
+ // The cache should be empty if there are user requests waiting for data. |
+ MOJO_DCHECK(receive_queue_.empty()); |
+ |
+ socket_->ReceiveMore(1); |
+ |
+ ReceiveCallback callback = receive_requests_.front(); |
+ receive_requests_.pop(); |
+ |
+ callback.Run(result.Pass(), src_addr.Pass(), data.Pass()); |
+ return; |
+ } |
+ |
+ MOJO_DCHECK(receive_queue_.size() < max_receive_queue_size_); |
+ ReceivedData* received_data = new ReceivedData(); |
+ received_data->result = result.Pass(); |
+ received_data->src_addr = src_addr.Pass(); |
+ received_data->data = data.Pass(); |
+ receive_queue_.push(received_data); |
+} |
+ |
+void UDPSocketWrapper::Initialize(uint32_t requested_max_pending_sends) { |
+ socket_.set_client(this); |
+ socket_->NegotiateMaxPendingSendRequests( |
+ requested_max_pending_sends, |
+ Callback<void(uint32_t)>( |
+ static_cast<typename Callback<void(uint32_t)>::Runnable*>( |
+ new NegotiateCallbackHandler(this)))); |
+ socket_->ReceiveMore(max_receive_queue_size_); |
+} |
+ |
+void UDPSocketWrapper::OnNegotiateMaxPendingSendRequestsCompleted( |
+ uint32_t actual_size) { |
+ MOJO_DCHECK(max_pending_sends_ == 1); |
+ |
+ if (actual_size == 0) { |
+ assert(false); |
+ return; |
+ } |
+ |
+ max_pending_sends_ = actual_size; |
+ |
+ while (ProcessNextSendRequest()); |
+} |
+ |
+void UDPSocketWrapper::OnSendToCompleted( |
+ NetworkErrorPtr result, |
+ const ErrorCallback& forward_callback) { |
+ current_pending_sends_--; |
+ ProcessNextSendRequest(); |
+ |
+ forward_callback.Run(result.Pass()); |
+} |
+ |
+bool UDPSocketWrapper::ProcessNextSendRequest() { |
+ if (current_pending_sends_ >= max_pending_sends_ || send_requests_.empty()) |
+ return false; |
+ |
+ SendRequest* request = send_requests_.front(); |
+ send_requests_.pop(); |
+ |
+ current_pending_sends_++; |
+ |
+ socket_->SendTo( |
+ request->dest_addr.Pass(), request->data.Pass(), |
+ ErrorCallback(static_cast<typename ErrorCallback::Runnable*>( |
+ new SendCallbackHandler(this, request->callback)))); |
+ |
+ delete request; |
+ |
+ return true; |
+} |
+ |
+} // namespace mojo |