Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(185)

Side by Side Diff: native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Merge Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 5
6 #include "nacl_io/ossocket.h" 6 #include "nacl_io/mount_node_udp.h"
7 #ifdef PROVIDES_SOCKET_API
8 7
9 #include <errno.h> 8 #include <errno.h>
10 #include <string.h> 9 #include <string.h>
10
11 #include <algorithm> 11 #include <algorithm>
12 12
13 #include "nacl_io/mount.h" 13 #include "nacl_io/event_emitter_udp.h"
14 #include "nacl_io/mount_node_socket.h" 14 #include "nacl_io/mount_stream.h"
15 #include "nacl_io/mount_node_udp.h" 15 #include "nacl_io/packet.h"
16 #include "nacl_io/pepper_interface.h" 16 #include "nacl_io/pepper_interface.h"
17 17
18 namespace {
19 const size_t kMaxPacketSize = 65536;
20 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
21 }
22
18 namespace nacl_io { 23 namespace nacl_io {
19 24
20 MountNodeUDP::MountNodeUDP(Mount* mount) : MountNodeSocket(mount) {} 25 class UDPWork : public MountStream::Work {
26 public:
27 explicit UDPWork(const ScopedEventEmitterUDP& emitter)
28 : MountStream::Work(emitter->stream()->mount_stream()),
29 emitter_(emitter),
30 packet_(NULL) {
31 }
32
33 ~UDPWork() {
34 delete packet_;
35 }
36
37 UDPSocketInterface* UDPInterface() {
38 return mount()->ppapi()->GetUDPSocketInterface();
39 }
40
41 protected:
42 ScopedEventEmitterUDP emitter_;
43 Packet* packet_;
44 };
21 45
22 46
23 UDPSocketInterface* MountNodeUDP::UDPSocket() { 47 class UDPSendWork : public UDPWork {
24 if (mount_->ppapi() == NULL) 48 public:
25 return NULL; 49 explicit UDPSendWork(const ScopedEventEmitterUDP& emitter)
50 : UDPWork(emitter) {}
26 51
27 return mount_->ppapi()->GetUDPSocketInterface(); 52 virtual bool Start(int32_t val) {
53 AUTO_LOCK(emitter_->GetLock());
54 MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
55
56 // Does the stream exist, and can it send?
57 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND))
58 return false;
59
60 // If not currently sending...
61 if (!stream->TestStreamFlags(SSF_SENDING)) {
62 packet_ = emitter_->ReadTXPacket_Locked();
63 if (packet_) {
64 stream->SetStreamFlags(SSF_SENDING);
65 int err = UDPInterface()->SendTo(stream->socket_resource(),
66 packet_->buffer(),
67 packet_->len(),
68 packet_->addr(),
69 mount()->GetRunCompletion(this));
70 if (err == PP_OK_COMPLETIONPENDING)
71 return true;
72
73 // Anything else, we should assume the socket has gone bad.
74 stream->SetError_Locked(err);
75 }
76 }
77 return false;
78 }
79
80 virtual void Run(int32_t length_error) {
81 AUTO_LOCK(emitter_->GetLock());
82 MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
83
84 // If the stream is still there...
85 if (stream) {
86 // And we did send, then Q more work.
87 if (length_error >= 0) {
88 stream->ClearStreamFlags(SSF_SENDING);
89 stream->QueueOutput();
90 } else {
91 // Otherwise this socket has gone bad.
92 stream->SetError_Locked(length_error);
93 }
94 }
95 }
96 };
97
98
99 class UDPRecvWork : public UDPWork {
100 public:
101 explicit UDPRecvWork(const ScopedEventEmitterUDP& emitter)
102 : UDPWork(emitter) {
103 data_ = new char[kMaxPacketSize];
104 }
105
106 ~UDPRecvWork() {
107 delete[] data_;
108 }
109
110 virtual bool Start(int32_t val) {
111 AUTO_LOCK(emitter_->GetLock());
112 MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
113
114 // Does the stream exist, and can it recv?
115 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
116 return false;
117
118 // If the stream is valid and we are not currently receiving
119 if (!stream->TestStreamFlags(SSF_RECVING)) {
120 stream->SetStreamFlags(SSF_RECVING);
121 int err = UDPInterface()->RecvFrom(stream->socket_resource(),
122 data_,
123 kMaxPacketSize,
124 &addr_,
125 mount()->GetRunCompletion(this));
126 if (err == PP_OK_COMPLETIONPENDING)
127 return true;
128
129 stream->SetError_Locked(err);
130 }
131 return false;
132 }
133
134 virtual void Run(int32_t length_error) {
135 AUTO_LOCK(emitter_->GetLock());
136 MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
137
138 // If the stream is still there, see if we can queue more input
139 if (stream) {
140 if (length_error > 0) {
141 Packet* packet = new Packet(mount()->ppapi());
142 packet->Copy(data_, length_error, addr_);
143 emitter_->WriteRXPacket_Locked(packet);
144 stream->ClearStreamFlags(SSF_RECVING);
145 stream->QueueInput();
146 } else {
147 stream->SetError_Locked(length_error);
148 }
149 }
150 }
151
152 private:
153 char* data_;
154 PP_Resource addr_;
155 };
156
157
158 MountNodeUDP::MountNodeUDP(Mount* mount)
159 : MountNodeSocket(mount),
160 emitter_(new EventEmitterUDP(kDefaultFifoSize, kDefaultFifoSize)) {
161 emitter_->AttachStream(this);
162 }
163
164 void MountNodeUDP::Destroy() {
165 emitter_->DetachStream();
166 MountNodeSocket::Destroy();
167 }
168
169 EventEmitterUDP* MountNodeUDP::GetEventEmitter() {
170 return emitter_.get();
28 } 171 }
29 172
30 Error MountNodeUDP::Init(int flags) { 173 Error MountNodeUDP::Init(int flags) {
31 if (UDPSocket() == NULL) 174 if (UDPInterface() == NULL)
32 return EACCES; 175 return EACCES;
33 176
34 socket_resource_ = UDPSocket()->Create(mount_->ppapi()->GetInstance()); 177 socket_resource_ = UDPInterface()->Create(mount_->ppapi()->GetInstance());
35 if (0 == socket_resource_) 178 if (0 == socket_resource_)
36 return EACCES; 179 return EACCES;
37 180
38 return 0; 181 return 0;
39 } 182 }
40 183
184 void MountNodeUDP::QueueInput() {
185 UDPRecvWork* work = new UDPRecvWork(emitter_);
186 mount_stream()->EnqueueWork(work);
187 }
188
189 void MountNodeUDP::QueueOutput() {
190 UDPSendWork* work = new UDPSendWork(emitter_);
191 mount_stream()->EnqueueWork(work);
192 }
193
41 Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) { 194 Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
42 if (0 == socket_resource_) 195 if (0 == socket_resource_)
43 return EBADF; 196 return EBADF;
44 197
45 /* Only bind once. */ 198 /* Only bind once. */
46 if (local_addr_ != 0) 199 if (local_addr_ != 0)
47 return EINVAL; 200 return EINVAL;
48 201
49 PP_Resource out_addr = SockAddrToResource(addr, len); 202 PP_Resource out_addr = SockAddrToResource(addr, len);
50 if (0 == out_addr) 203 if (0 == out_addr)
51 return EINVAL; 204 return EINVAL;
52 205
53 int err = UDPSocket()->Bind(socket_resource_, 206 int err = UDPInterface()->Bind(socket_resource_,
54 out_addr, 207 out_addr,
55 PP_BlockUntilComplete()); 208 PP_BlockUntilComplete());
56 if (err != 0) { 209 if (err != 0) {
57 mount_->ppapi()->ReleaseResource(out_addr); 210 mount_->ppapi()->ReleaseResource(out_addr);
58 return PPErrorToErrno(err); 211 return PPErrorToErrno(err);
59 } 212 }
60 213
214 // Now that we are bound, we can start sending and receiving.
215 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
216 QueueInput();
217
61 local_addr_ = out_addr; 218 local_addr_ = out_addr;
62 return 0; 219 return 0;
63 } 220 }
64 221
65 Error MountNodeUDP::Connect(const struct sockaddr* addr, socklen_t len) { 222 Error MountNodeUDP::Connect(const struct sockaddr* addr, socklen_t len) {
66 if (0 == socket_resource_) 223 if (0 == socket_resource_)
67 return EBADF; 224 return EBADF;
68 225
69 /* Connect for UDP is the default dest, it's legal to change it. */ 226 /* Connect for UDP is the default dest, it's legal to change it. */
70 if (remote_addr_ != 0) { 227 if (remote_addr_ != 0) {
71 mount_->ppapi()->ReleaseResource(remote_addr_); 228 mount_->ppapi()->ReleaseResource(remote_addr_);
72 remote_addr_ = 0; 229 remote_addr_ = 0;
73 } 230 }
74 231
75 remote_addr_ = SockAddrToResource(addr, len); 232 remote_addr_ = SockAddrToResource(addr, len);
76 if (0 == remote_addr_) 233 if (0 == remote_addr_)
77 return EINVAL; 234 return EINVAL;
78 235
79 return 0; 236 return 0;
80 } 237 }
81 238
82 Error MountNodeUDP::RecvFromHelper(void* buf, 239 Error MountNodeUDP::Recv_Locked(void* buf,
83 size_t len, 240 size_t len,
84 int flags, 241 PP_Resource* out_addr,
85 PP_Resource* out_addr, 242 int* out_len) {
86 int* out_len) { 243 Packet* packet = emitter_->ReadRXPacket_Locked();
87 if (0 == socket_resource_) 244 *out_len = 0;
88 return EBADF; 245 *out_addr = 0;
89 246
90 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); 247 if (packet) {
91 int err = UDPSocket()->RecvFrom(socket_resource_, 248 int capped_len =
92 static_cast<char*>(buf), 249 static_cast<int32_t>(std::min<int>(len, packet->len()));
93 capped_len, 250 memcpy(buf, packet->buffer(), capped_len);
94 out_addr,
95 PP_BlockUntilComplete());
96 if (err < 0)
97 return PPErrorToErrno(err);
98 251
99 *out_len = err; 252 if (packet->addr() != 0) {
100 return 0; 253 mount_->ppapi()->AddRefResource(packet->addr());
254 *out_addr = packet->addr();
255 }
256
257 *out_len = capped_len;
258 delete packet;
259 return 0;
260 }
261
262 // Should never happen, Recv_Locked should not be called
263 // unless already in a POLLIN state.
264 return EBADF;
101 } 265 }
102 266
103 Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) { 267 Error MountNodeUDP::Send_Locked(const void* buf,
104 while (1) { 268 size_t len,
105 int local_len = 0; 269 PP_Resource addr,
106 PP_Resource addr = 0; 270 int* out_len) {
271 *out_len = 0;
272 int capped_len =
273 static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
274 Packet* packet = new Packet(mount_->ppapi());
275 packet->Copy(buf, capped_len, addr);
107 276
108 int err = RecvFromHelper(buf, len, flags, &addr, &local_len); 277 emitter_->WriteTXPacket_Locked(packet);
109 if (err < 0) 278 *out_len = capped_len;
110 return PPErrorToErrno(err);
111
112 /* If "connected" then only receive packets from the given remote. */
113 bool same = IsEquivalentAddress(addr, remote_addr_);
114 mount_->ppapi()->ReleaseResource(addr);
115
116 if (remote_addr_ != 0 && same)
117 continue;
118
119 *out_len = local_len;
120 return 0;
121 }
122 }
123
124 Error MountNodeUDP::RecvFrom(void* buf,
125 size_t len,
126 int flags,
127 struct sockaddr* src_addr,
128 socklen_t* addrlen,
129 int* out_len) {
130 PP_Resource addr = 0;
131 int err = RecvFromHelper(buf, len, flags, &addr, out_len);
132 if (err < 0)
133 return PPErrorToErrno(err);
134
135 if (src_addr)
136 *addrlen = ResourceToSockAddr(addr, *addrlen, src_addr);
137
138 mount_->ppapi()->ReleaseResource(addr);
139 return 0; 279 return 0;
140 } 280 }
141 281
142
143 Error MountNodeUDP::SendToHelper(const void* buf,
144 size_t len,
145 int flags,
146 PP_Resource addr,
147 int* out_len) {
148 if (0 == socket_resource_)
149 return EBADF;
150
151 if (0 == addr)
152 return ENOTCONN;
153
154 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
155 int err = UDPSocket()->SendTo(socket_resource_,
156 static_cast<const char*>(buf),
157 capped_len,
158 addr,
159 PP_BlockUntilComplete());
160 if (err < 0)
161 return PPErrorToErrno(err);
162
163 *out_len = err;
164 return 0;
165 }
166
167 Error MountNodeUDP::Send(const void* buf, size_t len, int flags, int* out_len) {
168 return SendToHelper(buf, len, flags, remote_addr_, out_len);
169 }
170
171 Error MountNodeUDP::SendTo(const void* buf,
172 size_t len,
173 int flags,
174 const struct sockaddr* dest_addr,
175 socklen_t addrlen,
176 int* out_len) {
177 PP_Resource out_addr = SockAddrToResource(dest_addr, addrlen);
178 if (0 == out_addr)
179 return EINVAL;
180
181 Error err = SendToHelper(buf, len, flags, out_addr, out_len);
182 mount_->ppapi()->ReleaseResource(out_addr);
183 return err;
184 }
185
186 } // namespace nacl_io 282 } // namespace nacl_io
187 283
188 #endif // PROVIDES_SOCKET_API
OLDNEW
« no previous file with comments | « native_client_sdk/src/libraries/nacl_io/mount_node_udp.h ('k') | native_client_sdk/src/libraries/nacl_io/mount_socket.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698