Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(237)

Side by Side Diff: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Merge Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 5
6 #include "nacl_io/ossocket.h" 6 #include "nacl_io/ossocket.h"
7 #ifdef PROVIDES_SOCKET_API 7 #ifdef PROVIDES_SOCKET_API
8 8
9 #include <errno.h> 9 #include <errno.h>
10 #include <string.h> 10 #include <string.h>
11 #include <algorithm> 11 #include <algorithm>
12 12
13 #include "nacl_io/mount.h"
14 #include "nacl_io/mount_node_socket.h"
15 #include "nacl_io/mount_node_tcp.h" 13 #include "nacl_io/mount_node_tcp.h"
14 #include "nacl_io/mount_stream.h"
16 #include "nacl_io/pepper_interface.h" 15 #include "nacl_io/pepper_interface.h"
17 16
17 namespace {
18 const size_t kMaxPacketSize = 65536;
19 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
20 }
21
18 namespace nacl_io { 22 namespace nacl_io {
19 23
20 MountNodeTCP::MountNodeTCP(Mount* mount) : MountNodeSocket(mount) {} 24 class TCPWork : public MountStream::Work {
25 public:
26 explicit TCPWork(const ScopedEventEmitterTCP& emitter)
27 : MountStream::Work(emitter->stream()->mount_stream()),
28 emitter_(emitter),
29 data_(NULL) {
30 }
31
32 ~TCPWork() {
33 delete[] data_;
34 }
35
36 TCPSocketInterface* TCPInterface() {
37 return mount()->ppapi()->GetTCPSocketInterface();
38 }
39
40 protected:
41 ScopedEventEmitterTCP emitter_;
42 char* data_;
43 };
21 44
22 45
23 TCPSocketInterface* MountNodeTCP::TCPSocket() { 46 class TCPSendWork : public TCPWork {
24 if (mount_->ppapi() == NULL) 47 public:
25 return NULL; 48 explicit TCPSendWork(const ScopedEventEmitterTCP& emitter)
49 : TCPWork(emitter) {}
26 50
27 return mount_->ppapi()->GetTCPSocketInterface(); 51 virtual bool Start(int32_t val) {
52 AUTO_LOCK(emitter_->GetLock());
53 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
54
55 // If the stream is valid, and we are not currently sending
56 if (stream && ((stream->GetStreamFlags() & SSF_SENDING) == 0)) {
57 size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable();
58 int capped_len =
59 static_cast<int32_t>(std::min(tx_data_avail, kMaxPacketSize));
60
61 if (capped_len == 0)
62 return false;
63
64 data_ = new char[capped_len];
65 emitter_->ReadOut_Locked(data_, capped_len);
66
67 stream->SetStreamFlags(SSF_SENDING);
68 int err = TCPInterface()->Write(stream->socket_resource(),
69 data_,
70 capped_len,
71 mount()->GetRunCompletion(this));
72 if (err == PP_OK_COMPLETIONPENDING)
73 return true;
74
75 stream->ClearStreamFlags(SSF_SENDING);
76 }
77 return false;
78 }
79
80 virtual void Run(int32_t length_error) {
81 AUTO_LOCK(emitter_->GetLock());
82 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
83
84 // If the stream is still there, see if we can queue more output
85 if (stream) {
86 stream->ClearStreamFlags(SSF_SENDING);
87 stream->QueueOutput();
88 }
89 }
90 };
91
92 class TCPRecvWork : public TCPWork {
93 public:
94 explicit TCPRecvWork(const ScopedEventEmitterTCP& emitter)
95 : TCPWork(emitter) {}
96
97 virtual bool Start(int32_t val) {
98 AUTO_LOCK(emitter_->GetLock());
99 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
100
101 size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable();
102 int capped_len =
103 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
104
105 if (NULL == stream || (stream->GetStreamFlags() & SSF_RECVING) == 0)
106 return false;
107
108 if (capped_len <= 0)
109 return false;
110
111 stream->SetStreamFlags(SSF_RECVING);
112 data_ = new char[capped_len];
113 int err = TCPInterface()->Read(stream->socket_resource(),
114 data_,
115 capped_len,
116 mount()->GetRunCompletion(this));
117 if (err == PP_OK_COMPLETIONPENDING)
118 return true;
119
120 stream->ClearStreamFlags(SSF_RECVING);
121 return false;
122 }
123
124 virtual void Run(int32_t length_error) {
125 AUTO_LOCK(emitter_->GetLock());
126 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
127
128 if (NULL == stream)
129 return;
130
131 // If the stream is still there, see if we can queue more input
132 if (length_error > 0)
133 emitter_->WriteIn_Locked(data_, length_error);
134
135 stream->ClearStreamFlags(SSF_RECVING);
136 stream->QueueInput();
137 }
138 };
139
140
141 MountNodeTCP::MountNodeTCP(Mount* mount)
142 : MountNodeSocket(mount) {
28 } 143 }
29 144
30 Error MountNodeTCP::Init(int flags) { 145 Error MountNodeTCP::Init(int flags) {
31 if (TCPSocket() == NULL) 146 if (TCPInterface() == NULL)
32 return EACCES; 147 return EACCES;
33 148
34 socket_resource_ = TCPSocket()->Create(mount_->ppapi()->GetInstance()); 149 socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
35 if (0 == socket_resource_) 150 if (0 == socket_resource_)
36 return EACCES; 151 return EACCES;
37 152
38 return 0; 153 return 0;
39 } 154 }
40 155
156 EventEmitterTCP* MountNodeTCP::GetEventEmitter() {
157 return emitter_.get();
158 }
159
160 void MountNodeTCP::QueueInput() {
161 TCPRecvWork* work = new TCPRecvWork(emitter_);
162 mount_stream()->EnqueueWork(work);
163 }
164
165 void MountNodeTCP::QueueOutput() {
166 TCPSendWork* work = new TCPSendWork(emitter_);
167 mount_stream()->EnqueueWork(work);
168 }
169
170
171 // We can not bind a client socket with PPAPI. For now we ignore the
172 // bind but report the correct address later, just in case someone is
173 // binding without really caring what the address is (for example to
174 // select a more optimized interface/route.)
41 Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) { 175 Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
42 AUTO_LOCK(node_lock_); 176 AUTO_LOCK(node_lock_);
43 177
44 if (0 == socket_resource_) 178 if (0 == socket_resource_)
45 return EBADF; 179 return EBADF;
46 180
47 /* Only bind once. */ 181 /* Only bind once. */
48 if (local_addr_ != 0) 182 if (local_addr_ != 0)
49 return EINVAL; 183 return EINVAL;
50 184
51 /* Lie, we won't know until we connect. */ 185 /* Lie, we won't know until we connect. */
52 return 0; 186 return 0;
53 } 187 }
54 188
55 Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { 189 Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
56 AUTO_LOCK(node_lock_); 190 AUTO_LOCK(node_lock_);
57 191
58 if (0 == socket_resource_) 192 if (0 == socket_resource_)
59 return EBADF; 193 return EBADF;
60 194
61 if (remote_addr_ != 0) 195 if (remote_addr_ != 0)
62 return EISCONN; 196 return EISCONN;
63 197
64 remote_addr_ = SockAddrToResource(addr, len); 198 remote_addr_ = SockAddrToResource(addr, len);
65 if (0 == remote_addr_) 199 if (0 == remote_addr_)
66 return EINVAL; 200 return EINVAL;
67 201
68 int err = TCPSocket()->Connect(socket_resource_, 202 int err = TCPInterface()->Connect(socket_resource_,
69 remote_addr_, 203 remote_addr_,
70 PP_BlockUntilComplete()); 204 PP_BlockUntilComplete());
71 205
72 // If we fail, release the dest addr resource 206 // If we fail, release the dest addr resource
73 if (err != PP_OK) { 207 if (err != PP_OK) {
74 mount_->ppapi()->ReleaseResource(remote_addr_); 208 mount_->ppapi()->ReleaseResource(remote_addr_);
75 remote_addr_ = 0; 209 remote_addr_ = 0;
76 return PPErrorToErrno(err); 210 return PPErrorToErrno(err);
77 } 211 }
78 212
79 local_addr_ = TCPSocket()->GetLocalAddress(socket_resource_); 213 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
80 mount_->ppapi()->AddRefResource(local_addr_); 214 mount_->ppapi()->AddRefResource(local_addr_);
81 return 0; 215 return 0;
82 } 216 }
83 217
84 Error MountNodeTCP::Recv(void* buf, size_t len, int flags, int* out_len) { 218 Error MountNodeTCP::Recv(void* buf, size_t len, int flags, int* out_len) {
85 AUTO_LOCK(node_lock_); 219 AUTO_LOCK(node_lock_);
86 if (0 == socket_resource_) 220 if (0 == socket_resource_)
87 return EBADF; 221 return EBADF;
88 222
89 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); 223 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
90 int err = TCPSocket()->Read(socket_resource_, 224 int err = TCPInterface()->Read(socket_resource_,
91 static_cast<char*>(buf), 225 static_cast<char*>(buf),
92 capped_len, 226 capped_len,
93 PP_BlockUntilComplete()); 227 PP_BlockUntilComplete());
94 if (err < 0) 228 if (err < 0)
95 return PPErrorToErrno(err); 229 return PPErrorToErrno(err);
96 230
97 *out_len = err; 231 *out_len = err;
98 return 0; 232 return 0;
99 } 233 }
100 234
101 Error MountNodeTCP::RecvFrom(void* buf, 235 Error MountNodeTCP::RecvFrom(void* buf,
102 size_t len, 236 size_t len,
103 int flags, 237 int flags,
(...skipping 10 matching lines...) Expand all
114 Error MountNodeTCP::Send(const void* buf, size_t len, int flags, int* out_len) { 248 Error MountNodeTCP::Send(const void* buf, size_t len, int flags, int* out_len) {
115 AUTO_LOCK(node_lock_); 249 AUTO_LOCK(node_lock_);
116 250
117 if (0 == socket_resource_) 251 if (0 == socket_resource_)
118 return EBADF; 252 return EBADF;
119 253
120 if (0 == remote_addr_) 254 if (0 == remote_addr_)
121 return ENOTCONN; 255 return ENOTCONN;
122 256
123 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); 257 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
124 int err = TCPSocket()->Write(socket_resource_, 258 int err = TCPInterface()->Write(socket_resource_,
125 static_cast<const char*>(buf), 259 static_cast<const char*>(buf),
126 capped_len, 260 capped_len,
127 PP_BlockUntilComplete()); 261 PP_BlockUntilComplete());
128 if (err < 0) 262 if (err < 0)
129 return PPErrorToErrno(err); 263 return PPErrorToErrno(err);
130 264
131 *out_len = err; 265 *out_len = err;
132 return 0; 266 return 0;
133 } 267 }
134 268
135 Error MountNodeTCP::SendTo(const void* buf, 269 Error MountNodeTCP::SendTo(const void* buf,
136 size_t len, 270 size_t len,
137 int flags, 271 int flags,
138 const struct sockaddr* dest_addr, 272 const struct sockaddr* dest_addr,
139 socklen_t addrlen, 273 socklen_t addrlen,
140 int* out_len) { 274 int* out_len) {
141 return Send(buf, len, flags, out_len); 275 return Send(buf, len, flags, out_len);
142 } 276 }
143 277
144 } // namespace nacl_io 278 } // namespace nacl_io
145 279
146 #endif // PROVIDES_SOCKET_API 280 #endif // PROVIDES_SOCKET_API
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698