Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(134)

Side by Side Diff: native_client_sdk/src/libraries/nacl_io/mount_node_udp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Added Fifo Tests Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 5
6 #include "nacl_io/ossocket.h" 6 #include "nacl_io/mount_node_udp.h"
7 #ifdef PROVIDES_SOCKET_API
8 7
9 #include <errno.h> 8 #include <errno.h>
10 #include <string.h> 9 #include <string.h>
10 #include <sys/fcntl.h>
11
11 #include <algorithm> 12 #include <algorithm>
12 13
13 #include "nacl_io/mount.h" 14 #include "nacl_io/event_emitter_udp.h"
14 #include "nacl_io/mount_node_socket.h" 15 #include "nacl_io/mount_stream.h"
15 #include "nacl_io/mount_node_udp.h"
16 #include "nacl_io/pepper_interface.h" 16 #include "nacl_io/pepper_interface.h"
17 17
18 namespace {
19 const size_t kMaxPacketSize = 65536;
20 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
21 }
22
18 namespace nacl_io { 23 namespace nacl_io {
19 24
20 MountNodeUDP::MountNodeUDP(Mount* mount) : MountNodeSocket(mount) {} 25 class UDPWork : public MountStreamWork {
26 public:
27 UDPWork(EventEmitterUDP* emitter)
binji 2013/09/15 22:18:58 many of the comments on TCP apply here too (explic
binji 2013/09/15 22:18:58 UDPWork(const ScopedEventEmitterUDP& emitter)
noelallen1 2013/09/17 21:21:54 Done.
28 : MountStreamWork(emitter->stream()->mount_stream()),
29 emitter_(emitter) {
30 }
31
32 UDPSocketInterface* UDPInterface() {
33 return mount()->ppapi()->GetUDPSocketInterface();
34 }
35
36 protected:
37 ScopedEventEmitterUDP emitter_;
38 };
21 39
22 40
23 UDPSocketInterface* MountNodeUDP::UDPSocket() { 41 class UDPSendWork : public UDPWork {
24 if (mount_->ppapi() == NULL) 42 public:
25 return NULL; 43 UDPSendWork(EventEmitterUDP* emitter) : UDPWork(emitter) {}
26 44
27 return mount_->ppapi()->GetUDPSocketInterface(); 45 virtual bool Start(int32_t val) {
46 AUTO_LOCK(emitter_->GetLock());
47 MountNodeUDP* stream =
48 reinterpret_cast<MountNodeUDP*>(emitter_->stream());
49
50 // If the stream is valid, and we are not currently sending
51 if (stream && ((stream->GetStreamFlags() & SSF_SENDING) == 0)) {
52 Packet* packet = emitter_->ReadTXPacket_Locked();
53 if (packet) {
54 stream->SetStreamFlags(SSF_SENDING);
55 int err = UDPInterface()->SendTo(stream->socket_resource(),
56 packet->buffer_.data(),
57 packet->buffer_.size(),
58 packet->addr_,
59 mount()->GetRunCompletion(this));
60
61 mount()->ppapi()->ReleaseResource(packet->addr_);
binji 2013/09/15 22:18:58 postpone cleanup until after SendTo completes?
noelallen1 2013/09/17 21:21:54 Moved referencing to Packet class.
62 delete packet;
63 if (err == PP_OK_COMPLETIONPENDING)
64 return true;
65 stream->ClearStreamFlags(SSF_SENDING);
66 }
67 }
68 return false;
69 }
70
71 virtual void Run(int32_t val) {
72 AUTO_LOCK(emitter_->GetLock());
73 MountNodeUDP* stream =
74 reinterpret_cast<MountNodeUDP*>(emitter_->stream());
75
76 // If the stream is still there, see if we can queue more output
77 if (stream) {
78 stream->ClearStreamFlags(SSF_SENDING);
79 stream->QueueOutput();
80 }
81 }
82 };
83
84
85 class UDPRecvWork : public UDPWork {
86 public:
87 UDPRecvWork(EventEmitterUDP* emitter) : UDPWork(emitter) {
88 data_ = new char[kMaxPacketSize];
89 }
90
91 ~UDPRecvWork() {
92 delete[] data_;
93 }
94
95 virtual bool Start(int32_t val) {
96 AUTO_LOCK(emitter_->GetLock());
97 MountNodeUDP* stream =
98 reinterpret_cast<MountNodeUDP*>(emitter_->stream());
99
100 // If the stream is valid and we are not currently receiving
101 if (stream && ((stream->GetStreamFlags() & SSF_RECVING) == 0)) {
102 stream->SetStreamFlags(SSF_RECVING);
103 int err = UDPInterface()->RecvFrom(stream->socket_resource(),
104 data_,
105 kMaxPacketSize,
106 &addr_,
107 mount()->GetRunCompletion(this));
108 if (err == PP_OK_COMPLETIONPENDING)
109 return true;
110 }
111 stream->ClearStreamFlags(SSF_RECVING);
112 return false;
113 }
114
115 virtual void Run(int32_t val) {
116 AUTO_LOCK(emitter_->GetLock());
117 MountNodeUDP* stream =
118 reinterpret_cast<MountNodeUDP*>(emitter_->stream());
119
120 // If the stream is still there, see if we can queue more input
121 if (stream) {
122 if (val > 0) {
123 mount()->ppapi()->AddRefResource(addr_);
124 Packet* packet = new Packet(data_, val, addr_);
125 emitter_->WriteRXPacket_Locked(packet);
126 }
127 stream->ClearStreamFlags(SSF_RECVING);
128 stream->QueueInput();
129 }
130 }
131
132 private:
133 char* data_;
binji 2013/09/15 22:18:58 std::vector
noelallen1 2013/09/17 21:21:54 Using char* allows me to "take" the buffer. Other
134 PP_Resource addr_;
135 };
136
137
138 MountNodeUDP::MountNodeUDP(Mount* mount)
139 : MountNodeSocket(mount),
140 emitter_(new EventEmitterUDP(kDefaultFifoSize, kDefaultFifoSize)) {
141 emitter_->AttachStream(this);
142 }
143
144 void MountNodeUDP::Destroy() {
145 emitter_->DetachStream();
146 MountNodeSocket::Destroy();
147 }
148
149 EventEmitter* MountNodeUDP::GetEventEmitter() {
150 return emitter_.get();
28 } 151 }
29 152
30 Error MountNodeUDP::Init(int flags) { 153 Error MountNodeUDP::Init(int flags) {
31 if (UDPSocket() == NULL) 154 if (UDPInterface() == NULL)
32 return EACCES; 155 return EACCES;
33 156
34 socket_resource_ = UDPSocket()->Create(mount_->ppapi()->GetInstance()); 157 socket_resource_ = UDPInterface()->Create(mount_->ppapi()->GetInstance());
35 if (0 == socket_resource_) 158 if (0 == socket_resource_)
36 return EACCES; 159 return EACCES;
37 160
38 return 0; 161 return 0;
39 } 162 }
40 163
164 void MountNodeUDP::QueueInput() {
165 UDPRecvWork* work = new UDPRecvWork(emitter_.get());
binji 2013/09/15 22:18:58 pass scoped emitter directly instead of raw pointe
noelallen1 2013/09/17 21:21:54 Done.
166 mount_stream()->EnqueueWork(work);
167 }
168
169 void MountNodeUDP::QueueOutput() {
170 UDPSendWork* work = new UDPSendWork(emitter_.get());
171 mount_stream()->EnqueueWork(work);
172 }
173
41 Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) { 174 Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
42 if (0 == socket_resource_) 175 if (0 == socket_resource_)
43 return EBADF; 176 return EBADF;
44 177
45 /* Only bind once. */ 178 /* Only bind once. */
46 if (local_addr_ != 0) 179 if (local_addr_ != 0)
47 return EINVAL; 180 return EINVAL;
48 181
49 PP_Resource out_addr = SockAddrToResource(addr, len); 182 PP_Resource out_addr = SockAddrToResource(addr, len);
50 if (0 == out_addr) 183 if (0 == out_addr)
51 return EINVAL; 184 return EINVAL;
52 185
53 int err = UDPSocket()->Bind(socket_resource_, 186 int err = UDPInterface()->Bind(socket_resource_,
54 out_addr, 187 out_addr,
55 PP_BlockUntilComplete()); 188 PP_BlockUntilComplete());
189 QueueInput();
binji 2013/09/15 22:18:58 why not queue input and output here?
noelallen1 2013/09/17 21:21:54 Output doesn't make sense. There's no data yet.
56 if (err != 0) { 190 if (err != 0) {
57 mount_->ppapi()->ReleaseResource(out_addr); 191 mount_->ppapi()->ReleaseResource(out_addr);
58 return PPErrorToErrno(err); 192 return PPErrorToErrno(err);
59 } 193 }
60 194
61 local_addr_ = out_addr; 195 local_addr_ = out_addr;
62 return 0; 196 return 0;
63 } 197 }
64 198
65 Error MountNodeUDP::Connect(const struct sockaddr* addr, socklen_t len) { 199 Error MountNodeUDP::Connect(const struct sockaddr* addr, socklen_t len) {
(...skipping 14 matching lines...) Expand all
80 } 214 }
81 215
82 Error MountNodeUDP::RecvFromHelper(void* buf, 216 Error MountNodeUDP::RecvFromHelper(void* buf,
83 size_t len, 217 size_t len,
84 int flags, 218 int flags,
85 PP_Resource* out_addr, 219 PP_Resource* out_addr,
86 int* out_len) { 220 int* out_len) {
87 if (0 == socket_resource_) 221 if (0 == socket_resource_)
88 return EBADF; 222 return EBADF;
89 223
90 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); 224 int ms = (GetMode() & O_NONBLOCK) ? 0 : read_timeout_;
91 int err = UDPSocket()->RecvFrom(socket_resource_,
92 static_cast<char*>(buf),
93 capped_len,
94 out_addr,
95 PP_BlockUntilComplete());
96 if (err < 0)
97 return PPErrorToErrno(err);
98 225
99 *out_len = err; 226 EventListenerLock wait(GetEventEmitter());
227 Error err = wait.WaitOnEvent(POLLIN, ms);
228 if (err)
229 return err;
230
231 Packet* packet = emitter_->ReadRXPacket_Locked();
232
233 *out_len = 0;
234 if (packet) {
235 int capped_len =
236 static_cast<int32_t>(std::min<int>(len, packet->buffer_.size()));
237 memcpy(buf, packet->buffer_.data(), capped_len);
238 *out_addr = packet->addr_;
239 *out_len = capped_len;
240 delete packet;
241
242 QueueInput();
binji 2013/09/15 22:18:58 The worker is already queuing up more work. Why he
noelallen1 2013/09/17 21:21:54 This is safer. Instead of making the requesting l
243 }
100 return 0; 244 return 0;
101 } 245 }
102 246
247 Error MountNodeUDP::SendToHelper(const void* buf,
248 size_t len,
249 int flags,
250 PP_Resource addr,
251 int* out_len) {
252 *out_len = 0;
253 if (0 == socket_resource_)
254 return EBADF;
255
256 if (0 == addr)
257 return ENOTCONN;
258
259 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
260 int ms = (GetMode() & O_NONBLOCK) ? 0 : read_timeout_;
binji 2013/09/15 22:18:58 write_timeout_?
noelallen1 2013/09/17 21:21:54 Done.
261
262 EventListenerLock wait(GetEventEmitter());
263 Error err = wait.WaitOnEvent(POLLOUT, ms);
264 if (err)
265 return err;
266
267 mount_->ppapi()->AddRefResource(addr);
268 Packet* packet = new Packet(static_cast<const char *>(buf),
269 capped_len,
270 addr);
271
272 emitter_->WriteTXPacket_Locked(packet);
273 QueueOutput();
274
275 *out_len = capped_len;
276 return 0;
277 }
278
279
103 Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) { 280 Error MountNodeUDP::Recv(void* buf, size_t len, int flags, int* out_len) {
104 while (1) { 281 while (1) {
105 int local_len = 0; 282 int local_len = 0;
106 PP_Resource addr = 0; 283 PP_Resource addr = 0;
107 284
108 int err = RecvFromHelper(buf, len, flags, &addr, &local_len); 285 int err = RecvFromHelper(buf, len, flags, &addr, &local_len);
109 if (err < 0) 286 if (err < 0)
110 return PPErrorToErrno(err); 287 return PPErrorToErrno(err);
111 288
112 /* If "connected" then only receive packets from the given remote. */ 289 /* If "connected" then only receive packets from the given remote. */
(...skipping 19 matching lines...) Expand all
132 if (err < 0) 309 if (err < 0)
133 return PPErrorToErrno(err); 310 return PPErrorToErrno(err);
134 311
135 if (src_addr) 312 if (src_addr)
136 *addrlen = ResourceToSockAddr(addr, *addrlen, src_addr); 313 *addrlen = ResourceToSockAddr(addr, *addrlen, src_addr);
137 314
138 mount_->ppapi()->ReleaseResource(addr); 315 mount_->ppapi()->ReleaseResource(addr);
139 return 0; 316 return 0;
140 } 317 }
141 318
142
143 Error MountNodeUDP::SendToHelper(const void* buf,
144 size_t len,
145 int flags,
146 PP_Resource addr,
147 int* out_len) {
148 if (0 == socket_resource_)
149 return EBADF;
150
151 if (0 == addr)
152 return ENOTCONN;
153
154 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
155 int err = UDPSocket()->SendTo(socket_resource_,
156 static_cast<const char*>(buf),
157 capped_len,
158 addr,
159 PP_BlockUntilComplete());
160 if (err < 0)
161 return PPErrorToErrno(err);
162
163 *out_len = err;
164 return 0;
165 }
166
167 Error MountNodeUDP::Send(const void* buf, size_t len, int flags, int* out_len) { 319 Error MountNodeUDP::Send(const void* buf, size_t len, int flags, int* out_len) {
168 return SendToHelper(buf, len, flags, remote_addr_, out_len); 320 return SendToHelper(buf, len, flags, remote_addr_, out_len);
169 } 321 }
170 322
171 Error MountNodeUDP::SendTo(const void* buf, 323 Error MountNodeUDP::SendTo(const void* buf,
172 size_t len, 324 size_t len,
173 int flags, 325 int flags,
174 const struct sockaddr* dest_addr, 326 const struct sockaddr* dest_addr,
175 socklen_t addrlen, 327 socklen_t addrlen,
176 int* out_len) { 328 int* out_len) {
177 PP_Resource out_addr = SockAddrToResource(dest_addr, addrlen); 329 PP_Resource out_addr = SockAddrToResource(dest_addr, addrlen);
178 if (0 == out_addr) 330 if (0 == out_addr)
179 return EINVAL; 331 return EINVAL;
180 332
181 Error err = SendToHelper(buf, len, flags, out_addr, out_len); 333 Error err = SendToHelper(buf, len, flags, out_addr, out_len);
182 mount_->ppapi()->ReleaseResource(out_addr); 334 mount_->ppapi()->ReleaseResource(out_addr);
183 return err; 335 return err;
184 } 336 }
185 337
186 } // namespace nacl_io 338 } // namespace nacl_io
187 339
188 #endif // PROVIDES_SOCKET_API
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698