Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(65)

Side by Side Diff: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc

Issue 26703008: [NaCl SDK] nacl_io: Add support for non-blocking connect/accept (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 5
6 #include "nacl_io/ossocket.h" 6 #include "nacl_io/ossocket.h"
7 #ifdef PROVIDES_SOCKET_API 7 #ifdef PROVIDES_SOCKET_API
8 8
9 #include <assert.h>
9 #include <errno.h> 10 #include <errno.h>
10 #include <string.h> 11 #include <string.h>
11 #include <algorithm> 12 #include <algorithm>
12 13
14 #include "nacl_io/kernel_handle.h"
13 #include "nacl_io/mount_node_tcp.h" 15 #include "nacl_io/mount_node_tcp.h"
14 #include "nacl_io/mount_stream.h" 16 #include "nacl_io/mount_stream.h"
15 #include "nacl_io/pepper_interface.h" 17 #include "nacl_io/pepper_interface.h"
16 18
17 namespace { 19 namespace {
18 const size_t kMaxPacketSize = 65536; 20 const size_t kMaxPacketSize = 65536;
19 const size_t kDefaultFifoSize = kMaxPacketSize * 8; 21 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
20 } 22 }
21 23
22 namespace nacl_io { 24 namespace nacl_io {
(...skipping 27 matching lines...) Expand all
50 virtual bool Start(int32_t val) { 52 virtual bool Start(int32_t val) {
51 AUTO_LOCK(emitter_->GetLock()); 53 AUTO_LOCK(emitter_->GetLock());
52 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); 54 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
53 55
54 // Does the stream exist, and can it send? 56 // Does the stream exist, and can it send?
55 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND)) 57 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND))
56 return false; 58 return false;
57 59
58 // If not currently sending... 60 // If not currently sending...
59 if (!stream->TestStreamFlags(SSF_SENDING)) { 61 if (!stream->TestStreamFlags(SSF_SENDING)) {
60 size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable(); 62 size_t tx_data_avail = emitter_->BytesInOutputFIFO();
61 int capped_len = std::min(tx_data_avail, kMaxPacketSize); 63 int capped_len = std::min(tx_data_avail, kMaxPacketSize);
62 64
63 if (capped_len == 0) 65 if (capped_len == 0)
64 return false; 66 return false;
65 67
66 data_ = new char[capped_len]; 68 data_ = new char[capped_len];
67 emitter_->ReadOut_Locked(data_, capped_len); 69 emitter_->ReadOut_Locked(data_, capped_len);
68 70
69 stream->SetStreamFlags(SSF_SENDING); 71 stream->SetStreamFlags(SSF_SENDING);
70 int err = TCPInterface()->Write(stream->socket_resource(), 72 int err = TCPInterface()->Write(stream->socket_resource(),
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
106 virtual bool Start(int32_t val) { 108 virtual bool Start(int32_t val) {
107 AUTO_LOCK(emitter_->GetLock()); 109 AUTO_LOCK(emitter_->GetLock());
108 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); 110 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
109 111
110 // Does the stream exist, and can it recv? 112 // Does the stream exist, and can it recv?
111 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV)) 113 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
112 return false; 114 return false;
113 115
114 // If we are not currently receiving 116 // If we are not currently receiving
115 if (!stream->TestStreamFlags(SSF_RECVING)) { 117 if (!stream->TestStreamFlags(SSF_RECVING)) {
116 size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable(); 118 size_t rx_space_avail = emitter_->SpaceInInputFIFO();
117 int capped_len = 119 int capped_len =
118 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize)); 120 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
119 121
120 if (capped_len == 0) 122 if (capped_len == 0)
121 return false; 123 return false;
122 124
123 stream->SetStreamFlags(SSF_RECVING); 125 stream->SetStreamFlags(SSF_RECVING);
124 data_ = new char[capped_len]; 126 data_ = new char[capped_len];
125 int err = TCPInterface()->Read(stream->socket_resource(), 127 int err = TCPInterface()->Read(stream->socket_resource(),
126 data_, 128 data_,
(...skipping 18 matching lines...) Expand all
145 emitter_->WriteIn_Locked(data_, length_error); 147 emitter_->WriteIn_Locked(data_, length_error);
146 stream->ClearStreamFlags(SSF_RECVING); 148 stream->ClearStreamFlags(SSF_RECVING);
147 stream->QueueInput(); 149 stream->QueueInput();
148 } else { 150 } else {
149 stream->SetError_Locked(length_error); 151 stream->SetError_Locked(length_error);
150 } 152 }
151 } 153 }
152 } 154 }
153 }; 155 };
154 156
157 class TCPAcceptWork : public MountStream::Work {
158 public:
159 explicit TCPAcceptWork(MountStream* stream,
160 const ScopedEventEmitterTCP& emitter)
161 : MountStream::Work(stream),
162 emitter_(emitter) {}
163
164 TCPSocketInterface* TCPInterface() {
165 return mount()->ppapi()->GetTCPSocketInterface();
166 }
167
168 virtual bool Start(int32_t val) {
169 AUTO_LOCK(emitter_->GetLock());
170 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
171
172 // Does the stream exist, and can it accept?
173 if (NULL == node)
174 return false;
175
176 // If we are not currently accepting
177 if (!node->TestStreamFlags(SSF_LISTENING))
178 return false;
179
180 int err = TCPInterface()->Accept(node->socket_resource(),
181 &new_socket_,
182 mount()->GetRunCompletion(this));
183
184 if (err != PP_OK_COMPLETIONPENDING)
185 // Anything else, we should assume the socket has gone bad.
186 node->SetError_Locked(err);
187
188 return true;
189 }
190
191 virtual void Run(int32_t error) {
192 AUTO_LOCK(emitter_->GetLock());
193 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
194
195 if (node == NULL)
196 return;
197
198 if (error != PP_OK) {
199 node->SetError_Locked(error);
200 return;
201 }
202
203 emitter_->SetAcceptedSocket_Locked(new_socket_);
204 }
205
206 protected:
207 PP_Resource new_socket_;
208 ScopedEventEmitterTCP emitter_;
209 };
210
211 class TCPConnectWork : public MountStream::Work {
212 public:
213 explicit TCPConnectWork(MountStream* stream,
214 const ScopedEventEmitterTCP& emitter)
215 : MountStream::Work(stream),
216 emitter_(emitter) {}
217
218 TCPSocketInterface* TCPInterface() {
219 return mount()->ppapi()->GetTCPSocketInterface();
220 }
221
222 virtual bool Start(int32_t val) {
223 AUTO_LOCK(emitter_->GetLock());
224 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
225
226 // Does the stream exist, and can it connect?
227 if (NULL == node)
228 return false;
229
230 int err = TCPInterface()->Connect(node->socket_resource(),
231 node->remote_addr(),
232 mount()->GetRunCompletion(this));
233
234 if (err != PP_OK_COMPLETIONPENDING)
235 // Anything else, we should assume the socket has gone bad.
236 node->SetError_Locked(err);
237
238 return true;
239 }
240
241 virtual void Run(int32_t error) {
242 AUTO_LOCK(emitter_->GetLock());
243 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
244
245 if (node == NULL)
246 return;
247
248 if (error != PP_OK) {
249 node->ConnectFailed_Locked();
250 node->SetError_Locked(error);
251 return;
252 }
253
254 node->ConnectDone_Locked();
255 }
256
257 protected:
258 ScopedEventEmitterTCP emitter_;
259 };
260
155 MountNodeTCP::MountNodeTCP(Mount* mount) 261 MountNodeTCP::MountNodeTCP(Mount* mount)
156 : MountNodeSocket(mount), 262 : MountNodeSocket(mount),
157 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) { 263 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) {
158 emitter_->AttachStream(this); 264 emitter_->AttachStream(this);
159 } 265 }
160 266
161 MountNodeTCP::MountNodeTCP(Mount* mount, PP_Resource socket) 267 MountNodeTCP::MountNodeTCP(Mount* mount, PP_Resource socket)
162 : MountNodeSocket(mount, socket), 268 : MountNodeSocket(mount, socket),
163 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) { 269 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) {
164 emitter_->AttachStream(this); 270 emitter_->AttachStream(this);
165 } 271 }
166 272
167 void MountNodeTCP::Destroy() { 273 void MountNodeTCP::Destroy() {
168 emitter_->DetachStream(); 274 emitter_->DetachStream();
169 MountNodeSocket::Destroy(); 275 MountNodeSocket::Destroy();
170 } 276 }
171 277
172 Error MountNodeTCP::Init(int open_flags) { 278 Error MountNodeTCP::Init(int open_flags) {
173 Error err = MountNodeSocket::Init(open_flags); 279 Error err = MountNodeSocket::Init(open_flags);
174 if (err != 0) 280 if (err != 0)
175 return err; 281 return err;
176 282
177 if (TCPInterface() == NULL) 283 if (TCPInterface() == NULL)
178 return EACCES; 284 return EACCES;
179 285
286 SetStreamFlags(SSF_CAN_CONNECT);
287
180 if (socket_resource_ != 0) { 288 if (socket_resource_ != 0) {
181 // TCP sockets that are contructed with an existing socket_resource_ 289 // TCP sockets that are contructed with an existing socket_resource_
182 // are those that generated from calls to Accept() and therefore are 290 // are those that generated from calls to Accept() and therefore are
183 // already connected. 291 // already connected.
184 remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_); 292 remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
185 ConnectDone(); 293 ConnectDone_Locked();
186 } else { 294 } else {
187 socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance()); 295 socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
188 if (0 == socket_resource_) 296 if (0 == socket_resource_)
189 return EACCES; 297 return EACCES;
190 } 298 }
191 299
192 return 0; 300 return 0;
193 } 301 }
194 302
195 EventEmitterTCP* MountNodeTCP::GetEventEmitter() { 303 EventEmitter* MountNodeTCP::GetEventEmitter() {
196 return emitter_.get(); 304 return emitter_.get();
197 } 305 }
198 306
307 void MountNodeTCP::QueueAccept() {
308 MountStream::Work* work = new TCPAcceptWork(mount_stream(), emitter_);
309 mount_stream()->EnqueueWork(work);
310 }
311
312 void MountNodeTCP::QueueConnect() {
313 MountStream::Work* work = new TCPConnectWork(mount_stream(), emitter_);
314 mount_stream()->EnqueueWork(work);
315 }
316
199 void MountNodeTCP::QueueInput() { 317 void MountNodeTCP::QueueInput() {
200 TCPRecvWork* work = new TCPRecvWork(emitter_); 318 MountStream::Work* work = new TCPRecvWork(emitter_);
201 mount_stream()->EnqueueWork(work); 319 mount_stream()->EnqueueWork(work);
202 } 320 }
203 321
204 void MountNodeTCP::QueueOutput() { 322 void MountNodeTCP::QueueOutput() {
205 TCPSendWork* work = new TCPSendWork(emitter_); 323 MountStream::Work* work = new TCPSendWork(emitter_);
206 mount_stream()->EnqueueWork(work); 324 mount_stream()->EnqueueWork(work);
207 } 325 }
208 326
209 Error MountNodeTCP::Accept(PP_Resource* out_sock, 327 Error MountNodeTCP::Accept(const HandleAttr& attr,
328 PP_Resource* out_sock,
210 struct sockaddr* addr, 329 struct sockaddr* addr,
211 socklen_t* len) { 330 socklen_t* len) {
212 AUTO_LOCK(node_lock_); 331 EventListenerLock wait(GetEventEmitter());
213 int err = TCPInterface()->Accept(socket_resource_,
214 out_sock,
215 PP_BlockUntilComplete());
216 332
217 if (err != PP_OK) 333 if (!TestStreamFlags(SSF_LISTENING))
218 return PPErrorToErrno(err); 334 return EINVAL;
219 335
336 // Either block forever or not at all
337 int ms = attr.IsBlocking() ? -1 : 0;
338
339 Error err = wait.WaitOnEvent(POLLIN, ms);
340
341 if (ETIMEDOUT == err)
342 return EWOULDBLOCK;
343
344 int s = emitter_->GetAcceptedSocket_Locked();
345 // Non-blocking case.
346 if (s == 0)
347 return EAGAIN;
348
349 // Consume the new socket and start listening for the next one
350 *out_sock = s;
351 emitter_->ClearEvents_Locked(POLLIN);
352
353 // Set the out paramaters
220 PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock); 354 PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
221 *len = ResourceToSockAddr(remote_addr, *len, addr); 355 *len = ResourceToSockAddr(remote_addr, *len, addr);
222 mount_->ppapi()->ReleaseResource(remote_addr); 356 mount_->ppapi()->ReleaseResource(remote_addr);
357
358 QueueAccept();
223 return 0; 359 return 0;
224 } 360 }
225 361
226 // We can not bind a client socket with PPAPI. For now we ignore the 362 // We can not bind a client socket with PPAPI. For now we ignore the
227 // bind but report the correct address later, just in case someone is 363 // bind but report the correct address later, just in case someone is
228 // binding without really caring what the address is (for example to 364 // binding without really caring what the address is (for example to
229 // select a more optimized interface/route.) 365 // select a more optimized interface/route.)
230 Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) { 366 Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
231 AUTO_LOCK(node_lock_); 367 AUTO_LOCK(node_lock_);
232 368
233 /* Only bind once. */ 369 /* Only bind once. */
234 if (local_addr_ != 0) 370 if (local_addr_ != 0)
235 return EINVAL; 371 return EINVAL;
236 372
237 local_addr_ = SockAddrToResource(addr, len); 373 local_addr_ = SockAddrToResource(addr, len);
238 int err = TCPInterface()->Bind(socket_resource_, 374 int err = TCPInterface()->Bind(socket_resource_,
239 local_addr_, 375 local_addr_,
240 PP_BlockUntilComplete()); 376 PP_BlockUntilComplete());
241 377
242 // If we fail, release the local addr resource 378 // If we fail, release the local addr resource
243 if (err != PP_OK) { 379 if (err != PP_OK) {
244 mount_->ppapi()->ReleaseResource(local_addr_); 380 mount_->ppapi()->ReleaseResource(local_addr_);
245 local_addr_ = 0; 381 local_addr_ = 0;
246 return PPErrorToErrno(err); 382 return PPErrorToErrno(err);
247 } 383 }
248 384
249 return 0; 385 return 0;
250 } 386 }
251 387
252 Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { 388 Error MountNodeTCP::Connect(const HandleAttr& attr,
253 AUTO_LOCK(node_lock_); 389 const struct sockaddr* addr,
390 socklen_t len) {
391 EventListenerLock wait(GetEventEmitter());
254 392
255 if (remote_addr_ != 0) 393 if (TestStreamFlags(SSF_CONNECTING))
394 return EALREADY;
395
396 if (remote_addr_ != 0) {
256 return EISCONN; 397 return EISCONN;
398 }
257 399
258 remote_addr_ = SockAddrToResource(addr, len); 400 remote_addr_ = SockAddrToResource(addr, len);
259 if (0 == remote_addr_) 401 if (0 == remote_addr_)
260 return EINVAL; 402 return EINVAL;
261 403
262 int err = TCPInterface()->Connect(socket_resource_, 404 int ms = attr.IsBlocking() ? -1 : 0;
263 remote_addr_, 405
264 PP_BlockUntilComplete()); 406 SetStreamFlags(SSF_CONNECTING);
407 QueueConnect();
408
409 Error err = wait.WaitOnEvent(POLLOUT, ms);
410 if (ETIMEDOUT == err)
411 return EINPROGRESS;
265 412
266 // If we fail, release the dest addr resource 413 // If we fail, release the dest addr resource
267 if (err != PP_OK) { 414 if (err != 0) {
268 mount_->ppapi()->ReleaseResource(remote_addr_); 415 ConnectFailed_Locked();
269 remote_addr_ = 0; 416 return err;
270 return PPErrorToErrno(err);
271 } 417 }
272 418
273 ConnectDone(); 419 ConnectDone_Locked();
274 return 0; 420 return 0;
275 } 421 }
276 422
277 void MountNodeTCP::ConnectDone() { 423 void MountNodeTCP::ConnectDone_Locked() {
278 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_); 424 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
279 425
280 // Now that we are connected, we can start sending and receiving. 426 // Now that we are connected, we can start sending and receiving.
427 ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
281 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV); 428 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
282 429
430 emitter_->ConnectDone_Locked();
431
283 // Begin the input pump 432 // Begin the input pump
284 QueueInput(); 433 QueueInput();
285 } 434 }
286 435
436 void MountNodeTCP::ConnectFailed_Locked() {
437 mount_->ppapi()->ReleaseResource(remote_addr_);
438 remote_addr_ = 0;
439 }
440
287 Error MountNodeTCP::Listen(int backlog) { 441 Error MountNodeTCP::Listen(int backlog) {
442 AUTO_LOCK(node_lock_);
443 if (0 == local_addr_)
444 return EINVAL;
445
288 int err = TCPInterface()->Listen(socket_resource_, 446 int err = TCPInterface()->Listen(socket_resource_,
289 backlog, 447 backlog,
290 PP_BlockUntilComplete()); 448 PP_BlockUntilComplete());
291 if (err != PP_OK) 449 if (err != PP_OK)
292 return PPErrorToErrno(err); 450 return PPErrorToErrno(err);
293 451
452 ClearStreamFlags(SSF_CAN_CONNECT);
453 SetStreamFlags(SSF_LISTENING);
454 QueueAccept();
294 return 0; 455 return 0;
295 } 456 }
296 457
297 Error MountNodeTCP::Recv_Locked(void* buf, 458 Error MountNodeTCP::Recv_Locked(void* buf,
298 size_t len, 459 size_t len,
299 PP_Resource* out_addr, 460 PP_Resource* out_addr,
300 int* out_len) { 461 int* out_len) {
462 assert(emitter_.get());
301 *out_len = emitter_->ReadIn_Locked((char*)buf, len); 463 *out_len = emitter_->ReadIn_Locked((char*)buf, len);
302 *out_addr = remote_addr_; 464 *out_addr = remote_addr_;
303 465
304 // Ref the address copy we pass back. 466 // Ref the address copy we pass back.
305 mount_->ppapi()->AddRefResource(remote_addr_); 467 mount_->ppapi()->AddRefResource(remote_addr_);
306 return 0; 468 return 0;
307 } 469 }
308 470
309 // TCP ignores dst addr passed to send_to, and always uses bound address 471 // TCP ignores dst addr passed to send_to, and always uses bound address
310 Error MountNodeTCP::Send_Locked(const void* buf, 472 Error MountNodeTCP::Send_Locked(const void* buf,
311 size_t len, 473 size_t len,
312 PP_Resource, 474 PP_Resource,
313 int* out_len) { 475 int* out_len) {
476 assert(emitter_.get());
314 *out_len = emitter_->WriteOut_Locked((char*)buf, len); 477 *out_len = emitter_->WriteOut_Locked((char*)buf, len);
315 return 0; 478 return 0;
316 } 479 }
317 480
318 481
319 } // namespace nacl_io 482 } // namespace nacl_io
320 483
321 #endif // PROVIDES_SOCKET_API 484 #endif // PROVIDES_SOCKET_API
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698