Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(120)

Side by Side Diff: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc

Issue 26703008: [NaCl SDK] nacl_io: Add support for non-blocking connect/accept (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 5
6 #include "nacl_io/ossocket.h" 6 #include "nacl_io/ossocket.h"
7 #ifdef PROVIDES_SOCKET_API 7 #ifdef PROVIDES_SOCKET_API
8 8
9 #include <assert.h>
9 #include <errno.h> 10 #include <errno.h>
10 #include <string.h> 11 #include <string.h>
11 #include <algorithm> 12 #include <algorithm>
12 13
14 #include "nacl_io/dbgprint.h"
15 #include "nacl_io/kernel_handle.h"
13 #include "nacl_io/mount_node_tcp.h" 16 #include "nacl_io/mount_node_tcp.h"
14 #include "nacl_io/mount_stream.h" 17 #include "nacl_io/mount_stream.h"
15 #include "nacl_io/pepper_interface.h" 18 #include "nacl_io/pepper_interface.h"
16 19
17 namespace { 20 namespace {
18 const size_t kMaxPacketSize = 65536; 21 const size_t kMaxPacketSize = 65536;
19 const size_t kDefaultFifoSize = kMaxPacketSize * 8; 22 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
20 } 23 }
21 24
22 namespace nacl_io { 25 namespace nacl_io {
(...skipping 27 matching lines...) Expand all
50 virtual bool Start(int32_t val) { 53 virtual bool Start(int32_t val) {
51 AUTO_LOCK(emitter_->GetLock()); 54 AUTO_LOCK(emitter_->GetLock());
52 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); 55 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
53 56
54 // Does the stream exist, and can it send? 57 // Does the stream exist, and can it send?
55 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND)) 58 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND))
56 return false; 59 return false;
57 60
58 // If not currently sending... 61 // If not currently sending...
59 if (!stream->TestStreamFlags(SSF_SENDING)) { 62 if (!stream->TestStreamFlags(SSF_SENDING)) {
60 size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable(); 63 size_t tx_data_avail = emitter_->BytesInOutputFIFO();
61 int capped_len = std::min(tx_data_avail, kMaxPacketSize); 64 int capped_len = std::min(tx_data_avail, kMaxPacketSize);
62 65
63 if (capped_len == 0) 66 if (capped_len == 0)
64 return false; 67 return false;
65 68
66 data_ = new char[capped_len]; 69 data_ = new char[capped_len];
67 emitter_->ReadOut_Locked(data_, capped_len); 70 emitter_->ReadOut_Locked(data_, capped_len);
68 71
69 stream->SetStreamFlags(SSF_SENDING); 72 stream->SetStreamFlags(SSF_SENDING);
70 int err = TCPInterface()->Write(stream->socket_resource(), 73 int err = TCPInterface()->Write(stream->socket_resource(),
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
106 virtual bool Start(int32_t val) { 109 virtual bool Start(int32_t val) {
107 AUTO_LOCK(emitter_->GetLock()); 110 AUTO_LOCK(emitter_->GetLock());
108 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); 111 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
109 112
110 // Does the stream exist, and can it recv? 113 // Does the stream exist, and can it recv?
111 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV)) 114 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
112 return false; 115 return false;
113 116
114 // If we are not currently receiving 117 // If we are not currently receiving
115 if (!stream->TestStreamFlags(SSF_RECVING)) { 118 if (!stream->TestStreamFlags(SSF_RECVING)) {
116 size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable(); 119 size_t rx_space_avail = emitter_->SpaceInInputFIFO();
117 int capped_len = 120 int capped_len =
118 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize)); 121 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
119 122
120 if (capped_len == 0) 123 if (capped_len == 0)
121 return false; 124 return false;
122 125
123 stream->SetStreamFlags(SSF_RECVING); 126 stream->SetStreamFlags(SSF_RECVING);
124 data_ = new char[capped_len]; 127 data_ = new char[capped_len];
125 int err = TCPInterface()->Read(stream->socket_resource(), 128 int err = TCPInterface()->Read(stream->socket_resource(),
126 data_, 129 data_,
(...skipping 18 matching lines...) Expand all
145 emitter_->WriteIn_Locked(data_, length_error); 148 emitter_->WriteIn_Locked(data_, length_error);
146 stream->ClearStreamFlags(SSF_RECVING); 149 stream->ClearStreamFlags(SSF_RECVING);
147 stream->QueueInput(); 150 stream->QueueInput();
148 } else { 151 } else {
149 stream->SetError_Locked(length_error); 152 stream->SetError_Locked(length_error);
150 } 153 }
151 } 154 }
152 } 155 }
153 }; 156 };
154 157
158 class TCPAcceptWork : public MountStream::Work {
159 public:
160 explicit TCPAcceptWork(MountStream* stream,
161 const ScopedEventEmitterTCP& emitter)
162 : MountStream::Work(stream),
163 emitter_(emitter) {}
164
165 TCPSocketInterface* TCPInterface() {
166 return mount()->ppapi()->GetTCPSocketInterface();
167 }
168
169 virtual bool Start(int32_t val) {
170 AUTO_LOCK(emitter_->GetLock());
171 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
172
173 // Does the stream exist, and can it accept?
174 if (NULL == node)
175 return false;
176
177 // If we are not currently accepting
178 if (!node->TestStreamFlags(SSF_LISTENING))
179 return false;
180
181 int err = TCPInterface()->Accept(node->socket_resource(),
182 &new_socket_,
183 mount()->GetRunCompletion(this));
184
185 if (err != PP_OK_COMPLETIONPENDING)
186 // Anything else, we should assume the socket has gone bad.
187 node->SetError_Locked(err);
188
189 return true;
190 }
191
192 virtual void Run(int32_t error) {
193 AUTO_LOCK(emitter_->GetLock());
194 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
195
196 //dbgprintf("accept completed\n");
197
198 if (node == NULL)
199 return;
200
201 if (error != PP_OK) {
202 //dbgprintf("accept error\n");
203 node->SetError_Locked(error);
204 return;
205 }
206
207 emitter_->SetAcceptedSocket_Locked(new_socket_);
208 }
209
210 protected:
211 PP_Resource new_socket_;
212 ScopedEventEmitterTCP emitter_;
213 };
214
215 class TCPConnectWork : public MountStream::Work {
216 public:
217 explicit TCPConnectWork(MountStream* stream,
218 const ScopedEventEmitterTCP& emitter)
219 : MountStream::Work(stream),
220 emitter_(emitter) {}
221
222 TCPSocketInterface* TCPInterface() {
223 return mount()->ppapi()->GetTCPSocketInterface();
224 }
225
226 virtual bool Start(int32_t val) {
227 //dbgprintf("connect start 1\n");
228 AUTO_LOCK(emitter_->GetLock());
229 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
230
231 // Does the stream exist, and can it connect?
232 if (NULL == node)
233 return false;
234
235 //dbgprintf("connect start 2\n");
236 int err = TCPInterface()->Connect(node->socket_resource(),
237 node->remote_addr(),
238 mount()->GetRunCompletion(this));
239
240 if (err != PP_OK_COMPLETIONPENDING)
241 // Anything else, we should assume the socket has gone bad.
242 node->SetError_Locked(err);
243
244 return true;
245 }
246
247 virtual void Run(int32_t error) {
248 AUTO_LOCK(emitter_->GetLock());
249 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
250
251 //dbgprintf("connect completed\n");
252 if (node == NULL)
253 return;
254
255 if (error != PP_OK) {
256 //dbgprintf("connect error\n");
257 node->ConnectFailed_Locked();
258 node->SetError_Locked(error);
259 return;
260 }
261
262 node->ConnectDone_Locked();
263 }
264
265 protected:
266 ScopedEventEmitterTCP emitter_;
267 };
268
155 MountNodeTCP::MountNodeTCP(Mount* mount) 269 MountNodeTCP::MountNodeTCP(Mount* mount)
156 : MountNodeSocket(mount), 270 : MountNodeSocket(mount),
157 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) { 271 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) {
158 emitter_->AttachStream(this); 272 emitter_->AttachStream(this);
159 } 273 }
160 274
161 MountNodeTCP::MountNodeTCP(Mount* mount, PP_Resource socket) 275 MountNodeTCP::MountNodeTCP(Mount* mount, PP_Resource socket)
162 : MountNodeSocket(mount, socket), 276 : MountNodeSocket(mount, socket),
163 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) { 277 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) {
164 emitter_->AttachStream(this); 278 emitter_->AttachStream(this);
165 } 279 }
166 280
167 void MountNodeTCP::Destroy() { 281 void MountNodeTCP::Destroy() {
168 emitter_->DetachStream(); 282 emitter_->DetachStream();
169 MountNodeSocket::Destroy(); 283 MountNodeSocket::Destroy();
170 } 284 }
171 285
172 Error MountNodeTCP::Init(int open_flags) { 286 Error MountNodeTCP::Init(int open_flags) {
173 Error err = MountNodeSocket::Init(open_flags); 287 Error err = MountNodeSocket::Init(open_flags);
174 if (err != 0) 288 if (err != 0)
175 return err; 289 return err;
176 290
177 if (TCPInterface() == NULL) 291 if (TCPInterface() == NULL)
178 return EACCES; 292 return EACCES;
179 293
294 SetStreamFlags(SSF_CAN_CONNECT);
295
180 if (socket_resource_ != 0) { 296 if (socket_resource_ != 0) {
181 // TCP sockets that are contructed with an existing socket_resource_ 297 // TCP sockets that are contructed with an existing socket_resource_
182 // are those that generated from calls to Accept() and therefore are 298 // are those that generated from calls to Accept() and therefore are
183 // already connected. 299 // already connected.
184 remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_); 300 remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
185 ConnectDone(); 301 ConnectDone_Locked();
186 } else { 302 } else {
187 socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance()); 303 socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
188 if (0 == socket_resource_) 304 if (0 == socket_resource_)
189 return EACCES; 305 return EACCES;
190 } 306 }
191 307
192 return 0; 308 return 0;
193 } 309 }
194 310
195 EventEmitterTCP* MountNodeTCP::GetEventEmitter() { 311 EventEmitter* MountNodeTCP::GetEventEmitter() {
196 return emitter_.get(); 312 return emitter_.get();
197 } 313 }
198 314
315 void MountNodeTCP::QueueAccept() {
316 MountStream::Work* work = new TCPAcceptWork(mount_stream(), emitter_);
317 mount_stream()->EnqueueWork(work);
318 }
319
320 void MountNodeTCP::QueueConnect() {
321 MountStream::Work* work = new TCPConnectWork(mount_stream(), emitter_);
322 mount_stream()->EnqueueWork(work);
323 }
324
199 void MountNodeTCP::QueueInput() { 325 void MountNodeTCP::QueueInput() {
200 TCPRecvWork* work = new TCPRecvWork(emitter_); 326 MountStream::Work* work = new TCPRecvWork(emitter_);
201 mount_stream()->EnqueueWork(work); 327 mount_stream()->EnqueueWork(work);
202 } 328 }
203 329
204 void MountNodeTCP::QueueOutput() { 330 void MountNodeTCP::QueueOutput() {
205 TCPSendWork* work = new TCPSendWork(emitter_); 331 MountStream::Work* work = new TCPSendWork(emitter_);
206 mount_stream()->EnqueueWork(work); 332 mount_stream()->EnqueueWork(work);
207 } 333 }
208 334
209 Error MountNodeTCP::Accept(PP_Resource* out_sock, 335 Error MountNodeTCP::Accept(const HandleAttr& attr,
336 PP_Resource* out_sock,
210 struct sockaddr* addr, 337 struct sockaddr* addr,
211 socklen_t* len) { 338 socklen_t* len) {
212 AUTO_LOCK(node_lock_); 339 EventListenerLock wait(GetEventEmitter());
213 int err = TCPInterface()->Accept(socket_resource_,
214 out_sock,
215 PP_BlockUntilComplete());
216 340
217 if (err != PP_OK) 341 if (!TestStreamFlags(SSF_LISTENING))
218 return PPErrorToErrno(err); 342 return EINVAL;
219 343
344 // Either block forever or not at all
345 int ms = attr.IsBlocking() ? -1 : 0;
346
347 //dbgprintf("Accept: Waiting: %d\n", ms);
348 Error err = wait.WaitOnEvent(POLLIN, ms);
349 //dbgprintf("Accept: wait done: %d\n", (int)err);
350
351 if (ETIMEDOUT == err)
352 return EWOULDBLOCK;
353
354 int s = emitter_->GetAcceptedSocket_Locked();
355 // Non-blocking case.
356 if (s == 0)
357 return EAGAIN;
358
359 // Consume the new socket and start listening for the next one
360 *out_sock = s;
361 emitter_->ClearEvents_Locked(POLLIN);
362
363 // Set the out paramaters
220 PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock); 364 PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
221 *len = ResourceToSockAddr(remote_addr, *len, addr); 365 *len = ResourceToSockAddr(remote_addr, *len, addr);
222 mount_->ppapi()->ReleaseResource(remote_addr); 366 mount_->ppapi()->ReleaseResource(remote_addr);
367
368 QueueAccept();
223 return 0; 369 return 0;
224 } 370 }
225 371
226 // We can not bind a client socket with PPAPI. For now we ignore the 372 // We can not bind a client socket with PPAPI. For now we ignore the
227 // bind but report the correct address later, just in case someone is 373 // bind but report the correct address later, just in case someone is
228 // binding without really caring what the address is (for example to 374 // binding without really caring what the address is (for example to
229 // select a more optimized interface/route.) 375 // select a more optimized interface/route.)
230 Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) { 376 Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
231 AUTO_LOCK(node_lock_); 377 AUTO_LOCK(node_lock_);
232 378
233 /* Only bind once. */ 379 /* Only bind once. */
234 if (local_addr_ != 0) 380 if (local_addr_ != 0)
235 return EINVAL; 381 return EINVAL;
236 382
237 local_addr_ = SockAddrToResource(addr, len); 383 local_addr_ = SockAddrToResource(addr, len);
238 int err = TCPInterface()->Bind(socket_resource_, 384 int err = TCPInterface()->Bind(socket_resource_,
239 local_addr_, 385 local_addr_,
240 PP_BlockUntilComplete()); 386 PP_BlockUntilComplete());
241 387
242 // If we fail, release the local addr resource 388 // If we fail, release the local addr resource
243 if (err != PP_OK) { 389 if (err != PP_OK) {
244 mount_->ppapi()->ReleaseResource(local_addr_); 390 mount_->ppapi()->ReleaseResource(local_addr_);
245 local_addr_ = 0; 391 local_addr_ = 0;
392 //dbgprintf("Bind error: %d\n", err);
246 return PPErrorToErrno(err); 393 return PPErrorToErrno(err);
247 } 394 }
248 395
249 return 0; 396 return 0;
250 } 397 }
251 398
252 Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { 399 Error MountNodeTCP::Connect(const HandleAttr& attr,
253 AUTO_LOCK(node_lock_); 400 const struct sockaddr* addr,
401 socklen_t len) {
402 EventListenerLock wait(GetEventEmitter());
403 //dbgprintf("Connect\n");
254 404
255 if (remote_addr_ != 0) 405 if (TestStreamFlags(SSF_CONNECTING))
406 return EALREADY;
407
408 if (remote_addr_ != 0) {
409 //dbgprintf("Already connected: %d\n", EISCONN);
256 return EISCONN; 410 return EISCONN;
411 }
257 412
258 remote_addr_ = SockAddrToResource(addr, len); 413 remote_addr_ = SockAddrToResource(addr, len);
259 if (0 == remote_addr_) 414 if (0 == remote_addr_)
260 return EINVAL; 415 return EINVAL;
261 416
262 int err = TCPInterface()->Connect(socket_resource_, 417 //dbgprintf("Connect blocking %d flags=%d\n", attr.IsBlocking(), attr.flags);
263 remote_addr_, 418 int ms = attr.IsBlocking() ? -1 : 0;
264 PP_BlockUntilComplete()); 419
420 SetStreamFlags(SSF_CONNECTING);
421 QueueConnect();
422
423 //dbgprintf("Waiting on connect: %d\n", ms);
424 Error err = wait.WaitOnEvent(POLLOUT, ms);
425 //dbgprintf("connect: done waiting: %d\n", (int)err);
426
427 if (ETIMEDOUT == err)
428 return EINPROGRESS;
265 429
266 // If we fail, release the dest addr resource 430 // If we fail, release the dest addr resource
267 if (err != PP_OK) { 431 if (err != 0) {
268 mount_->ppapi()->ReleaseResource(remote_addr_); 432 ConnectFailed_Locked();
269 remote_addr_ = 0; 433 return err;
270 return PPErrorToErrno(err);
271 } 434 }
272 435
273 ConnectDone(); 436 ConnectDone_Locked();
274 return 0; 437 return 0;
275 } 438 }
276 439
277 void MountNodeTCP::ConnectDone() { 440 void MountNodeTCP::ConnectDone_Locked() {
278 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_); 441 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
279 442
280 // Now that we are connected, we can start sending and receiving. 443 // Now that we are connected, we can start sending and receiving.
444 ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
281 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV); 445 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
282 446
447 emitter_->ConnectDone();
448
283 // Begin the input pump 449 // Begin the input pump
284 QueueInput(); 450 QueueInput();
285 } 451 }
286 452
453 void MountNodeTCP::ConnectFailed_Locked() {
454 mount_->ppapi()->ReleaseResource(remote_addr_);
455 remote_addr_ = 0;
456 }
457
287 Error MountNodeTCP::Listen(int backlog) { 458 Error MountNodeTCP::Listen(int backlog) {
459 AUTO_LOCK(node_lock_);
460 if (0 == local_addr_)
461 return EINVAL;
462
288 int err = TCPInterface()->Listen(socket_resource_, 463 int err = TCPInterface()->Listen(socket_resource_,
289 backlog, 464 backlog,
290 PP_BlockUntilComplete()); 465 PP_BlockUntilComplete());
291 if (err != PP_OK) 466 if (err != PP_OK)
292 return PPErrorToErrno(err); 467 return PPErrorToErrno(err);
293 468
469 //dbgprintf("listening\n");
470 ClearStreamFlags(SSF_CAN_CONNECT);
471 SetStreamFlags(SSF_LISTENING);
472 QueueAccept();
294 return 0; 473 return 0;
295 } 474 }
296 475
297 Error MountNodeTCP::Recv_Locked(void* buf, 476 Error MountNodeTCP::Recv_Locked(void* buf,
298 size_t len, 477 size_t len,
299 PP_Resource* out_addr, 478 PP_Resource* out_addr,
300 int* out_len) { 479 int* out_len) {
480 assert(emitter_.get());
301 *out_len = emitter_->ReadIn_Locked((char*)buf, len); 481 *out_len = emitter_->ReadIn_Locked((char*)buf, len);
302 *out_addr = remote_addr_; 482 *out_addr = remote_addr_;
303 483
304 // Ref the address copy we pass back. 484 // Ref the address copy we pass back.
305 mount_->ppapi()->AddRefResource(remote_addr_); 485 mount_->ppapi()->AddRefResource(remote_addr_);
306 return 0; 486 return 0;
307 } 487 }
308 488
309 // TCP ignores dst addr passed to send_to, and always uses bound address 489 // TCP ignores dst addr passed to send_to, and always uses bound address
310 Error MountNodeTCP::Send_Locked(const void* buf, 490 Error MountNodeTCP::Send_Locked(const void* buf,
311 size_t len, 491 size_t len,
312 PP_Resource, 492 PP_Resource,
313 int* out_len) { 493 int* out_len) {
494 assert(emitter_.get());
314 *out_len = emitter_->WriteOut_Locked((char*)buf, len); 495 *out_len = emitter_->WriteOut_Locked((char*)buf, len);
315 return 0; 496 return 0;
316 } 497 }
317 498
318 499
319 } // namespace nacl_io 500 } // namespace nacl_io
320 501
321 #endif // PROVIDES_SOCKET_API 502 #endif // PROVIDES_SOCKET_API
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698