Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(96)

Side by Side Diff: native_client_sdk/src/libraries/nacl_io/mount_node_tcp.cc

Issue 23498015: [NaCl SDK] Support non blocking TCP/UDP (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Merge Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 5
6 #include "nacl_io/ossocket.h" 6 #include "nacl_io/ossocket.h"
7 #ifdef PROVIDES_SOCKET_API 7 #ifdef PROVIDES_SOCKET_API
8 8
9 #include <errno.h> 9 #include <errno.h>
10 #include <string.h> 10 #include <string.h>
11 #include <algorithm> 11 #include <algorithm>
12 12
13 #include "nacl_io/mount.h"
14 #include "nacl_io/mount_node_socket.h"
15 #include "nacl_io/mount_node_tcp.h" 13 #include "nacl_io/mount_node_tcp.h"
14 #include "nacl_io/mount_stream.h"
16 #include "nacl_io/pepper_interface.h" 15 #include "nacl_io/pepper_interface.h"
17 16
17 namespace {
18 const size_t kMaxPacketSize = 65536;
19 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
20 }
21
18 namespace nacl_io { 22 namespace nacl_io {
19 23
20 MountNodeTCP::MountNodeTCP(Mount* mount) : MountNodeSocket(mount) {} 24 class TCPWork : public MountStream::Work {
25 public:
26 explicit TCPWork(const ScopedEventEmitterTCP& emitter)
27 : MountStream::Work(emitter->stream()->mount_stream()),
28 emitter_(emitter),
29 data_(NULL) {
30 }
31
32 ~TCPWork() {
33 delete[] data_;
34 }
35
36 TCPSocketInterface* TCPInterface() {
37 return mount()->ppapi()->GetTCPSocketInterface();
38 }
39
40 protected:
41 ScopedEventEmitterTCP emitter_;
42 char* data_;
43 };
21 44
22 45
23 TCPSocketInterface* MountNodeTCP::TCPSocket() { 46 class TCPSendWork : public TCPWork {
24 if (mount_->ppapi() == NULL) 47 public:
25 return NULL; 48 explicit TCPSendWork(const ScopedEventEmitterTCP& emitter)
49 : TCPWork(emitter) {}
26 50
27 return mount_->ppapi()->GetTCPSocketInterface(); 51 virtual bool Start(int32_t val) {
52 AUTO_LOCK(emitter_->GetLock());
53 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
54
55 // Does the stream exist, and can it send?
56 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_SEND))
57 return false;
58
59 // If not currently sending...
60 if (!stream->TestStreamFlags(SSF_SENDING)) {
61 size_t tx_data_avail = emitter_->out_fifo()->ReadAvailable();
62 int capped_len =
63 static_cast<int32_t>(std::min(tx_data_avail, kMaxPacketSize));
64
65 if (capped_len == 0)
66 return false;
67
68 data_ = new char[capped_len];
69 emitter_->ReadOut_Locked(data_, capped_len);
70
71 stream->SetStreamFlags(SSF_SENDING);
72 int err = TCPInterface()->Write(stream->socket_resource(),
73 data_,
74 capped_len,
75 mount()->GetRunCompletion(this));
76 if (err == PP_OK_COMPLETIONPENDING)
77 return true;
78
79 // Anything else, we should assume the socket has gone bad.
80 stream->SetError_Locked(err);
81 }
82 return false;
83 }
84
85 virtual void Run(int32_t length_error) {
86 AUTO_LOCK(emitter_->GetLock());
87 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
88
89 // If the stream is still there...
90 if (stream) {
91 // And we did send, then Q more work.
92 if (length_error >= 0) {
93 stream->ClearStreamFlags(SSF_SENDING);
94 stream->QueueOutput();
95 } else {
96 // Otherwise this socket has gone bad.
97 stream->SetError_Locked(length_error);
98 }
99 }
100 }
101 };
102
103 class TCPRecvWork : public TCPWork {
104 public:
105 explicit TCPRecvWork(const ScopedEventEmitterTCP& emitter)
106 : TCPWork(emitter) {}
107
108 virtual bool Start(int32_t val) {
109 AUTO_LOCK(emitter_->GetLock());
110 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
111
112 // Does the stream exist, and can it recv?
113 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
114 return false;
115
116 // If we are not currently receiving
117 if (!stream->TestStreamFlags(SSF_RECVING)) {
118 size_t rx_space_avail = emitter_->in_fifo()->WriteAvailable();
119 int capped_len =
120 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
121
122 if (capped_len == 0)
123 return false;
124
125 stream->SetStreamFlags(SSF_RECVING);
126 data_ = new char[capped_len];
127 int err = TCPInterface()->Read(stream->socket_resource(),
128 data_,
129 capped_len,
130 mount()->GetRunCompletion(this));
131 if (err == PP_OK_COMPLETIONPENDING)
132 return true;
133
134 // Anything else, we should assume the socket has gone bad.
135 stream->SetError_Locked(err);
136 }
137 return false;
138 }
139
140 virtual void Run(int32_t length_error) {
141 AUTO_LOCK(emitter_->GetLock());
142 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
143
144 // If the stream is still there, see if we can queue more input
145 if (stream) {
146 if (length_error > 0) {
147 emitter_->WriteIn_Locked(data_, length_error);
148 stream->QueueInput();
149 } else {
150 stream->SetError_Locked(length_error);
151 }
152 }
153 }
154 };
155
156
157 MountNodeTCP::MountNodeTCP(Mount* mount)
158 : MountNodeSocket(mount),
159 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)) {
160 emitter_->AttachStream(this);
161 }
162
163 void MountNodeTCP::Destroy() {
164 emitter_->DetachStream();
165 MountNodeSocket::Destroy();
28 } 166 }
29 167
30 Error MountNodeTCP::Init(int flags) { 168 Error MountNodeTCP::Init(int flags) {
31 if (TCPSocket() == NULL) 169 if (TCPInterface() == NULL)
32 return EACCES; 170 return EACCES;
33 171
34 socket_resource_ = TCPSocket()->Create(mount_->ppapi()->GetInstance()); 172 socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
35 if (0 == socket_resource_) 173 if (0 == socket_resource_)
36 return EACCES; 174 return EACCES;
37 175
38 return 0; 176 return 0;
39 } 177 }
40 178
179 EventEmitterTCP* MountNodeTCP::GetEventEmitter() {
180 return emitter_.get();
181 }
182
183 void MountNodeTCP::QueueInput() {
184 TCPRecvWork* work = new TCPRecvWork(emitter_);
185 mount_stream()->EnqueueWork(work);
186 }
187
188 void MountNodeTCP::QueueOutput() {
189 TCPSendWork* work = new TCPSendWork(emitter_);
190 mount_stream()->EnqueueWork(work);
191 }
192
193
194 // We can not bind a client socket with PPAPI. For now we ignore the
195 // bind but report the correct address later, just in case someone is
196 // binding without really caring what the address is (for example to
197 // select a more optimized interface/route.)
41 Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) { 198 Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
42 AUTO_LOCK(node_lock_); 199 AUTO_LOCK(node_lock_);
43 200
44 if (0 == socket_resource_) 201 if (0 == socket_resource_)
45 return EBADF; 202 return EBADF;
46 203
47 /* Only bind once. */ 204 /* Only bind once. */
48 if (local_addr_ != 0) 205 if (local_addr_ != 0)
49 return EINVAL; 206 return EINVAL;
50 207
51 /* Lie, we won't know until we connect. */ 208 /* Lie, we won't know until we connect. */
52 return 0; 209 return 0;
53 } 210 }
54 211
55 Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) { 212 Error MountNodeTCP::Connect(const struct sockaddr* addr, socklen_t len) {
56 AUTO_LOCK(node_lock_); 213 AUTO_LOCK(node_lock_);
57 214
58 if (0 == socket_resource_) 215 if (0 == socket_resource_)
59 return EBADF; 216 return EBADF;
60 217
61 if (remote_addr_ != 0) 218 if (remote_addr_ != 0)
62 return EISCONN; 219 return EISCONN;
63 220
64 remote_addr_ = SockAddrToResource(addr, len); 221 remote_addr_ = SockAddrToResource(addr, len);
65 if (0 == remote_addr_) 222 if (0 == remote_addr_)
66 return EINVAL; 223 return EINVAL;
67 224
68 int err = TCPSocket()->Connect(socket_resource_, 225 int err = TCPInterface()->Connect(socket_resource_,
69 remote_addr_, 226 remote_addr_,
70 PP_BlockUntilComplete()); 227 PP_BlockUntilComplete());
71 228
72 // If we fail, release the dest addr resource 229 // If we fail, release the dest addr resource
73 if (err != PP_OK) { 230 if (err != PP_OK) {
74 mount_->ppapi()->ReleaseResource(remote_addr_); 231 mount_->ppapi()->ReleaseResource(remote_addr_);
75 remote_addr_ = 0; 232 remote_addr_ = 0;
76 return PPErrorToErrno(err); 233 return PPErrorToErrno(err);
77 } 234 }
78 235
79 local_addr_ = TCPSocket()->GetLocalAddress(socket_resource_); 236 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
80 mount_->ppapi()->AddRefResource(local_addr_); 237 mount_->ppapi()->AddRefResource(local_addr_);
238
239 // Now that we are connected, we can start sending and receiving.
240 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
241
242 // Begin the input pump
243 QueueInput();
81 return 0; 244 return 0;
82 } 245 }
83 246
84 Error MountNodeTCP::Recv(void* buf, size_t len, int flags, int* out_len) {
85 AUTO_LOCK(node_lock_);
86 if (0 == socket_resource_)
87 return EBADF;
88 247
89 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER)); 248 Error MountNodeTCP::Recv_Locked(void* buf,
90 int err = TCPSocket()->Read(socket_resource_, 249 size_t len,
91 static_cast<char*>(buf), 250 PP_Resource* out_addr,
92 capped_len, 251 int* out_len) {
93 PP_BlockUntilComplete()); 252 *out_len = emitter_->in_fifo()->Read(buf, len);
94 if (err < 0) 253 *out_addr = remote_addr_;
95 return PPErrorToErrno(err);
96 254
97 *out_len = err; 255 // Ref the address copy we pass back.
256 mount_->ppapi()->AddRefResource(remote_addr_);
98 return 0; 257 return 0;
99 } 258 }
100 259
101 Error MountNodeTCP::RecvFrom(void* buf, 260 // TCP ignores dst addr passed to send_to, and always uses bound address
102 size_t len, 261 Error MountNodeTCP::Send_Locked(const void* buf,
103 int flags, 262 size_t len,
104 struct sockaddr* src_addr, 263 PP_Resource,
105 socklen_t* addrlen, 264 int* out_len) {
106 int* out_len) { 265 *out_len = emitter_->out_fifo()->Write(buf, len);
107 Error err = Recv(buf, len, flags, out_len);
108 if (err == 0)
109 GetPeerName(src_addr, addrlen);
110 return err;
111 }
112
113
114 Error MountNodeTCP::Send(const void* buf, size_t len, int flags, int* out_len) {
115 AUTO_LOCK(node_lock_);
116
117 if (0 == socket_resource_)
118 return EBADF;
119
120 if (0 == remote_addr_)
121 return ENOTCONN;
122
123 int capped_len = static_cast<int32_t>(std::min(len, MAX_SOCK_TRANSFER));
124 int err = TCPSocket()->Write(socket_resource_,
125 static_cast<const char*>(buf),
126 capped_len,
127 PP_BlockUntilComplete());
128 if (err < 0)
129 return PPErrorToErrno(err);
130
131 *out_len = err;
132 return 0; 266 return 0;
133 } 267 }
134 268
135 Error MountNodeTCP::SendTo(const void* buf,
136 size_t len,
137 int flags,
138 const struct sockaddr* dest_addr,
139 socklen_t addrlen,
140 int* out_len) {
141 return Send(buf, len, flags, out_len);
142 }
143 269
144 } // namespace nacl_io 270 } // namespace nacl_io
145 271
146 #endif // PROVIDES_SOCKET_API 272 #endif // PROVIDES_SOCKET_API
OLDNEW
« no previous file with comments | « native_client_sdk/src/libraries/nacl_io/mount_node_tcp.h ('k') | native_client_sdk/src/libraries/nacl_io/mount_node_tty.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698