Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(76)

Side by Side Diff: native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Merge Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 5
6 #include "nacl_io/ossocket.h" 6 #include "nacl_io/mount_node_udp.h"
7 #ifdef PROVIDES_SOCKET_API
8 7
9 #include <errno.h> 8 #include <errno.h>
10 #include <string.h> 9 #include <string.h>
10 #include <sys/fcntl.h>
11
11 #include <algorithm> 12 #include <algorithm>
12 13
13 #include "nacl_io/mount.h" 14 #include "nacl_io/event_emitter_udp.h"
14 #include "nacl_io/mount_node_socket.h" 15 #include "nacl_io/mount_stream.h"
15 #include "nacl_io/mount_node_udp.h" 16 #include "nacl_io/packet.h"
16 #include "nacl_io/pepper_interface.h" 17 #include "nacl_io/pepper_interface.h"
17 18
19 namespace {
20 const size_t kMaxPacketSize = 65536;
21 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
22 }
23
18 namespace nacl_io { 24 namespace nacl_io {
19 25
20 MountNodeUDP::MountNodeUDP(Mount* mount) : MountNodeSocket(mount) {} 26 class UDPWork : public MountStream::Work {
27 public:
28 explicit UDPWork(const ScopedEventEmitterUDP& emitter)
29 : MountStream::Work(emitter->stream()->mount_stream()),
30 emitter_(emitter),
31 packet_(NULL) {
32 }
33
34 ~UDPWork() {
35 delete packet_;
36 }
37
38 UDPSocketInterface* UDPInterface() {
39 return mount()->ppapi()->GetUDPSocketInterface();
40 }
41
42 protected:
43 ScopedEventEmitterUDP emitter_;
44 Packet* packet_;
45 };
21 46
22 47
23 UDPSocketInterface* MountNodeUDP::UDPSocket() { 48 class UDPSendWork : public UDPWork {
24 if (mount_->ppapi() == NULL) 49 public:
25 return NULL; 50 explicit UDPSendWork(const ScopedEventEmitterUDP& emitter)
51 : UDPWork(emitter) {}
26 52
27 return mount_->ppapi()->GetUDPSocketInterface(); 53 virtual bool Start(int32_t val) {
54 AUTO_LOCK(emitter_->GetLock());
55 MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
56
57 // If the stream is valid, and we are not currently sending
58 if (stream && ((stream->GetStreamFlags() & SSF_SENDING) == 0)) {
59 packet_ = emitter_->ReadTXPacket_Locked();
60 if (packet_) {
61 stream->SetStreamFlags(SSF_SENDING);
62 int err = UDPInterface()->SendTo(stream->socket_resource(),
63 packet_->buffer(),
64 packet_->len(),
65 packet_->addr(),
66 mount()->GetRunCompletion(this));
67 if (err == PP_OK_COMPLETIONPENDING)
68 return true;
69 stream->ClearStreamFlags(SSF_SENDING);
70 }
71 }
72 return false;
73 }
74
75 virtual void Run(int32_t val) {
76 AUTO_LOCK(emitter_->GetLock());
77 MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
78
79 // If the stream is still there, see if we can queue more output
80 if (stream) {
81 stream->ClearStreamFlags(SSF_SENDING);
82 stream->QueueOutput();
83 }
84 }
85 };
86
87
88 class UDPRecvWork : public UDPWork {
89 public:
90 explicit UDPRecvWork(const ScopedEventEmitterUDP& emitter)
91 : UDPWork(emitter) {
92 data_ = new char[kMaxPacketSize];
93 }
94
95 ~UDPRecvWork() {
96 delete[] data_;
97 }
98
99 virtual bool Start(int32_t val) {
100 AUTO_LOCK(emitter_->GetLock());
101 MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
102
103 // If the stream is valid and we are not currently receiving
104 if (stream && ((stream->GetStreamFlags() & SSF_RECVING) == 0)) {
105 stream->SetStreamFlags(SSF_RECVING);
106 int err = UDPInterface()->RecvFrom(stream->socket_resource(),
107 data_,
108 kMaxPacketSize,
109 &addr_,
110 mount()->GetRunCompletion(this));
111 if (err == PP_OK_COMPLETIONPENDING)
112 return true;
113 }
114 stream->ClearStreamFlags(SSF_RECVING);
binji 2013/09/19 00:48:55 move inside for failure case
noelallen1 2013/09/19 21:29:27 Done.
115 return false;
116 }
117
118 virtual void Run(int32_t val) {
119 AUTO_LOCK(emitter_->GetLock());
120 MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
121
122 // If the stream is still there, see if we can queue more input
123 if (stream) {
124 if (val > 0) {
125 Packet* packet = new Packet(mount()->ppapi());
126 packet->Copy(data_, val, addr_);
127 emitter_->WriteRXPacket_Locked(packet);
128 }
129 stream->ClearStreamFlags(SSF_RECVING);
130 stream->QueueInput();
131 }
132 }
133
134 private:
135 char* data_;
136 PP_Resource addr_;
137 };
138
139
140 MountNodeUDP::MountNodeUDP(Mount* mount)
141 : MountNodeSocket(mount),
142 emitter_(new EventEmitterUDP(kDefaultFifoSize, kDefaultFifoSize)) {
143 emitter_->AttachStream(this);
144 }
145
146 void MountNodeUDP::Destroy() {
147 emitter_->DetachStream();
148 MountNodeSocket::Destroy();
149 }
150
151 EventEmitterUDP* MountNodeUDP::GetEventEmitter() {
152 return emitter_.get();
28 } 153 }
29 154
30 Error MountNodeUDP::Init(int flags) { 155 Error MountNodeUDP::Init(int flags) {
31 if (UDPSocket() == NULL) 156 if (UDPInterface() == NULL)
32 return EACCES; 157 return EACCES;
33 158
34 socket_resource_ = UDPSocket()->Create(mount_->ppapi()->GetInstance()); 159 socket_resource_ = UDPInterface()->Create(mount_->ppapi()->GetInstance());
35 if (0 == socket_resource_) 160 if (0 == socket_resource_)
36 return EACCES; 161 return EACCES;
37 162
38 return 0; 163 return 0;
39 } 164 }
40 165
166 void MountNodeUDP::QueueInput() {
167 UDPRecvWork* work = new UDPRecvWork(emitter_);
168 mount_stream()->EnqueueWork(work);
169 }
170
171 void MountNodeUDP::QueueOutput() {
172 UDPSendWork* work = new UDPSendWork(emitter_);
173 mount_stream()->EnqueueWork(work);
174 }
175
41 Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) { 176 Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
42 if (0 == socket_resource_) 177 if (0 == socket_resource_)
43 return EBADF; 178 return EBADF;
44 179
45 /* Only bind once. */ 180 /* Only bind once. */
46 if (local_addr_ != 0) 181 if (local_addr_ != 0)
47 return EINVAL; 182 return EINVAL;
48 183
49 PP_Resource out_addr = SockAddrToResource(addr, len); 184 PP_Resource out_addr = SockAddrToResource(addr, len);
50 if (0 == out_addr) 185 if (0 == out_addr)
51 return EINVAL; 186 return EINVAL;
52 187
53 int err = UDPSocket()->Bind(socket_resource_, 188 int err = UDPInterface()->Bind(socket_resource_,
54 out_addr, 189 out_addr,
55 PP_BlockUntilComplete()); 190 PP_BlockUntilComplete());
191 QueueInput();
56 if (err != 0) { 192 if (err != 0) {
57 mount_->ppapi()->ReleaseResource(out_addr); 193 mount_->ppapi()->ReleaseResource(out_addr);
58 return PPErrorToErrno(err); 194 return PPErrorToErrno(err);
59 } 195 }
60 196
61 local_addr_ = out_addr; 197 local_addr_ = out_addr;
62 return 0; 198 return 0;
63 } 199 }
64 200
65 Error MountNodeUDP::Connect(const struct sockaddr* addr, socklen_t len) { 201 Error MountNodeUDP::Connect(const struct sockaddr* addr, socklen_t len) {
(...skipping 14 matching lines...) Expand all
80 } 216 }
81 217
82 Error MountNodeUDP::RecvFromHelper(void* buf, 218 Error MountNodeUDP::RecvFromHelper(void* buf,
83 size_t len, 219 size_t len,
84 int flags, 220 int flags,
85 PP_Resource* out_addr, 221 PP_Resource* out_addr,
86 int* out_len) { 222 int* out_len) {
87 if (0 == socket_resource_) 223 if (0 == socket_resource_)
88 return EBADF; 224 return EBADF;
89 225
90 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); 226 int ms = (GetMode() & O_NONBLOCK) ? 0 : read_timeout_;
91 int err = UDPSocket()->RecvFrom(socket_resource_,
92 static_cast<char*>(buf),
93 capped_len,
94 out_addr,
95 PP_BlockUntilComplete());
96 if (err < 0)
97 return PPErrorToErrno(err);
98 227
99 *out_len = err; 228 EventListenerLock wait(GetEventEmitter());
229 Error err = wait.WaitOnEvent(POLLIN, ms);
230 if (err)
231 return err;
232
233 Packet* packet = emitter_->ReadRXPacket_Locked();
234
235 *out_len = 0;
236 *out_addr = 0;
237 if (packet) {
238 int capped_len =
239 static_cast<int32_t>(std::min<int>(len, packet->len()));
240 memcpy(buf, packet->buffer(), capped_len);
241
242 if (packet->addr() != 0) {
243 mount_->ppapi()->AddRefResource(packet->addr());
244 *out_addr = packet->addr();
245 }
246 *out_len = capped_len;
247
248 delete packet;
249 QueueInput();
250 }
100 return 0; 251 return 0;
101 } 252 }
102 253
254 Error MountNodeUDP::SendToHelper(const void* buf,
255 size_t len,
256 int flags,
257 PP_Resource addr,
258 int* out_len) {
259 *out_len = 0;
260 if (0 == socket_resource_)
261 return EBADF;
262
263 if (0 == addr)
264 return ENOTCONN;
265
266 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
267 int ms = (GetMode() & O_NONBLOCK) ? 0 : write_timeout_;
268
269 EventListenerLock wait(GetEventEmitter());
270 Error err = wait.WaitOnEvent(POLLOUT, ms);
271 if (err)
272 return err;
273
274 Packet* packet = new Packet(mount_->ppapi());
275 packet->Copy(buf, capped_len, addr);
276
277 emitter_->WriteTXPacket_Locked(packet);
278 QueueOutput();
279
280 *out_len = capped_len;
281 return 0;
282 }
283
284
103 Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) { 285 Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) {
104 while (1) { 286 while (1) {
105 int local_len = 0; 287 int local_len = 0;
106 PP_Resource addr = 0; 288 PP_Resource addr = 0;
107 289
108 int err = RecvFromHelper(buf, len, flags, &addr, &local_len); 290 int err = RecvFromHelper(buf, len, flags, &addr, &local_len);
109 if (err < 0) 291 if (err < 0)
110 return PPErrorToErrno(err); 292 return PPErrorToErrno(err);
111 293
112 /* If "connected" then only receive packets from the given remote. */ 294 // If "connected" then only receive packets from the given remote.
113 bool same = IsEquivalentAddress(addr, remote_addr_); 295 bool same = IsEquivalentAddress(addr, remote_addr_);
binji 2013/09/19 22:40:25 This behavior is gone now? Is it necessary?
noelallen1 2013/09/20 00:51:27 Not really needed for first pass. BUG=295177
296
297 // Release references from RecvFromHelper
114 mount_->ppapi()->ReleaseResource(addr); 298 mount_->ppapi()->ReleaseResource(addr);
115 299
116 if (remote_addr_ != 0 && same) 300 if (remote_addr_ != 0 && same)
117 continue; 301 continue;
118 302
119 *out_len = local_len; 303 *out_len = local_len;
120 return 0; 304 return 0;
121 } 305 }
122 } 306 }
123 307
124 Error MountNodeUDP::RecvFrom(void* buf, 308 Error MountNodeUDP::RecvFrom(void* buf,
125 size_t len, 309 size_t len,
126 int flags, 310 int flags,
127 struct sockaddr* src_addr, 311 struct sockaddr* src_addr,
128 socklen_t* addrlen, 312 socklen_t* addrlen,
129 int* out_len) { 313 int* out_len) {
130 PP_Resource addr = 0; 314 PP_Resource addr = 0;
131 int err = RecvFromHelper(buf, len, flags, &addr, out_len); 315 int err = RecvFromHelper(buf, len, flags, &addr, out_len);
132 if (err < 0) 316 if (err < 0)
133 return PPErrorToErrno(err); 317 return PPErrorToErrno(err);
134 318
135 if (src_addr) 319 if (src_addr)
136 *addrlen = ResourceToSockAddr(addr, *addrlen, src_addr); 320 *addrlen = ResourceToSockAddr(addr, *addrlen, src_addr);
137 321
138 mount_->ppapi()->ReleaseResource(addr); 322 mount_->ppapi()->ReleaseResource(addr);
139 return 0; 323 return 0;
140 } 324 }
141 325
142
143 Error MountNodeUDP::SendToHelper(const void* buf,
144 size_t len,
145 int flags,
146 PP_Resource addr,
147 int* out_len) {
148 if (0 == socket_resource_)
149 return EBADF;
150
151 if (0 == addr)
152 return ENOTCONN;
153
154 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
155 int err = UDPSocket()->SendTo(socket_resource_,
156 static_cast<const char*>(buf),
157 capped_len,
158 addr,
159 PP_BlockUntilComplete());
160 if (err < 0)
161 return PPErrorToErrno(err);
162
163 *out_len = err;
164 return 0;
165 }
166
167 Error MountNodeUDP::Send(const void* buf, size_t len, int flags, int* out_len) { 326 Error MountNodeUDP::Send(const void* buf, size_t len, int flags, int* out_len) {
168 return SendToHelper(buf, len, flags, remote_addr_, out_len); 327 return SendToHelper(buf, len, flags, remote_addr_, out_len);
169 } 328 }
170 329
171 Error MountNodeUDP::SendTo(const void* buf, 330 Error MountNodeUDP::SendTo(const void* buf,
172 size_t len, 331 size_t len,
173 int flags, 332 int flags,
174 const struct sockaddr* dest_addr, 333 const struct sockaddr* dest_addr,
175 socklen_t addrlen, 334 socklen_t addrlen,
176 int* out_len) { 335 int* out_len) {
177 PP_Resource out_addr = SockAddrToResource(dest_addr, addrlen); 336 PP_Resource out_addr = SockAddrToResource(dest_addr, addrlen);
178 if (0 == out_addr) 337 if (0 == out_addr)
179 return EINVAL; 338 return EINVAL;
180 339
181 Error err = SendToHelper(buf, len, flags, out_addr, out_len); 340 Error err = SendToHelper(buf, len, flags, out_addr, out_len);
182 mount_->ppapi()->ReleaseResource(out_addr); 341 mount_->ppapi()->ReleaseResource(out_addr);
183 return err; 342 return err;
184 } 343 }
185 344
186 } // namespace nacl_io 345 } // namespace nacl_io
187 346
188 #endif // PROVIDES_SOCKET_API
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698