OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "extensions/browser/api/socket/tcp_socket.h" | 5 #include "extensions/browser/api/socket/tcp_socket.h" |
6 | 6 |
| 7 #include <ctype.h> |
| 8 #include <algorithm> |
| 9 #include <iomanip> |
| 10 #include <sstream> |
| 11 |
7 #include "base/lazy_instance.h" | 12 #include "base/lazy_instance.h" |
8 #include "base/logging.h" | 13 #include "base/logging.h" |
9 #include "base/macros.h" | 14 #include "base/macros.h" |
10 #include "extensions/browser/api/api_resource.h" | 15 #include "extensions/browser/api/api_resource.h" |
11 #include "net/base/address_list.h" | 16 #include "net/base/address_list.h" |
12 #include "net/base/ip_endpoint.h" | 17 #include "net/base/ip_endpoint.h" |
13 #include "net/base/net_errors.h" | 18 #include "net/base/net_errors.h" |
14 #include "net/base/rand_callback.h" | 19 #include "net/base/rand_callback.h" |
15 #include "net/socket/tcp_client_socket.h" | 20 #include "net/socket/tcp_client_socket.h" |
16 | 21 |
(...skipping 18 matching lines...) Expand all Loading... |
35 ApiResourceManager<ResumableTCPServerSocket> > > g_server_factory = | 40 ApiResourceManager<ResumableTCPServerSocket> > > g_server_factory = |
36 LAZY_INSTANCE_INITIALIZER; | 41 LAZY_INSTANCE_INITIALIZER; |
37 | 42 |
38 // static | 43 // static |
39 template <> | 44 template <> |
40 BrowserContextKeyedAPIFactory<ApiResourceManager<ResumableTCPServerSocket> >* | 45 BrowserContextKeyedAPIFactory<ApiResourceManager<ResumableTCPServerSocket> >* |
41 ApiResourceManager<ResumableTCPServerSocket>::GetFactoryInstance() { | 46 ApiResourceManager<ResumableTCPServerSocket>::GetFactoryInstance() { |
42 return g_server_factory.Pointer(); | 47 return g_server_factory.Pointer(); |
43 } | 48 } |
44 | 49 |
| 50 SocketPauseBuffer::SocketPauseBuffer( |
| 51 const SocketPauseBuffer::DownstreamReadCallback& callback) |
| 52 : read_issued_(false), |
| 53 downstream_read_cb_(callback), |
| 54 downstream_read_buffer_(new net::GrowableIOBuffer), |
| 55 upstream_data_returned_(0), |
| 56 upstream_read_buffer_offset_(0), |
| 57 prior_error_code_(0), |
| 58 buffering_disabled_(false) {} |
| 59 |
| 60 SocketPauseBuffer::~SocketPauseBuffer() {} |
| 61 |
| 62 std::string SocketPauseBuffer::StatusDescription() { |
| 63 std::stringstream status; |
| 64 status << "\n"; |
| 65 status << static_cast<void*>(this); |
| 66 status << " < " << upstream_data_returned_ << " | " |
| 67 << downstream_read_buffer_->offset() << " | " |
| 68 << downstream_read_buffer_->capacity() << ">: " |
| 69 << ", buffered: " << BufferedDataCount() << ", " |
| 70 << (read_issued_ ? "READING" : "not reading") << ", cb: " |
| 71 << (upstream_read_callback_.is_null() ? "null" : "not null"); |
| 72 return status.str(); |
| 73 } |
| 74 |
| 75 int SocketPauseBuffer::BufferedDataCount() const { |
| 76 return downstream_read_buffer_->offset() - upstream_data_returned_; |
| 77 } |
| 78 |
| 79 void SocketPauseBuffer::CreditIncomingData(int count) { |
| 80 DCHECK_GE(count, 0); |
| 81 downstream_read_buffer_->set_offset(downstream_read_buffer_->offset() + |
| 82 count); |
| 83 } |
| 84 |
| 85 void SocketPauseBuffer::ReturnDataInBuffer(net::IOBuffer* dest, int count) { |
| 86 DCHECK_GE(count, 0); |
| 87 memcpy(dest->data(), |
| 88 downstream_read_buffer_->StartOfBuffer() + upstream_data_returned_, |
| 89 count); |
| 90 upstream_data_returned_ += count; |
| 91 } |
| 92 |
| 93 void SocketPauseBuffer::FreeReturnedBufferSpace() { |
| 94 DCHECK(!read_issued_); |
| 95 const int amount_to_free = upstream_data_returned_; |
| 96 |
| 97 memmove(downstream_read_buffer_->StartOfBuffer(), |
| 98 downstream_read_buffer_->StartOfBuffer() + upstream_data_returned_, |
| 99 amount_to_free); |
| 100 downstream_read_buffer_->set_offset(downstream_read_buffer_->offset() - |
| 101 amount_to_free); |
| 102 upstream_data_returned_ -= amount_to_free; |
| 103 } |
| 104 |
| 105 // Called whenever the client's callback (either upstream_read_callback_ or |
| 106 // the |callback| argument to Read()) is invoked. Clears out state relevant |
| 107 // to the Read. |
| 108 void SocketPauseBuffer::ResetUpstreamState() { |
| 109 upstream_read_buffer_offset_ = 0; |
| 110 upstream_read_buffer_ = NULL; |
| 111 upstream_read_callback_.Reset(); |
| 112 } |
| 113 |
| 114 void SocketPauseBuffer::InvokeCallback(int result) { |
| 115 DCHECK(!upstream_read_callback_.is_null()); |
| 116 net::CompletionCallback cb = upstream_read_callback_; |
| 117 ResetUpstreamState(); |
| 118 cb.Run(result); |
| 119 } |
| 120 |
| 121 void SocketPauseBuffer::InsureBufferCanHold(int num_bytes) { |
| 122 if (num_bytes > (downstream_read_buffer_->capacity() - |
| 123 downstream_read_buffer_->offset())) { |
| 124 downstream_read_buffer_->SetCapacity(num_bytes + |
| 125 downstream_read_buffer_->offset()); |
| 126 } |
| 127 } |
| 128 |
| 129 bool SocketPauseBuffer::Pause() { |
| 130 if (buffering_disabled_) { |
| 131 return false; |
| 132 } else if (!upstream_read_callback_.is_null()) { |
| 133 InvokeCallback(net::ERR_ABORTED); |
| 134 } |
| 135 return true; |
| 136 } |
| 137 |
| 138 void SocketPauseBuffer::DisableBuffering() { |
| 139 buffering_disabled_ = true; |
| 140 } |
| 141 |
| 142 int SocketPauseBuffer::Read(net::IOBuffer* buffer, |
| 143 int buf_len, |
| 144 const net::CompletionCallback& callback) { |
| 145 DVLOG(1) << "Read(" << buf_len << ") START " << StatusDescription(); |
| 146 |
| 147 if (buffering_disabled_ && BufferedDataCount() == 0) { |
| 148 return downstream_read_cb_.Run(buffer, buf_len, callback); |
| 149 } |
| 150 |
| 151 // Can't buffer data and have a pending downstream Read() simultaneously. |
| 152 DCHECK(BufferedDataCount() == 0 || !read_issued_); |
| 153 |
| 154 // Only one upstream Read() is allowed at a time. |
| 155 if (!upstream_read_callback_.is_null()) { |
| 156 DVLOG(1) << "Read(" << buf_len << "): - Returning ERR_INVALID_ARGUMENT " |
| 157 << StatusDescription(); |
| 158 DCHECK(read_issued_); |
| 159 return net::ERR_INVALID_ARGUMENT; // 'this' is the invalid argument. |
| 160 } |
| 161 |
| 162 if (buf_len < 0) { |
| 163 DVLOG(1) << "Read(" << buf_len << "): - Returning ERR_INVALID_ARGUMENT " |
| 164 << StatusDescription(); |
| 165 return net::ERR_INVALID_ARGUMENT; |
| 166 } |
| 167 |
| 168 if (BufferedDataCount() > 0) { |
| 169 const int num_returned_bytes = std::min(buf_len, BufferedDataCount()); |
| 170 ReturnDataInBuffer(buffer, num_returned_bytes); |
| 171 // Release our internal buffer once we're no longer using it. |
| 172 if (buffering_disabled_ && BufferedDataCount() == 0) { |
| 173 downstream_read_buffer_ = nullptr; |
| 174 } |
| 175 return num_returned_bytes; |
| 176 } else { |
| 177 DCHECK(!buffering_disabled_); |
| 178 upstream_read_callback_ = callback; |
| 179 upstream_read_request_size_ = buf_len; |
| 180 upstream_read_buffer_ = buffer; |
| 181 if (!read_issued_) { |
| 182 // Issue a new downstream read for this request. |
| 183 FreeReturnedBufferSpace(); |
| 184 InsureBufferCanHold(buf_len); |
| 185 read_issued_ = true; |
| 186 int result = downstream_read_cb_.Run( |
| 187 downstream_read_buffer_.get(), buf_len, |
| 188 base::Bind(&SocketPauseBuffer::ReadComplete, base::Unretained(this))); |
| 189 if (result > 0) { |
| 190 read_issued_ = false; |
| 191 CreditIncomingData(result); |
| 192 ReturnDataInBuffer(buffer, result); |
| 193 ResetUpstreamState(); |
| 194 } |
| 195 return result; |
| 196 } else { |
| 197 // We already have a downstream read pending, and now it's plumbed to |
| 198 // invoke the callback argument. |
| 199 return net::ERR_IO_PENDING; |
| 200 } |
| 201 } |
| 202 } |
| 203 |
| 204 void SocketPauseBuffer::ReadComplete(int count) { |
| 205 read_issued_ = false; |
| 206 if (count > 0) { |
| 207 CreditIncomingData(count); |
| 208 } |
| 209 if (upstream_read_callback_.is_null()) { |
| 210 prior_error_code_ = std::min(0, count); |
| 211 } else { |
| 212 int num_returned_bytes = std::min(count, upstream_read_request_size_); |
| 213 ReturnDataInBuffer(upstream_read_buffer_.get(), num_returned_bytes); |
| 214 InvokeCallback(num_returned_bytes); |
| 215 } |
| 216 } |
| 217 |
| 218 BufferingStreamSocket::BufferingStreamSocket( |
| 219 scoped_ptr<net::StreamSocket> socket) |
| 220 : Passthrough<net::StreamSocket>(socket.Pass()), |
| 221 pause_buffer_(base::Bind(&net::StreamSocket::Read, |
| 222 base::Unretained(base_socket()))) {} |
| 223 |
| 224 BufferingStreamSocket::~BufferingStreamSocket() {} |
| 225 |
| 226 bool BufferingStreamSocket::Pause() { |
| 227 return pause_buffer_.Pause(); |
| 228 } |
| 229 |
| 230 void BufferingStreamSocket::DisableBuffering() { |
| 231 pause_buffer_.DisableBuffering(); |
| 232 } |
| 233 |
| 234 int BufferingStreamSocket::Read(net::IOBuffer* buffer, |
| 235 int buf_len, |
| 236 const net::CompletionCallback& callback) { |
| 237 return pause_buffer_.Read(buffer, buf_len, callback); |
| 238 } |
| 239 |
| 240 BufferingTCPClientSocket::BufferingTCPClientSocket( |
| 241 scoped_ptr<net::TCPClientSocket> socket) |
| 242 : Passthrough<net::TCPClientSocket>(socket.Pass()), |
| 243 pause_buffer_(base::Bind(&net::TCPClientSocket::Read, |
| 244 base::Unretained(base_socket()))) {} |
| 245 |
| 246 BufferingTCPClientSocket::~BufferingTCPClientSocket() {} |
| 247 |
| 248 bool BufferingTCPClientSocket::Pause() { |
| 249 return pause_buffer_.Pause(); |
| 250 } |
| 251 |
| 252 void BufferingTCPClientSocket::DisableBuffering() { |
| 253 pause_buffer_.DisableBuffering(); |
| 254 } |
| 255 |
| 256 int BufferingTCPClientSocket::Read(net::IOBuffer* buffer, |
| 257 int buf_len, |
| 258 const net::CompletionCallback& callback) { |
| 259 return pause_buffer_.Read(buffer, buf_len, callback); |
| 260 } |
| 261 |
| 262 bool BufferingTCPClientSocket::SetKeepAlive(bool enable, int delay) { |
| 263 return base_socket()->SetKeepAlive(enable, delay); |
| 264 } |
| 265 |
| 266 bool BufferingTCPClientSocket::SetNoDelay(bool no_delay) { |
| 267 return base_socket()->SetNoDelay(no_delay); |
| 268 } |
| 269 |
45 TCPSocket::TCPSocket(const std::string& owner_extension_id) | 270 TCPSocket::TCPSocket(const std::string& owner_extension_id) |
46 : Socket(owner_extension_id), socket_mode_(UNKNOWN) {} | 271 : Socket(owner_extension_id), socket_mode_(UNKNOWN) {} |
47 | 272 |
48 TCPSocket::TCPSocket(net::TCPClientSocket* tcp_client_socket, | 273 TCPSocket::TCPSocket(net::TCPClientSocket* tcp_client_socket, |
49 const std::string& owner_extension_id, | 274 const std::string& owner_extension_id, |
50 bool is_connected) | 275 bool is_connected) |
51 : Socket(owner_extension_id), | 276 : Socket(owner_extension_id), |
52 socket_(tcp_client_socket), | |
53 socket_mode_(CLIENT) { | 277 socket_mode_(CLIENT) { |
| 278 scoped_ptr<net::TCPClientSocket> p(tcp_client_socket); |
| 279 socket_.reset(new BufferingTCPClientSocket(p.Pass())); |
54 this->is_connected_ = is_connected; | 280 this->is_connected_ = is_connected; |
55 } | 281 } |
56 | 282 |
57 TCPSocket::TCPSocket(net::TCPServerSocket* tcp_server_socket, | 283 TCPSocket::TCPSocket(net::TCPServerSocket* tcp_server_socket, |
58 const std::string& owner_extension_id) | 284 const std::string& owner_extension_id) |
59 : Socket(owner_extension_id), | 285 : Socket(owner_extension_id), |
60 server_socket_(tcp_server_socket), | 286 server_socket_(tcp_server_socket), |
61 socket_mode_(SERVER) {} | 287 socket_mode_(SERVER) {} |
62 | 288 |
63 // static | 289 // static |
(...skipping 23 matching lines...) Expand all Loading... |
87 } | 313 } |
88 DCHECK(!server_socket_.get()); | 314 DCHECK(!server_socket_.get()); |
89 socket_mode_ = CLIENT; | 315 socket_mode_ = CLIENT; |
90 connect_callback_ = callback; | 316 connect_callback_ = callback; |
91 | 317 |
92 int result = net::ERR_CONNECTION_FAILED; | 318 int result = net::ERR_CONNECTION_FAILED; |
93 do { | 319 do { |
94 if (is_connected_) | 320 if (is_connected_) |
95 break; | 321 break; |
96 | 322 |
97 socket_.reset( | 323 scoped_ptr<net::TCPClientSocket> internal_sock( |
98 new net::TCPClientSocket(address, NULL, net::NetLog::Source())); | 324 new net::TCPClientSocket(address, NULL, net::NetLog::Source())); |
99 | 325 |
| 326 socket_.reset(new BufferingTCPClientSocket(internal_sock.Pass())); |
| 327 |
100 connect_callback_ = callback; | 328 connect_callback_ = callback; |
101 result = socket_->Connect( | 329 result = socket_->Connect( |
102 base::Bind(&TCPSocket::OnConnectComplete, base::Unretained(this))); | 330 base::Bind(&TCPSocket::OnConnectComplete, base::Unretained(this))); |
103 } while (false); | 331 } while (false); |
104 | 332 |
105 if (result != net::ERR_IO_PENDING) | 333 if (result != net::ERR_IO_PENDING) |
106 OnConnectComplete(result); | 334 OnConnectComplete(result); |
107 } | 335 } |
108 | 336 |
109 void TCPSocket::Disconnect() { | 337 void TCPSocket::Disconnect() { |
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
309 is_connected_ = false; | 537 is_connected_ = false; |
310 | 538 |
311 connect_callback_.Reset(); | 539 connect_callback_.Reset(); |
312 read_callback_.Reset(); | 540 read_callback_.Reset(); |
313 accept_callback_.Reset(); | 541 accept_callback_.Reset(); |
314 | 542 |
315 DCHECK(socket_.get()) << "Called on null client socket."; | 543 DCHECK(socket_.get()) << "Called on null client socket."; |
316 ignore_result(socket_.release()); | 544 ignore_result(socket_.release()); |
317 } | 545 } |
318 | 546 |
319 net::TCPClientSocket* TCPSocket::ClientStream() { | 547 BufferingTCPClientSocket* TCPSocket::ClientStream() { |
320 if (socket_mode_ != CLIENT || GetSocketType() != TYPE_TCP) | 548 if (socket_mode_ != CLIENT || GetSocketType() != TYPE_TCP) |
321 return NULL; | 549 return NULL; |
322 return socket_.get(); | 550 return socket_.get(); |
323 } | 551 } |
324 | 552 |
325 bool TCPSocket::HasPendingRead() const { | 553 bool TCPSocket::HasPendingRead() const { |
326 return !read_callback_.is_null(); | 554 return !read_callback_.is_null(); |
327 } | 555 } |
328 | 556 |
329 ResumableTCPSocket::ResumableTCPSocket(const std::string& owner_extension_id) | 557 ResumableTCPSocket::ResumableTCPSocket(const std::string& owner_extension_id) |
330 : TCPSocket(owner_extension_id), | 558 : TCPSocket(owner_extension_id), |
331 persistent_(false), | 559 persistent_(false), |
332 buffer_size_(0), | 560 buffer_size_(0), |
333 paused_(false) {} | 561 paused_(false) {} |
334 | 562 |
335 ResumableTCPSocket::ResumableTCPSocket(net::TCPClientSocket* tcp_client_socket, | 563 ResumableTCPSocket::ResumableTCPSocket(net::TCPClientSocket* tcp_client_socket, |
336 const std::string& owner_extension_id, | 564 const std::string& owner_extension_id, |
337 bool is_connected) | 565 bool is_connected) |
338 : TCPSocket(tcp_client_socket, owner_extension_id, is_connected), | 566 : TCPSocket(tcp_client_socket, owner_extension_id, is_connected), |
339 persistent_(false), | 567 persistent_(false), |
340 buffer_size_(0), | 568 buffer_size_(0), |
341 paused_(false) {} | 569 paused_(false) {} |
342 | 570 |
| 571 ResumableTCPSocket::~ResumableTCPSocket() {} |
| 572 |
343 bool ResumableTCPSocket::IsPersistent() const { return persistent(); } | 573 bool ResumableTCPSocket::IsPersistent() const { return persistent(); } |
344 | 574 |
| 575 void ResumableTCPSocket::ApiPauseComplete() { |
| 576 if (!pause_callback_.is_null()) { |
| 577 pause_callback_.Run(); |
| 578 pause_callback_.Reset(); |
| 579 } |
| 580 } |
| 581 |
345 ResumableTCPServerSocket::ResumableTCPServerSocket( | 582 ResumableTCPServerSocket::ResumableTCPServerSocket( |
346 const std::string& owner_extension_id) | 583 const std::string& owner_extension_id) |
347 : TCPSocket(owner_extension_id), persistent_(false), paused_(false) {} | 584 : TCPSocket(owner_extension_id), persistent_(false), paused_(false) {} |
348 | 585 |
349 bool ResumableTCPServerSocket::IsPersistent() const { return persistent(); } | 586 bool ResumableTCPServerSocket::IsPersistent() const { return persistent(); } |
350 | 587 |
351 } // namespace extensions | 588 } // namespace extensions |
OLD | NEW |