Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(114)

Unified Diff: mojo/services/public/cpp/network/udp_socket_wrapper.cc

Issue 668993002: Mojo UDP: add a client-side wrapper which handles the client<->server buffering for users. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/services/public/cpp/network/udp_socket_wrapper.cc
diff --git a/mojo/services/public/cpp/network/udp_socket_wrapper.cc b/mojo/services/public/cpp/network/udp_socket_wrapper.cc
new file mode 100644
index 0000000000000000000000000000000000000000..3a7f39b518c670f2ab80305a1ee57acab0add195
--- /dev/null
+++ b/mojo/services/public/cpp/network/udp_socket_wrapper.cc
@@ -0,0 +1,207 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/services/public/cpp/network/udp_socket_wrapper.h"
+
+#include <assert.h>
+
+#include "mojo/public/cpp/environment/logging.h"
+
+namespace mojo {
+namespace {
+
+const uint32_t kDefaultReceiveQueueSlots = 32;
+
+} // namespace
+
+UDPSocketWrapper::NegotiateCallbackHandler::NegotiateCallbackHandler(
+ UDPSocketWrapper* delegate)
+ : delegate_(delegate) {
+}
+
+UDPSocketWrapper::NegotiateCallbackHandler::~NegotiateCallbackHandler() {}
+
+void UDPSocketWrapper::NegotiateCallbackHandler::Run(
+ uint32_t actual_size) const {
+ delegate_->OnNegotiateMaxPendingSendRequestsCompleted(actual_size);
+}
+
+UDPSocketWrapper::SendCallbackHandler::SendCallbackHandler(
+ UDPSocketWrapper* delegate,
+ const ErrorCallback& forward_callback)
+ : delegate_(delegate),
+ forward_callback_(forward_callback) {
+}
+
+UDPSocketWrapper::SendCallbackHandler::~SendCallbackHandler() {}
+
+void UDPSocketWrapper::SendCallbackHandler::Run(NetworkErrorPtr result) const {
+ delegate_->OnSendToCompleted(result.Pass(), forward_callback_);
+}
+
+UDPSocketWrapper::ReceivedData::ReceivedData() {}
+UDPSocketWrapper::ReceivedData::~ReceivedData() {}
+
+UDPSocketWrapper::SendRequest::SendRequest() {}
+UDPSocketWrapper::SendRequest::~SendRequest() {}
+
+UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket)
+ : socket_(socket.Pass()),
+ max_receive_queue_size_(kDefaultReceiveQueueSlots),
+ max_pending_sends_(1),
+ current_pending_sends_(0) {
+ Initialize(0);
+}
+
+UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket,
+ uint32_t receive_queue_slots,
+ uint32_t requested_max_pending_sends)
+ : socket_(socket.Pass()),
+ max_receive_queue_size_(receive_queue_slots),
+ max_pending_sends_(1),
+ current_pending_sends_(0) {
+ Initialize(requested_max_pending_sends);
+}
+
+UDPSocketWrapper::~UDPSocketWrapper() {
+ while (!receive_queue_.empty()) {
+ delete receive_queue_.front();
+ receive_queue_.pop();
+ }
+ while (!send_requests_.empty()) {
+ delete send_requests_.front();
+ send_requests_.pop();
+ }
+}
+
+void UDPSocketWrapper::AllowAddressReuse(const ErrorCallback& callback) {
+ socket_->AllowAddressReuse(callback);
+}
+
+void UDPSocketWrapper::Bind(
+ NetAddressPtr addr,
+ const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) {
+ socket_->Bind(addr.Pass(), callback);
+}
+
+void UDPSocketWrapper::SetSendBufferSize(uint32_t size,
+ const ErrorCallback& callback) {
+ socket_->SetSendBufferSize(size, callback);
+}
+
+void UDPSocketWrapper::SetReceiveBufferSize(uint32_t size,
+ const ErrorCallback& callback) {
+ socket_->SetReceiveBufferSize(size, callback);
+}
+
+bool UDPSocketWrapper::ReceiveFrom(const ReceiveCallback& callback) {
+ if (receive_queue_.empty()) {
+ receive_requests_.push(callback);
+ return false;
+ }
+
+ ReceivedData* data = receive_queue_.front();
+ receive_queue_.pop();
+ socket_->ReceiveMore(1);
+ callback.Run(data->result.Pass(), data->src_addr.Pass(), data->data.Pass());
+ delete data;
+ return true;
+}
+
+void UDPSocketWrapper::SendTo(NetAddressPtr dest_addr,
+ Array<uint8_t> data,
+ const ErrorCallback& callback) {
+ if (current_pending_sends_ >= max_pending_sends_) {
+ SendRequest* request = new SendRequest();
+ request->dest_addr = dest_addr.Pass();
+ request->data = data.Pass();
+ request->callback = callback;
+ send_requests_.push(request);
+ return;
+ }
+
+ MOJO_DCHECK(send_requests_.empty());
+ current_pending_sends_++;
+ socket_->SendTo(dest_addr.Pass(), data.Pass(),
+ ErrorCallback(static_cast<typename ErrorCallback::Runnable*>(
+ new SendCallbackHandler(this, callback))));
+}
+
+void UDPSocketWrapper::OnReceived(NetworkErrorPtr result,
+ NetAddressPtr src_addr,
+ Array<uint8_t> data) {
+ if (!receive_requests_.empty()) {
+ // The cache should be empty if there are user requests waiting for data.
+ MOJO_DCHECK(receive_queue_.empty());
+
+ socket_->ReceiveMore(1);
+
+ ReceiveCallback callback = receive_requests_.front();
+ receive_requests_.pop();
+
+ callback.Run(result.Pass(), src_addr.Pass(), data.Pass());
+ return;
+ }
+
+ MOJO_DCHECK(receive_queue_.size() < max_receive_queue_size_);
+ ReceivedData* received_data = new ReceivedData();
+ received_data->result = result.Pass();
+ received_data->src_addr = src_addr.Pass();
+ received_data->data = data.Pass();
+ receive_queue_.push(received_data);
+}
+
+void UDPSocketWrapper::Initialize(uint32_t requested_max_pending_sends) {
+ socket_.set_client(this);
+ socket_->NegotiateMaxPendingSendRequests(
+ requested_max_pending_sends,
+ Callback<void(uint32_t)>(
+ static_cast<typename Callback<void(uint32_t)>::Runnable*>(
+ new NegotiateCallbackHandler(this))));
+ socket_->ReceiveMore(max_receive_queue_size_);
+}
+
+void UDPSocketWrapper::OnNegotiateMaxPendingSendRequestsCompleted(
+ uint32_t actual_size) {
+ MOJO_DCHECK(max_pending_sends_ == 1);
+
+ if (actual_size == 0) {
+ assert(false);
+ return;
+ }
+
+ max_pending_sends_ = actual_size;
+
+ while (ProcessNextSendRequest());
+}
+
+void UDPSocketWrapper::OnSendToCompleted(
+ NetworkErrorPtr result,
+ const ErrorCallback& forward_callback) {
+ current_pending_sends_--;
+ ProcessNextSendRequest();
+
+ forward_callback.Run(result.Pass());
+}
+
+bool UDPSocketWrapper::ProcessNextSendRequest() {
+ if (current_pending_sends_ >= max_pending_sends_ || send_requests_.empty())
+ return false;
+
+ SendRequest* request = send_requests_.front();
+ send_requests_.pop();
+
+ current_pending_sends_++;
+
+ socket_->SendTo(
+ request->dest_addr.Pass(), request->data.Pass(),
+ ErrorCallback(static_cast<typename ErrorCallback::Runnable*>(
+ new SendCallbackHandler(this, request->callback))));
+
+ delete request;
+
+ return true;
+}
+
+} // namespace mojo
« no previous file with comments | « mojo/services/public/cpp/network/udp_socket_wrapper.h ('k') | mojo/services/public/interfaces/network/udp_socket.mojom » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698