Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(462)

Side by Side Diff: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fixed issues, UDP ok, dbing TCP Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 5
6 #include "nacl_io/ossocket.h" 6 #include "nacl_io/ossocket.h"
7 #ifdef PROVIDES_SOCKET_API 7 #ifdef PROVIDES_SOCKET_API
8 8
9 #include <errno.h> 9 #include <errno.h>
10 #include <string.h> 10 #include <string.h>
11 #include <algorithm> 11 #include <algorithm>
12 12
13 #include "nacl_io/mount.h"
14 #include "nacl_io/mount_node_socket.h"
15 #include "nacl_io/mount_node_tcp.h" 13 #include "nacl_io/mount_node_tcp.h"
14 #include "nacl_io/mount_stream.h"
16 #include "nacl_io/pepper_interface.h" 15 #include "nacl_io/pepper_interface.h"
17 16
17 namespace {
18 const size_t kMaxPacketSize = 65536;
19 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
20 }
21
18 namespace nacl_io { 22 namespace nacl_io {
19 23
20 MountNodeTCP::MountNodeTCP(Mount* mount) : MountNodeSocket(mount) {} 24 class TCPWork : public MountStream::Work {
25 public:
26 explicit TCPWork(const ScopedEventEmitterTCP& emitter)
27 : MountStream::Work(emitter->stream()->mount_stream()),
28 emitter_(emitter),
29 data_(NULL) {
30 }
31
32 ~TCPWork() {
33 delete[] data_;
34 }
35
36 TCPSocketInterface* TCPInterface() {
37 return mount()->ppapi()->GetTCPSocketInterface();
38 }
39
40 protected:
41 ScopedEventEmitterTCP emitter_;
42 char* data_;
43 };
21 44
22 45
23 TCPSocketInterface* MountNodeTCP::TCPSocket() { 46 class TCPSendWork : public TCPWork {
24 if (mount_->ppapi() == NULL) 47 public:
25 return NULL; 48 explicit TCPSendWork(const ScopedEventEmitterTCP& emitter)
49 : TCPWork(emitter) {}
26 50
27 return mount_->ppapi()->GetTCPSocketInterface(); 51 virtual bool Start(int32_t val) {
52 AUTO_LOCK(emitter_->GetLock());
53 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
54
55 // Does the stream exist, and can it send?
56 if (NULL == stream)
57 return false;
58
59 bool wth = stream->TestStreamFlags(SSF_CAN_SEND);
binji 2013/09/19 22:40:25 wth?
noelallen1 2013/09/20 00:51:27 Still debugging TCP. I take it you didn't notice t
60 if (!wth)
61 return false;
62
63 // If not currently sending...
64 if (!stream->TestStreamFlags(SSF_SENDING)) {
65 size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable();
66 int capped_len =
67 static_cast<int32_t>(std::min(tx_data_avail, kMaxPacketSize));
68
69 if (capped_len == 0)
70 return false;
71
72 data_ = new char[capped_len];
73 emitter_->ReadOut_Locked(data_, capped_len);
74
75 stream->SetStreamFlags(SSF_SENDING);
76 int err = TCPInterface()->Write(stream->socket_resource(),
77 data_,
78 capped_len,
79 mount()->GetRunCompletion(this));
80 if (err == PP_OK_COMPLETIONPENDING)
81 return true;
82
83 // Anything else, we should assume the socket has gone bad.
84 stream->SetError(err);
85 }
86 return false;
87 }
88
89 virtual void Run(int32_t length_error) {
90 AUTO_LOCK(emitter_->GetLock());
91 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
92
93 // If the stream is still there...
94 if (stream) {
95 // And we did send, then Q more work.
96 if (length_error >= 0) {
97 stream->ClearStreamFlags(SSF_SENDING);
98 stream->QueueOutput();
binji 2013/09/19 22:40:25 why does TCPSendWork QueueOutput in the Run callba
noelallen1 2013/09/20 00:51:27 MountNodeSocket handles the foreground thread side
99 } else {
100 // Otherwise this socket has gone bad.
101 stream->SetError(length_error);
102 }
103 }
104 }
105 };
106
107 class TCPRecvWork : public TCPWork {
108 public:
109 explicit TCPRecvWork(const ScopedEventEmitterTCP& emitter)
110 : TCPWork(emitter) {}
111
112 virtual bool Start(int32_t val) {
113 AUTO_LOCK(emitter_->GetLock());
114 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
115
116 // Does the stream exist, and can it recv?
117 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
118 return false;
119
120 // If the stream is valid and we are not currently receiving
binji 2013/09/19 22:40:25 bad comment, just "if not currently receiving"
noelallen1 2013/09/20 00:51:27 Done.
121 if (!stream->TestStreamFlags(SSF_RECVING)) {
122 size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable();
123 int capped_len =
124 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
125
126 if (capped_len > 0) {
binji 2013/09/19 22:40:25 why did you flip this if? (TCPSendWork uses an if-
noelallen1 2013/09/20 00:51:27 Done.
127 stream->SetStreamFlags(SSF_RECVING);
128 data_ = new char[capped_len];
129 int err = TCPInterface()->Read(stream->socket_resource(),
130 data_,
131 capped_len,
132 mount()->GetRunCompletion(this));
133 if (err == PP_OK_COMPLETIONPENDING)
134 return true;
135
136 stream->SetError(err);
137 }
138 }
139 return false;
140 }
141
142 virtual void Run(int32_t length_error) {
143 AUTO_LOCK(emitter_->GetLock());
144 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
145
146 // If the stream is still there, see if we can queue more input
147 if (stream) {
148 if (length_error > 0) {
149 emitter_->WriteIn_Locked(data_, length_error);
150 } else {
151 stream->SetError(length_error);
152 }
153 }
154 }
155 };
156
157
158 MountNodeTCP::MountNodeTCP(Mount* mount)
159 : MountNodeSocket(mount),
160 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) {
161 emitter_->AttachStream(this);
162 }
163
164 void MountNodeTCP::Destroy() {
165 emitter_->DetachStream();
166 MountNodeSocket::Destroy();
28 } 167 }
29 168
30 Error MountNodeTCP::Init(int flags) { 169 Error MountNodeTCP::Init(int flags) {
31 if (TCPSocket() == NULL) 170 if (TCPInterface() == NULL)
32 return EACCES; 171 return EACCES;
33 172
34 socket_resource_ = TCPSocket()->Create(mount_->ppapi()->GetInstance()); 173 socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
35 if (0 == socket_resource_) 174 if (0 == socket_resource_)
36 return EACCES; 175 return EACCES;
37 176
38 return 0; 177 return 0;
39 } 178 }
40 179
180 EventEmitterTCP* MountNodeTCP::GetEventEmitter() {
181 return emitter_.get();
182 }
183
184 void MountNodeTCP::QueueInput() {
185 TCPRecvWork* work = new TCPRecvWork(emitter_);
186 mount_stream()->EnqueueWork(work);
187 }
188
189 void MountNodeTCP::QueueOutput() {
190 TCPSendWork* work = new TCPSendWork(emitter_);
191 mount_stream()->EnqueueWork(work);
192 }
193
194
195 // We can not bind a client socket with PPAPI. For now we ignore the
196 // bind but report the correct address later, just in case someone is
197 // binding without really caring what the address is (for example to
198 // select a more optimized interface/route.)
41 Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) { 199 Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
42 AUTO_LOCK(node_lock_); 200 AUTO_LOCK(node_lock_);
43 201
44 if (0 == socket_resource_) 202 if (0 == socket_resource_)
45 return EBADF; 203 return EBADF;
46 204
47 /* Only bind once. */ 205 /* Only bind once. */
48 if (local_addr_ != 0) 206 if (local_addr_ != 0)
49 return EINVAL; 207 return EINVAL;
50 208
51 /* Lie, we won't know until we connect. */ 209 /* Lie, we won't know until we connect. */
52 return 0; 210 return 0;
53 } 211 }
54 212
55 Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { 213 Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
56 AUTO_LOCK(node_lock_); 214 AUTO_LOCK(node_lock_);
57 215
58 if (0 == socket_resource_) 216 if (0 == socket_resource_)
59 return EBADF; 217 return EBADF;
60 218
61 if (remote_addr_ != 0) 219 if (remote_addr_ != 0)
62 return EISCONN; 220 return EISCONN;
63 221
64 remote_addr_ = SockAddrToResource(addr, len); 222 remote_addr_ = SockAddrToResource(addr, len);
65 if (0 == remote_addr_) 223 if (0 == remote_addr_)
66 return EINVAL; 224 return EINVAL;
67 225
68 int err = TCPSocket()->Connect(socket_resource_, 226 int err = TCPInterface()->Connect(socket_resource_,
69 remote_addr_, 227 remote_addr_,
70 PP_BlockUntilComplete()); 228 PP_BlockUntilComplete());
71 229
72 // If we fail, release the dest addr resource 230 // If we fail, release the dest addr resource
73 if (err != PP_OK) { 231 if (err != PP_OK) {
74 mount_->ppapi()->ReleaseResource(remote_addr_); 232 mount_->ppapi()->ReleaseResource(remote_addr_);
75 remote_addr_ = 0; 233 remote_addr_ = 0;
76 return PPErrorToErrno(err); 234 return PPErrorToErrno(err);
77 } 235 }
78 236
79 local_addr_ = TCPSocket()->GetLocalAddress(socket_resource_); 237 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
80 mount_->ppapi()->AddRefResource(local_addr_); 238 mount_->ppapi()->AddRefResource(local_addr_);
239
240 // Now that we are connected, we can start sending and receiving.
241 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
242
243 // Begin the input pump
244 QueueInput();
81 return 0; 245 return 0;
82 } 246 }
83 247
84 Error MountNodeTCP::Recv(void* buf, size_t len, int flags, int* out_len) {
85 AUTO_LOCK(node_lock_);
86 if (0 == socket_resource_)
87 return EBADF;
88 248
89 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); 249 Error MountNodeTCP::Recv_Locked(void* buf,
90 int err = TCPSocket()->Read(socket_resource_, 250 size_t len,
91 static_cast<char*>(buf), 251 PP_Resource* addr,
binji 2013/09/19 22:40:25 out_addr?
noelallen1 2013/09/20 00:51:27 Done.
92 capped_len, 252 int* out_len) {
93 PP_BlockUntilComplete()); 253 *out_len = emitter_->in_fifo()->Read(buf, len);
94 if (err < 0) 254 *addr = remote_addr_;
95 return PPErrorToErrno(err);
96 255
97 *out_len = err; 256 mount_->ppapi()->AddRefResource(remote_addr_);
98 return 0; 257 return 0;
99 } 258 }
100 259
101 Error MountNodeTCP::RecvFrom(void* buf, 260 // TCP ignores dst addr passed to send_to, and always uses bound address
102 size_t len, 261 Error MountNodeTCP::Send_Locked(const void* buf,
103 int flags, 262 size_t len,
104 struct sockaddr* src_addr, 263 PP_Resource,
105 socklen_t* addrlen, 264 int* out_len) {
106 int* out_len) { 265 *out_len = emitter_->in_fifo()->Write(buf, len);
107 Error err = Recv(buf, len, flags, out_len);
108 if (err == 0)
109 GetPeerName(src_addr, addrlen);
110 return err;
111 }
112
113
114 Error MountNodeTCP::Send(const void* buf, size_t len, int flags, int* out_len) {
115 AUTO_LOCK(node_lock_);
116
117 if (0 == socket_resource_)
118 return EBADF;
119
120 if (0 == remote_addr_)
121 return ENOTCONN;
122
123 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
124 int err = TCPSocket()->Write(socket_resource_,
125 static_cast<const char*>(buf),
126 capped_len,
127 PP_BlockUntilComplete());
128 if (err < 0)
129 return PPErrorToErrno(err);
130
131 *out_len = err;
132 return 0; 266 return 0;
133 } 267 }
134 268
135 Error MountNodeTCP::SendTo(const void* buf,
136 size_t len,
137 int flags,
138 const struct sockaddr* dest_addr,
139 socklen_t addrlen,
140 int* out_len) {
141 return Send(buf, len, flags, out_len);
142 }
143 269
144 } // namespace nacl_io 270 } // namespace nacl_io
145 271
146 #endif // PROVIDES_SOCKET_API 272 #endif // PROVIDES_SOCKET_API
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698