Index: ppapi/proxy/udp_socket_filter.cc |
diff --git a/ppapi/proxy/udp_socket_filter.cc b/ppapi/proxy/udp_socket_filter.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..8d7ff9de3a7cf26c839d1740892dfeaf5077a3e6 |
--- /dev/null |
+++ b/ppapi/proxy/udp_socket_filter.cc |
@@ -0,0 +1,245 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "ppapi/proxy/udp_socket_filter.h" |
+ |
+#include <algorithm> |
+#include <cstring> |
+ |
+#include "base/logging.h" |
+#include "ppapi/c/pp_errors.h" |
+#include "ppapi/proxy/error_conversion.h" |
+#include "ppapi/proxy/plugin_globals.h" |
+#include "ppapi/proxy/ppapi_messages.h" |
+#include "ppapi/thunk/enter.h" |
+#include "ppapi/thunk/resource_creation_api.h" |
+ |
+namespace ppapi { |
+namespace proxy { |
+ |
+const int32_t UDPSocketFilter::kMaxReadSize = 128 * 1024; |
+const int32_t UDPSocketFilter::kMaxReceiveBufferSize = |
+ 1024 * UDPSocketFilter::kMaxReadSize; |
+const size_t UDPSocketFilter::kPluginReceiveBufferSlots = 32u; |
+ |
+namespace { |
+ |
+int32_t SetRecvFromOutput(PP_Instance pp_instance, |
+ const scoped_ptr<std::string>& data, |
+ const PP_NetAddress_Private& addr, |
+ char* output_buffer, |
+ int32_t num_bytes, |
+ PP_Resource* output_addr, |
+ int32_t browser_result) { |
+ ProxyLock::AssertAcquired(); |
+ DCHECK_GE(num_bytes, static_cast<int32_t>(data->size())); |
+ |
+ int32_t result = browser_result; |
+ if (result == PP_OK && output_addr) { |
+ thunk::EnterResourceCreationNoLock enter(pp_instance); |
+ if (enter.succeeded()) { |
+ *output_addr = enter.functions()->CreateNetAddressFromNetAddressPrivate( |
+ pp_instance, addr); |
+ } else { |
+ result = PP_ERROR_FAILED; |
+ } |
+ } |
+ |
+ if (result == PP_OK && !data->empty()) |
+ memcpy(output_buffer, data->c_str(), data->size()); |
+ |
+ return result == PP_OK ? static_cast<int32_t>(data->size()) : result; |
+} |
+ |
+} // namespace |
+ |
+UDPSocketFilter::UDPSocketFilter() { |
+} |
+ |
+UDPSocketFilter::~UDPSocketFilter() { |
+} |
+ |
+void UDPSocketFilter::AddUDPResource( |
+ PP_Instance instance, |
+ PP_Resource resource, |
+ bool private_api, |
+ const base::Closure& slot_available_callback) { |
+ ProxyLock::AssertAcquired(); |
+ base::AutoLock acquire(lock_); |
+ DCHECK(!queues_.contains(resource)); |
+ queues_.add(resource, scoped_ptr<RecvQueue>(new RecvQueue( |
+ instance, private_api, slot_available_callback))); |
+} |
+ |
+void UDPSocketFilter::RemoveUDPResource(PP_Resource resource) { |
+ ProxyLock::AssertAcquired(); |
+ base::AutoLock acquire(lock_); |
+ DCHECK(queues_.contains(resource)); |
+ queues_.erase(resource); |
+} |
+ |
+int32_t UDPSocketFilter::RequestData( |
+ PP_Resource resource, |
+ int32_t num_bytes, |
+ char* buffer, |
+ PP_Resource* addr, |
+ const scoped_refptr<TrackedCallback>& callback) { |
+ ProxyLock::AssertAcquired(); |
+ base::AutoLock acquire(lock_); |
+ RecvQueue* queue_ptr = queues_.get(resource); |
+ if (!queue_ptr) { |
+ NOTREACHED(); |
+ return PP_ERROR_FAILED; |
+ } |
+ return queue_ptr->RequestData(num_bytes, buffer, addr, callback); |
+} |
+ |
+bool UDPSocketFilter::OnResourceReplyReceived( |
+ const ResourceMessageReplyParams& params, |
+ const IPC::Message& nested_msg) { |
+ bool handled = true; |
+ PPAPI_BEGIN_MESSAGE_MAP(UDPSocketFilter, nested_msg) |
+ PPAPI_DISPATCH_PLUGIN_RESOURCE_CALL(PpapiPluginMsg_UDPSocket_PushRecvResult, |
+ OnPluginMsgPushRecvResult) |
+ PPAPI_DISPATCH_PLUGIN_RESOURCE_CALL_UNHANDLED(handled = false) |
+ PPAPI_END_MESSAGE_MAP() |
+ return handled; |
+} |
+ |
+PP_NetAddress_Private UDPSocketFilter::GetLastAddrPrivate( |
+ PP_Resource resource) const { |
+ base::AutoLock acquire(lock_); |
+ return queues_.get(resource)->GetLastAddrPrivate(); |
+} |
+ |
+void UDPSocketFilter::OnPluginMsgPushRecvResult( |
+ const ResourceMessageReplyParams& params, |
+ int32_t result, |
+ const std::string& data, |
+ const PP_NetAddress_Private& addr) { |
+ DCHECK(PluginGlobals::Get()->ipc_task_runner()->RunsTasksOnCurrentThread()); |
+ base::AutoLock acquire(lock_); |
+ RecvQueue* queue_ptr = queues_.get(params.pp_resource()); |
+ // The RecvQueue might be gone if there were messages in-flight for a |
+ // resource that has been destroyed. |
+ if (queue_ptr) { |
+ // TODO(yzshen): Support passing in a non-const string ref, so that we can |
+ // eliminate one copy when storing the data in the buffer. |
+ queue_ptr->DataReceivedOnIOThread(result, data, addr); |
+ } |
+} |
+ |
+UDPSocketFilter::RecvQueue::RecvQueue( |
+ PP_Instance pp_instance, |
+ bool private_api, |
+ const base::Closure& slot_available_callback) |
+ : pp_instance_(pp_instance), |
+ read_buffer_(nullptr), |
+ bytes_to_read_(0), |
+ recvfrom_addr_resource_(nullptr), |
+ last_recvfrom_addr_(), |
+ private_api_(private_api), |
+ slot_available_callback_(slot_available_callback) { |
+} |
+ |
+UDPSocketFilter::RecvQueue::~RecvQueue() { |
+ if (TrackedCallback::IsPending(recvfrom_callback_)) |
+ recvfrom_callback_->PostAbort(); |
+} |
+ |
+void UDPSocketFilter::RecvQueue::DataReceivedOnIOThread( |
+ int32_t result, |
+ const std::string& data, |
+ const PP_NetAddress_Private& addr) { |
+ DCHECK(PluginGlobals::Get()->ipc_task_runner()->RunsTasksOnCurrentThread()); |
+ DCHECK_LT(recv_buffers_.size(), UDPSocketFilter::kPluginReceiveBufferSlots); |
+ |
+ if (!TrackedCallback::IsPending(recvfrom_callback_) || !read_buffer_) { |
+ recv_buffers_.push(RecvBuffer()); |
+ RecvBuffer& back = recv_buffers_.back(); |
+ back.result = result; |
+ back.data = data; |
+ back.addr = addr; |
+ return; |
+ } |
+ DCHECK_EQ(recv_buffers_.size(), 0u); |
+ |
+ if (bytes_to_read_ < static_cast<int32_t>(data.size())) { |
+ recv_buffers_.push(RecvBuffer()); |
+ RecvBuffer& back = recv_buffers_.back(); |
+ back.result = result; |
+ back.data = data; |
+ back.addr = addr; |
+ |
+ result = PP_ERROR_MESSAGE_TOO_BIG; |
+ } else { |
+ // Instead of calling SetRecvFromOutput directly, post it as a completion |
+ // task, so that: |
+ // 1) It can run with the ProxyLock (we can't lock it on the IO thread.) |
+ // 2) So that we only write to the output params in the case of success. |
+ // (Since the callback will complete on another thread, it's possible |
+ // that the resource will be deleted and abort the callback before it |
+ // is actually run.) |
+ scoped_ptr<std::string> data_to_pass(new std::string(data)); |
+ recvfrom_callback_->set_completion_task(base::Bind( |
+ &SetRecvFromOutput, pp_instance_, base::Passed(data_to_pass.Pass()), |
+ addr, base::Unretained(read_buffer_), bytes_to_read_, |
+ base::Unretained(recvfrom_addr_resource_))); |
+ last_recvfrom_addr_ = addr; |
+ PpapiGlobals::Get()->GetMainThreadMessageLoop()->PostTask( |
+ FROM_HERE, |
+ RunWhileLocked(slot_available_callback_)); |
+ } |
+ |
+ read_buffer_ = NULL; |
+ bytes_to_read_ = -1; |
+ recvfrom_addr_resource_ = NULL; |
+ |
+ recvfrom_callback_->Run( |
+ ConvertNetworkAPIErrorForCompatibility(result, private_api_)); |
+} |
+ |
+int32_t UDPSocketFilter::RecvQueue::RequestData( |
+ int32_t num_bytes, |
+ char* buffer_out, |
+ PP_Resource* addr_out, |
+ const scoped_refptr<TrackedCallback>& callback) { |
+ ProxyLock::AssertAcquired(); |
+ if (!buffer_out || num_bytes <= 0) |
+ return PP_ERROR_BADARGUMENT; |
+ if (TrackedCallback::IsPending(recvfrom_callback_)) |
+ return PP_ERROR_INPROGRESS; |
+ |
+ if (recv_buffers_.empty()) { |
+ read_buffer_ = buffer_out; |
+ bytes_to_read_ = std::min(num_bytes, UDPSocketFilter::kMaxReadSize); |
+ recvfrom_addr_resource_ = addr_out; |
+ recvfrom_callback_ = callback; |
+ return PP_OK_COMPLETIONPENDING; |
+ } else { |
+ RecvBuffer& front = recv_buffers_.front(); |
+ |
+ if (static_cast<size_t>(num_bytes) < front.data.size()) |
+ return PP_ERROR_MESSAGE_TOO_BIG; |
+ |
+ int32_t result = static_cast<int32_t>(front.data.size()); |
+ scoped_ptr<std::string> data_to_pass(new std::string); |
+ data_to_pass->swap(front.data); |
+ SetRecvFromOutput(pp_instance_, data_to_pass.Pass(), front.addr, buffer_out, |
+ num_bytes, addr_out, PP_OK); |
+ last_recvfrom_addr_ = front.addr; |
+ recv_buffers_.pop(); |
+ slot_available_callback_.Run(); |
+ |
+ return result; |
+ } |
+} |
+ |
+PP_NetAddress_Private UDPSocketFilter::RecvQueue::GetLastAddrPrivate() const { |
+ CHECK(private_api_); |
+ return last_recvfrom_addr_; |
+} |
+ |
+} // namespace proxy |
+} // namespace ppapi |