Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(368)

Side by Side Diff: extensions/browser/api/socket/tcp_socket.cc

Issue 494573002: A change for the setPause() api in chrome.sockets.tcp: Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Cosmetics and commentary. Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "extensions/browser/api/socket/tcp_socket.h" 5 #include "extensions/browser/api/socket/tcp_socket.h"
6 6
7 #include <ctype.h>
8 #include <algorithm>
9 #include <iomanip>
10 #include <sstream>
11
7 #include "base/lazy_instance.h" 12 #include "base/lazy_instance.h"
8 #include "base/logging.h" 13 #include "base/logging.h"
9 #include "base/macros.h" 14 #include "base/macros.h"
10 #include "extensions/browser/api/api_resource.h" 15 #include "extensions/browser/api/api_resource.h"
11 #include "net/base/address_list.h" 16 #include "net/base/address_list.h"
12 #include "net/base/ip_endpoint.h" 17 #include "net/base/ip_endpoint.h"
13 #include "net/base/net_errors.h" 18 #include "net/base/net_errors.h"
14 #include "net/base/rand_callback.h" 19 #include "net/base/rand_callback.h"
15 #include "net/socket/tcp_client_socket.h" 20 #include "net/socket/tcp_client_socket.h"
16 21
(...skipping 18 matching lines...) Expand all
35 ApiResourceManager<ResumableTCPServerSocket> > > g_server_factory = 40 ApiResourceManager<ResumableTCPServerSocket> > > g_server_factory =
36 LAZY_INSTANCE_INITIALIZER; 41 LAZY_INSTANCE_INITIALIZER;
37 42
38 // static 43 // static
39 template <> 44 template <>
40 BrowserContextKeyedAPIFactory<ApiResourceManager<ResumableTCPServerSocket> >* 45 BrowserContextKeyedAPIFactory<ApiResourceManager<ResumableTCPServerSocket> >*
41 ApiResourceManager<ResumableTCPServerSocket>::GetFactoryInstance() { 46 ApiResourceManager<ResumableTCPServerSocket>::GetFactoryInstance() {
42 return g_server_factory.Pointer(); 47 return g_server_factory.Pointer();
43 } 48 }
44 49
50 SocketPauseBuffer::SocketPauseBuffer(
51 const SocketPauseBuffer::DownstreamReadCallback& callback)
52 : read_issued_(false),
53 downstream_read_cb_(callback),
54 downstream_read_buffer_(new net::GrowableIOBuffer),
55 upstream_data_returned_(0),
56 upstream_read_buffer_offset_(0),
57 prior_error_code_(0),
58 buffering_disabled_(false) {}
59
60 SocketPauseBuffer::~SocketPauseBuffer() {}
61
62 std::string SocketPauseBuffer::StatusDescription() {
63 std::stringstream status;
64 status << "\n";
65 status << static_cast<void*>(this);
66 status << " < " << upstream_data_returned_ << " | "
67 << downstream_read_buffer_->offset() << " | "
68 << downstream_read_buffer_->capacity() << ">: "
69 << ", buffered: " << BufferedDataCount() << ", "
70 << (read_issued_ ? "READING" : "not reading") << ", cb: "
71 << (upstream_read_callback_.is_null() ? "null" : "not null");
72 return status.str();
73 }
74
75 int SocketPauseBuffer::BufferedDataCount() const {
76 return downstream_read_buffer_->offset() - upstream_data_returned_;
77 }
78
79 void SocketPauseBuffer::CreditIncomingData(int count) {
80 DCHECK_GE(count, 0);
81 downstream_read_buffer_->set_offset(downstream_read_buffer_->offset() +
82 count);
83 }
84
85 void SocketPauseBuffer::ReturnDataInBuffer(net::IOBuffer* dest, int count) {
86 DCHECK_GE(count, 0);
87 memcpy(dest->data(),
88 downstream_read_buffer_->StartOfBuffer() + upstream_data_returned_,
89 count);
90 upstream_data_returned_ += count;
91 }
92
93 void SocketPauseBuffer::FreeReturnedBufferSpace() {
94 DCHECK(!read_issued_);
95 const int amount_to_free = upstream_data_returned_;
96
97 memmove(downstream_read_buffer_->StartOfBuffer(),
98 downstream_read_buffer_->StartOfBuffer() + upstream_data_returned_,
99 amount_to_free);
100 downstream_read_buffer_->set_offset(downstream_read_buffer_->offset() -
101 amount_to_free);
102 upstream_data_returned_ -= amount_to_free;
103 }
104
105 // Called whenever the client's callback (either upstream_read_callback_ or
106 // the |callback| argument to Read()) is invoked. Clears out state relevant
107 // to the Read.
108 void SocketPauseBuffer::ResetUpstreamState() {
109 upstream_read_buffer_offset_ = 0;
110 upstream_read_buffer_ = NULL;
111 upstream_read_callback_.Reset();
112 }
113
114 void SocketPauseBuffer::InvokeCallback(int result) {
115 DCHECK(!upstream_read_callback_.is_null());
116 net::CompletionCallback cb = upstream_read_callback_;
117 ResetUpstreamState();
118 cb.Run(result);
119 }
120
121 void SocketPauseBuffer::InsureBufferCanHold(int num_bytes) {
122 if (num_bytes > (downstream_read_buffer_->capacity() -
123 downstream_read_buffer_->offset())) {
124 downstream_read_buffer_->SetCapacity(num_bytes +
125 downstream_read_buffer_->offset());
126 }
127 }
128
129 bool SocketPauseBuffer::Pause() {
130 if (buffering_disabled_) {
131 return false;
132 } else if (!upstream_read_callback_.is_null()) {
133 InvokeCallback(net::ERR_ABORTED);
134 }
135 return true;
136 }
137
138 void SocketPauseBuffer::DisableBuffering() {
139 buffering_disabled_ = true;
140 }
141
142 int SocketPauseBuffer::Read(net::IOBuffer* buffer,
143 int buf_len,
144 const net::CompletionCallback& callback) {
145 DVLOG(1) << "Read(" << buf_len << ") START " << StatusDescription();
146
147 if (buffering_disabled_ && BufferedDataCount() == 0) {
148 return downstream_read_cb_.Run(buffer, buf_len, callback);
149 }
150
151 // Can't buffer data and have a pending downstream Read() simultaneously.
152 DCHECK(BufferedDataCount() == 0 || !read_issued_);
153
154 // Only one upstream Read() is allowed at a time.
155 if (!upstream_read_callback_.is_null()) {
156 DVLOG(1) << "Read(" << buf_len << "): - Returning ERR_INVALID_ARGUMENT "
157 << StatusDescription();
158 DCHECK(read_issued_);
159 return net::ERR_INVALID_ARGUMENT; // 'this' is the invalid argument.
160 }
161
162 if (buf_len < 0) {
163 DVLOG(1) << "Read(" << buf_len << "): - Returning ERR_INVALID_ARGUMENT "
164 << StatusDescription();
165 return net::ERR_INVALID_ARGUMENT;
166 }
167
168 if (BufferedDataCount() > 0) {
169 const int num_returned_bytes = std::min(buf_len, BufferedDataCount());
170 ReturnDataInBuffer(buffer, num_returned_bytes);
171 // Release our internal buffer once we're no longer using it.
172 if (buffering_disabled_ && BufferedDataCount() == 0) {
173 downstream_read_buffer_ = nullptr;
174 }
175 return num_returned_bytes;
176 } else {
177 DCHECK(!buffering_disabled_);
178 upstream_read_callback_ = callback;
179 upstream_read_request_size_ = buf_len;
180 upstream_read_buffer_ = buffer;
181 if (!read_issued_) {
182 // Issue a new downstream read for this request.
183 FreeReturnedBufferSpace();
184 InsureBufferCanHold(buf_len);
185 read_issued_ = true;
186 int result = downstream_read_cb_.Run(
187 downstream_read_buffer_.get(), buf_len,
188 base::Bind(&SocketPauseBuffer::ReadComplete, base::Unretained(this)));
189 if (result > 0) {
190 read_issued_ = false;
191 CreditIncomingData(result);
192 ReturnDataInBuffer(buffer, result);
193 ResetUpstreamState();
194 }
195 return result;
196 } else {
197 // We already have a downstream read pending, and now it's plumbed to
198 // invoke the callback argument.
199 return net::ERR_IO_PENDING;
200 }
201 }
202 }
203
204 void SocketPauseBuffer::ReadComplete(int count) {
205 read_issued_ = false;
206 if (count > 0) {
207 CreditIncomingData(count);
208 }
209 if (upstream_read_callback_.is_null()) {
210 prior_error_code_ = std::min(0, count);
211 } else {
212 int num_returned_bytes = std::min(count, upstream_read_request_size_);
213 ReturnDataInBuffer(upstream_read_buffer_.get(), num_returned_bytes);
214 InvokeCallback(num_returned_bytes);
215 }
216 }
217
218 BufferingStreamSocket::BufferingStreamSocket(
219 scoped_ptr<net::StreamSocket> socket)
220 : Passthrough<net::StreamSocket>(socket.Pass()),
221 pause_buffer_(base::Bind(&net::StreamSocket::Read,
222 base::Unretained(base_socket()))) {}
223
224 BufferingStreamSocket::~BufferingStreamSocket() {}
225
226 bool BufferingStreamSocket::Pause() {
227 return pause_buffer_.Pause();
228 }
229
230 void BufferingStreamSocket::DisableBuffering() {
231 pause_buffer_.DisableBuffering();
232 }
233
234 int BufferingStreamSocket::Read(net::IOBuffer* buffer,
235 int buf_len,
236 const net::CompletionCallback& callback) {
237 return pause_buffer_.Read(buffer, buf_len, callback);
238 }
239
240 BufferingTCPClientSocket::BufferingTCPClientSocket(
241 scoped_ptr<net::TCPClientSocket> socket)
242 : Passthrough<net::TCPClientSocket>(socket.Pass()),
243 pause_buffer_(base::Bind(&net::TCPClientSocket::Read,
244 base::Unretained(base_socket()))) {}
245
246 BufferingTCPClientSocket::~BufferingTCPClientSocket() {}
247
248 bool BufferingTCPClientSocket::Pause() {
249 return pause_buffer_.Pause();
250 }
251
252 void BufferingTCPClientSocket::DisableBuffering() {
253 pause_buffer_.DisableBuffering();
254 }
255
256 int BufferingTCPClientSocket::Read(net::IOBuffer* buffer,
257 int buf_len,
258 const net::CompletionCallback& callback) {
259 return pause_buffer_.Read(buffer, buf_len, callback);
260 }
261
262 bool BufferingTCPClientSocket::SetKeepAlive(bool enable, int delay) {
263 return base_socket()->SetKeepAlive(enable, delay);
264 }
265
266 bool BufferingTCPClientSocket::SetNoDelay(bool no_delay) {
267 return base_socket()->SetNoDelay(no_delay);
268 }
269
45 TCPSocket::TCPSocket(const std::string& owner_extension_id) 270 TCPSocket::TCPSocket(const std::string& owner_extension_id)
46 : Socket(owner_extension_id), socket_mode_(UNKNOWN) {} 271 : Socket(owner_extension_id), socket_mode_(UNKNOWN) {}
47 272
48 TCPSocket::TCPSocket(net::TCPClientSocket* tcp_client_socket, 273 TCPSocket::TCPSocket(net::TCPClientSocket* tcp_client_socket,
49 const std::string& owner_extension_id, 274 const std::string& owner_extension_id,
50 bool is_connected) 275 bool is_connected)
51 : Socket(owner_extension_id), 276 : Socket(owner_extension_id),
52 socket_(tcp_client_socket),
53 socket_mode_(CLIENT) { 277 socket_mode_(CLIENT) {
278 scoped_ptr<net::TCPClientSocket> p(tcp_client_socket);
279 socket_.reset(new BufferingTCPClientSocket(p.Pass()));
54 this->is_connected_ = is_connected; 280 this->is_connected_ = is_connected;
55 } 281 }
56 282
57 TCPSocket::TCPSocket(net::TCPServerSocket* tcp_server_socket, 283 TCPSocket::TCPSocket(net::TCPServerSocket* tcp_server_socket,
58 const std::string& owner_extension_id) 284 const std::string& owner_extension_id)
59 : Socket(owner_extension_id), 285 : Socket(owner_extension_id),
60 server_socket_(tcp_server_socket), 286 server_socket_(tcp_server_socket),
61 socket_mode_(SERVER) {} 287 socket_mode_(SERVER) {}
62 288
63 // static 289 // static
(...skipping 23 matching lines...) Expand all
87 } 313 }
88 DCHECK(!server_socket_.get()); 314 DCHECK(!server_socket_.get());
89 socket_mode_ = CLIENT; 315 socket_mode_ = CLIENT;
90 connect_callback_ = callback; 316 connect_callback_ = callback;
91 317
92 int result = net::ERR_CONNECTION_FAILED; 318 int result = net::ERR_CONNECTION_FAILED;
93 do { 319 do {
94 if (is_connected_) 320 if (is_connected_)
95 break; 321 break;
96 322
97 socket_.reset( 323 scoped_ptr<net::TCPClientSocket> internal_sock(
98 new net::TCPClientSocket(address, NULL, net::NetLog::Source())); 324 new net::TCPClientSocket(address, NULL, net::NetLog::Source()));
99 325
326 socket_.reset(new BufferingTCPClientSocket(internal_sock.Pass()));
327
100 connect_callback_ = callback; 328 connect_callback_ = callback;
101 result = socket_->Connect( 329 result = socket_->Connect(
102 base::Bind(&TCPSocket::OnConnectComplete, base::Unretained(this))); 330 base::Bind(&TCPSocket::OnConnectComplete, base::Unretained(this)));
103 } while (false); 331 } while (false);
104 332
105 if (result != net::ERR_IO_PENDING) 333 if (result != net::ERR_IO_PENDING)
106 OnConnectComplete(result); 334 OnConnectComplete(result);
107 } 335 }
108 336
109 void TCPSocket::Disconnect() { 337 void TCPSocket::Disconnect() {
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after
309 is_connected_ = false; 537 is_connected_ = false;
310 538
311 connect_callback_.Reset(); 539 connect_callback_.Reset();
312 read_callback_.Reset(); 540 read_callback_.Reset();
313 accept_callback_.Reset(); 541 accept_callback_.Reset();
314 542
315 DCHECK(socket_.get()) << "Called on null client socket."; 543 DCHECK(socket_.get()) << "Called on null client socket.";
316 ignore_result(socket_.release()); 544 ignore_result(socket_.release());
317 } 545 }
318 546
319 net::TCPClientSocket* TCPSocket::ClientStream() { 547 BufferingTCPClientSocket* TCPSocket::ClientStream() {
320 if (socket_mode_ != CLIENT || GetSocketType() != TYPE_TCP) 548 if (socket_mode_ != CLIENT || GetSocketType() != TYPE_TCP)
321 return NULL; 549 return NULL;
322 return socket_.get(); 550 return socket_.get();
323 } 551 }
324 552
325 bool TCPSocket::HasPendingRead() const { 553 bool TCPSocket::HasPendingRead() const {
326 return !read_callback_.is_null(); 554 return !read_callback_.is_null();
327 } 555 }
328 556
329 ResumableTCPSocket::ResumableTCPSocket(const std::string& owner_extension_id) 557 ResumableTCPSocket::ResumableTCPSocket(const std::string& owner_extension_id)
330 : TCPSocket(owner_extension_id), 558 : TCPSocket(owner_extension_id),
331 persistent_(false), 559 persistent_(false),
332 buffer_size_(0), 560 buffer_size_(0),
333 paused_(false) {} 561 paused_(false) {}
334 562
335 ResumableTCPSocket::ResumableTCPSocket(net::TCPClientSocket* tcp_client_socket, 563 ResumableTCPSocket::ResumableTCPSocket(net::TCPClientSocket* tcp_client_socket,
336 const std::string& owner_extension_id, 564 const std::string& owner_extension_id,
337 bool is_connected) 565 bool is_connected)
338 : TCPSocket(tcp_client_socket, owner_extension_id, is_connected), 566 : TCPSocket(tcp_client_socket, owner_extension_id, is_connected),
339 persistent_(false), 567 persistent_(false),
340 buffer_size_(0), 568 buffer_size_(0),
341 paused_(false) {} 569 paused_(false) {}
342 570
571 ResumableTCPSocket::~ResumableTCPSocket() {}
572
343 bool ResumableTCPSocket::IsPersistent() const { return persistent(); } 573 bool ResumableTCPSocket::IsPersistent() const { return persistent(); }
344 574
575 void ResumableTCPSocket::ApiPauseComplete() {
576 if (!pause_callback_.is_null()) {
577 pause_callback_.Run();
578 pause_callback_.Reset();
579 }
580 }
581
345 ResumableTCPServerSocket::ResumableTCPServerSocket( 582 ResumableTCPServerSocket::ResumableTCPServerSocket(
346 const std::string& owner_extension_id) 583 const std::string& owner_extension_id)
347 : TCPSocket(owner_extension_id), persistent_(false), paused_(false) {} 584 : TCPSocket(owner_extension_id), persistent_(false), paused_(false) {}
348 585
349 bool ResumableTCPServerSocket::IsPersistent() const { return persistent(); } 586 bool ResumableTCPServerSocket::IsPersistent() const { return persistent(); }
350 587
351 } // namespace extensions 588 } // namespace extensions
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698