Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(131)

Side by Side Diff: net/socket_stream/socket_stream.cc

Issue 243077: Add net/socket_stream. (Closed)
Patch Set: Rename SocketStreamJob to SocketStream Created 11 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 //
5 // TODO(ukai): code is similar with http_network_transaction.cc. We should
6 // think about ways to share code, if possible.
7
8 #include "net/socket_stream/socket_stream.h"
9
10 #include <string>
11
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/message_loop.h"
15 #include "base/string_util.h"
16 #include "net/base/host_resolver.h"
17 #include "net/base/io_buffer.h"
18 #include "net/base/net_errors.h"
19 #include "net/base/net_util.h"
20 #include "net/http/http_response_headers.h"
21 #include "net/http/http_util.h"
22 #include "net/socket/client_socket_factory.h"
23 #include "net/socket/ssl_client_socket.h"
24 #include "net/socket/socks5_client_socket.h"
25 #include "net/socket/socks_client_socket.h"
26 #include "net/socket/tcp_client_socket.h"
27 #include "net/url_request/url_request.h"
28
29 static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes.
30 static const int kReadBufferSize = 4096;
31
32 namespace net {
33
34 void SocketStream::ResponseHeaders::Realloc(size_t new_size) {
35 headers_.reset(static_cast<char*>(realloc(headers_.release(), new_size)));
36 }
37
38 SocketStream::SocketStream(const GURL& url, Delegate* delegate)
39 : url_(url),
40 delegate_(delegate),
41 max_pending_send_allowed_(kMaxPendingSendAllowed),
42 next_state_(STATE_NONE),
43 host_resolver_(CreateSystemHostResolver()),
eroman 2009/11/09 22:22:11 The HostResolver should be passed in as a dependen
eroman 2009/11/09 22:26:28 Also the HostResolver created in the chrome code i
44 factory_(ClientSocketFactory::GetDefaultFactory()),
45 proxy_mode_(kDirectConnection),
46 pac_request_(NULL),
47 ALLOW_THIS_IN_INITIALIZER_LIST(
48 io_callback_(this, &SocketStream::OnIOCompleted)),
49 ALLOW_THIS_IN_INITIALIZER_LIST(
50 read_callback_(this, &SocketStream::OnReadCompleted)),
51 ALLOW_THIS_IN_INITIALIZER_LIST(
52 write_callback_(this, &SocketStream::OnWriteCompleted)),
53 read_buf_(NULL),
54 write_buf_(NULL),
55 current_write_buf_(NULL),
56 write_buf_offset_(0),
57 write_buf_size_(0) {
58 DCHECK(MessageLoop::current()) <<
59 "The current MessageLoop must exist";
60 DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
61 "The current MessageLoop must be TYPE_IO";
62 DCHECK(delegate_);
63 }
64
65 SocketStream::~SocketStream() {
66 DCHECK(!delegate_);
67 }
68
69 SocketStream::UserData* SocketStream::GetUserData(
70 const void* key) const {
71 UserDataMap::const_iterator found = user_data_.find(key);
72 if (found != user_data_.end())
73 return found->second.get();
74 return NULL;
75 }
76
77 void SocketStream::SetUserData(const void* key, UserData* data) {
78 user_data_[key] = linked_ptr<UserData>(data);
79 }
80
81 void SocketStream::set_context(URLRequestContext* context) {
82 context_ = context;
83 }
84
85 void SocketStream::Connect() {
86 DCHECK(MessageLoop::current()) <<
87 "The current MessageLoop must exist";
88 DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
89 "The current MessageLoop must be TYPE_IO";
90 ssl_config_service()->GetSSLConfig(&ssl_config_);
91
92 AddRef(); // Released in Finish()
93 // Open a connection asynchronously, so that delegate won't be called
94 // back before returning Connect().
95 next_state_ = STATE_RESOLVE_PROXY;
96 MessageLoop::current()->PostTask(
97 FROM_HERE,
98 NewRunnableMethod(this, &SocketStream::DoLoop, OK));
99 }
100
101 bool SocketStream::SendData(const char* data, int len) {
102 DCHECK(MessageLoop::current()) <<
103 "The current MessageLoop must exist";
104 DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
105 "The current MessageLoop must be TYPE_IO";
106 if (!socket_.get() || !socket_->IsConnected())
107 return false;
108 if (write_buf_) {
109 int current_amount_send = write_buf_size_ - write_buf_offset_;
110 for (PendingDataQueue::const_iterator iter = pending_write_bufs_.begin();
111 iter != pending_write_bufs_.end();
112 ++iter)
113 current_amount_send += (*iter)->size();
114
115 current_amount_send += len;
116 if (current_amount_send > max_pending_send_allowed_)
117 return false;
118
119 pending_write_bufs_.push_back(new IOBufferWithSize(len));
120 memcpy(pending_write_bufs_.back()->data(), data, len);
121 return true;
122 }
123 DCHECK(!current_write_buf_);
124 write_buf_ = new IOBuffer(len);
125 memcpy(write_buf_->data(), data, len);
126 write_buf_size_ = len;
127 write_buf_offset_ = 0;
128 // Send pending data asynchronously, so that delegate won't be called
129 // back before returning SendData().
130 MessageLoop::current()->PostTask(
131 FROM_HERE,
132 NewRunnableMethod(this, &SocketStream::DoLoop, OK));
133 return true;
134 }
135
136 void SocketStream::Close() {
137 DCHECK(MessageLoop::current()) <<
138 "The current MessageLoop must exist";
139 DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
140 "The current MessageLoop must be TYPE_IO";
141 if (!socket_.get())
142 return;
143 if (socket_->IsConnected())
144 socket_->Disconnect();
145 // Close asynchronously, so that delegate won't be called
146 // back before returning Close().
147 MessageLoop::current()->PostTask(
148 FROM_HERE,
149 NewRunnableMethod(this, &SocketStream::DoLoop, OK));
150 }
151
152 void SocketStream::DetachDelegate() {
153 if (!delegate_)
154 return;
155 delegate_ = NULL;
156 Close();
157 }
158
159 void SocketStream::Finish() {
160 DCHECK(MessageLoop::current()) <<
161 "The current MessageLoop must exist";
162 DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
163 "The current MessageLoop must be TYPE_IO";
164 Delegate* delegate = delegate_;
165 delegate_ = NULL;
166 if (delegate) {
167 delegate->OnClose(this);
168 Release();
169 }
170 }
171
172 void SocketStream::SetHostResolver(HostResolver* host_resolver) {
173 DCHECK(host_resolver);
174 host_resolver_ = host_resolver;
175 }
176
177 void SocketStream::SetClientSocketFactory(
178 ClientSocketFactory* factory) {
179 DCHECK(factory);
180 factory_ = factory;
181 }
182
183 void SocketStream::DidEstablishConnection() {
184 if (!socket_.get() || !socket_->IsConnected()) {
185 Finish();
186 return;
187 }
188 next_state_ = STATE_READ_WRITE;
189
190 if (delegate_)
191 delegate_->OnConnected(this, max_pending_send_allowed_);
192
193 return;
194 }
195
196 void SocketStream::DidReceiveData(int result) {
197 DCHECK(read_buf_);
198 DCHECK(result > 0);
199 if (!delegate_)
200 return;
201 // Notify recevied data to delegate.
202 delegate_->OnReceivedData(this, read_buf_->data(), result);
203 read_buf_ = NULL;
204 }
205
206 void SocketStream::DidSendData(int result) {
207 current_write_buf_ = NULL;
208 DCHECK(result > 0);
209 if (!delegate_)
210 return;
211
212 delegate_->OnSentData(this, result);
213 int remaining_size = write_buf_size_ - write_buf_offset_ - result;
214 if (remaining_size == 0) {
215 if (!pending_write_bufs_.empty()) {
216 write_buf_size_ = pending_write_bufs_.front()->size();
217 write_buf_ = pending_write_bufs_.front();
218 pending_write_bufs_.pop_front();
219 } else {
220 write_buf_size_ = 0;
221 write_buf_ = NULL;
222 }
223 write_buf_offset_ = 0;
224 } else {
225 write_buf_offset_ += result;
226 }
227 }
228
229 void SocketStream::OnIOCompleted(int result) {
230 DoLoop(result);
231 // TODO(ukai): notify error.
232 }
233
234 void SocketStream::OnReadCompleted(int result) {
235 // TODO(ukai): notify error.
236 if (result >= 0 && read_buf_) {
237 DidReceiveData(result);
238 result = OK;
239 }
240 DoLoop(result);
241 }
242
243 void SocketStream::OnWriteCompleted(int result) {
244 // TODO(ukai): notify error.
245 if (result >= 0 && write_buf_) {
246 DidSendData(result);
247 result = OK;
248 }
249 DoLoop(result);
250 }
251
252 int SocketStream::DoLoop(int result) {
253 if (next_state_ == STATE_NONE) {
254 Finish();
255 return ERR_CONNECTION_CLOSED;
256 }
257
258 do {
259 State state = next_state_;
260 next_state_ = STATE_NONE;
261 switch (state) {
262 case STATE_RESOLVE_PROXY:
263 DCHECK_EQ(OK, result);
264 result = DoResolveProxy();
265 break;
266 case STATE_RESOLVE_PROXY_COMPLETE:
267 result = DoResolveProxyComplete(result);
268 break;
269 case STATE_RESOLVE_HOST:
270 DCHECK_EQ(OK, result);
271 result = DoResolveHost();
272 break;
273 case STATE_RESOLVE_HOST_COMPLETE:
274 result = DoResolveHostComplete(result);
275 break;
276 case STATE_TCP_CONNECT:
277 DCHECK_EQ(OK, result);
278 result = DoTcpConnect();
279 break;
280 case STATE_TCP_CONNECT_COMPLETE:
281 result = DoTcpConnectComplete(result);
282 break;
283 case STATE_WRITE_TUNNEL_HEADERS:
284 DCHECK_EQ(OK, result);
285 result = DoWriteTunnelHeaders();
286 break;
287 case STATE_WRITE_TUNNEL_HEADERS_COMPLETE:
288 result = DoWriteTunnelHeadersComplete(result);
289 break;
290 case STATE_READ_TUNNEL_HEADERS:
291 DCHECK_EQ(OK, result);
292 result = DoReadTunnelHeaders();
293 break;
294 case STATE_READ_TUNNEL_HEADERS_COMPLETE:
295 result = DoReadTunnelHeadersComplete(result);
296 break;
297 case STATE_SOCKS_CONNECT:
298 DCHECK_EQ(OK, result);
299 result = DoSOCKSConnect();
300 break;
301 case STATE_SOCKS_CONNECT_COMPLETE:
302 result = DoSOCKSConnectComplete(result);
303 break;
304 case STATE_SSL_CONNECT:
305 DCHECK_EQ(OK, result);
306 result = DoSSLConnect();
307 break;
308 case STATE_SSL_CONNECT_COMPLETE:
309 result = DoSSLConnectComplete(result);
310 break;
311 case STATE_READ_WRITE:
312 result = DoReadWrite(result);
313 break;
314 default:
315 NOTREACHED() << "bad state";
316 result = ERR_UNEXPECTED;
317 break;
318 }
319 } while (result != ERR_IO_PENDING && next_state_ != STATE_NONE);
320
321 if (result != ERR_IO_PENDING)
322 Finish();
323
324 return result;
325 }
326
327 int SocketStream::DoResolveProxy() {
328 DCHECK(!pac_request_);
329 next_state_ = STATE_RESOLVE_PROXY_COMPLETE;
330
331 return proxy_service()->ResolveProxy(
332 url_, &proxy_info_, &io_callback_, &pac_request_, NULL);
333 }
334
335 int SocketStream::DoResolveProxyComplete(int result) {
336 next_state_ = STATE_RESOLVE_HOST;
337
338 pac_request_ = NULL;
339 if (result != OK) {
340 LOG(ERROR) << "Failed to resolve proxy: " << result;
341 proxy_info_.UseDirect();
342 }
343
344 return OK;
345 }
346
347 int SocketStream::DoResolveHost() {
348 next_state_ = STATE_RESOLVE_HOST_COMPLETE;
349
350 if (proxy_info_.is_direct())
351 proxy_mode_ = kDirectConnection;
352 else if (proxy_info_.proxy_server().is_socks())
353 proxy_mode_ = kSOCKSProxy;
354 else
355 proxy_mode_ = kTunnelProxy;
356
357 // Determine the host and port to connect to.
358 std::string host;
359 int port;
360 if (proxy_mode_ != kDirectConnection) {
361 ProxyServer proxy_server = proxy_info_.proxy_server();
362 host = proxy_server.HostNoBrackets();
363 port = proxy_server.port();
364 } else {
365 host = url_.HostNoBrackets();
366 port = url_.EffectiveIntPort();
367 }
368
369 HostResolver::RequestInfo resolve_info(host, port);
370
371 resolver_.reset(new SingleRequestHostResolver(host_resolver_.get()));
372 return resolver_->Resolve(resolve_info, &addresses_, &io_callback_, NULL);
373 }
374
375 int SocketStream::DoResolveHostComplete(int result) {
376 if (result == OK)
377 next_state_ = STATE_TCP_CONNECT;
378 return result;
379 }
380
381 int SocketStream::DoTcpConnect() {
382 next_state_ = STATE_TCP_CONNECT_COMPLETE;
383 DCHECK(factory_);
384 socket_.reset(factory_->CreateTCPClientSocket(addresses_));
385 return socket_->Connect(&io_callback_);
386 }
387
388 int SocketStream::DoTcpConnectComplete(int result) {
389 if (result != OK)
390 return result;
391
392 if (proxy_mode_ == kTunnelProxy)
393 next_state_ = STATE_WRITE_TUNNEL_HEADERS;
394 else if (proxy_mode_ == kSOCKSProxy)
395 next_state_ = STATE_SOCKS_CONNECT;
396 else if (is_secure()) {
397 next_state_ = STATE_SSL_CONNECT;
398 } else {
399 DidEstablishConnection();
400 }
401 return OK;
402 }
403
404 int SocketStream::DoWriteTunnelHeaders() {
405 DCHECK_EQ(kTunnelProxy, proxy_mode_);
406
407 next_state_ = STATE_WRITE_TUNNEL_HEADERS_COMPLETE;
408
409 if (!tunnel_request_headers_.get()) {
410 tunnel_request_headers_ = new RequestHeaders();
411 tunnel_request_headers_bytes_sent_ = 0;
412 }
413 if (tunnel_request_headers_->headers_.empty()) {
414 tunnel_request_headers_->headers_ = StringPrintf(
415 "CONNECT %s HTTP/1.1\r\n"
416 "Host: %s\r\n"
417 "Proxy-Connection: keep-alive\r\n",
418 GetHostAndPort(url_).c_str(),
419 GetHostAndOptionalPort(url_).c_str());
420 // TODO(ukai): set proxy auth if necessary.
421 tunnel_request_headers_->headers_ += "\r\n";
422 }
423 tunnel_request_headers_->SetDataOffset(tunnel_request_headers_bytes_sent_);
424 int buf_len = static_cast<int>(tunnel_request_headers_->headers_.size() -
425 tunnel_request_headers_bytes_sent_);
426 DCHECK_GT(buf_len, 0);
427 return socket_->Write(tunnel_request_headers_, buf_len, &io_callback_);
428 }
429
430 int SocketStream::DoWriteTunnelHeadersComplete(int result) {
431 DCHECK_EQ(kTunnelProxy, proxy_mode_);
432
433 if (result < 0)
434 return result;
435
436 tunnel_request_headers_bytes_sent_ += result;
437 if (tunnel_request_headers_bytes_sent_ <
438 tunnel_request_headers_->headers_.size())
439 next_state_ = STATE_WRITE_TUNNEL_HEADERS;
440 else
441 next_state_ = STATE_READ_TUNNEL_HEADERS;
442 return OK;
443 }
444
445 int SocketStream::DoReadTunnelHeaders() {
446 DCHECK_EQ(kTunnelProxy, proxy_mode_);
447
448 next_state_ = STATE_READ_TUNNEL_HEADERS_COMPLETE;
449
450 if (!tunnel_response_headers_.get()) {
451 tunnel_response_headers_ = new ResponseHeaders();
452 tunnel_response_headers_capacity_ = kMaxTunnelResponseHeadersSize;
453 tunnel_response_headers_->Realloc(tunnel_response_headers_capacity_);
454 tunnel_response_headers_len_ = 0;
455 }
456
457 int buf_len = tunnel_response_headers_capacity_ -
458 tunnel_response_headers_len_;
459 tunnel_response_headers_->SetDataOffset(tunnel_response_headers_len_);
460 CHECK(tunnel_response_headers_->data());
461
462 return socket_->Read(tunnel_response_headers_, buf_len, &io_callback_);
463 }
464
465 int SocketStream::DoReadTunnelHeadersComplete(int result) {
466 DCHECK_EQ(kTunnelProxy, proxy_mode_);
467
468 if (result < 0)
469 return result;
470
471 tunnel_response_headers_len_ += result;
472 DCHECK(tunnel_response_headers_len_ <= tunnel_response_headers_capacity_);
473
474 int eoh = HttpUtil::LocateEndOfHeaders(
475 tunnel_response_headers_->headers(), tunnel_response_headers_len_, 0);
476 if (eoh == -1) {
477 if (tunnel_response_headers_len_ >= kMaxTunnelResponseHeadersSize)
478 return ERR_RESPONSE_HEADERS_TOO_BIG;
479
480 next_state_ = STATE_READ_TUNNEL_HEADERS;
481 return OK;
482 }
483 // DidReadResponseHeaders
484 scoped_refptr<HttpResponseHeaders> headers;
485 headers = new HttpResponseHeaders(
486 HttpUtil::AssembleRawHeaders(tunnel_response_headers_->headers(), eoh));
487 if (headers->GetParsedHttpVersion() < HttpVersion(1, 0)) {
488 // Require the "HTTP/1.x" status line.
489 return ERR_TUNNEL_CONNECTION_FAILED;
490 }
491 switch (headers->response_code()) {
492 case 200: // OK
493 if (is_secure()) {
494 DCHECK_EQ(eoh, tunnel_response_headers_len_);
495 next_state_ = STATE_SSL_CONNECT;
496 } else {
497 DidEstablishConnection();
498 if ((eoh < tunnel_response_headers_len_) && delegate_)
499 delegate_->OnReceivedData(
500 this, tunnel_response_headers_->headers() + eoh,
501 tunnel_response_headers_len_ - eoh);
502 }
503 return OK;
504 case 407: // Proxy Authentication Required.
505 // TODO(ukai): handle Proxy Authentication.
506 break;
507 default:
508 break;
509 }
510 return ERR_TUNNEL_CONNECTION_FAILED;
511 }
512
513 int SocketStream::DoSOCKSConnect() {
514 DCHECK_EQ(kSOCKSProxy, proxy_mode_);
515
516 next_state_ = STATE_SOCKS_CONNECT_COMPLETE;
517
518 ClientSocket* s = socket_.release();
519 HostResolver::RequestInfo req_info(url_.HostNoBrackets(),
520 url_.EffectiveIntPort());
521
522 if (proxy_info_.proxy_server().scheme() == ProxyServer::SCHEME_SOCKS5)
523 s = new SOCKS5ClientSocket(s, req_info, host_resolver_.get());
524 else
525 s = new SOCKSClientSocket(s, req_info, host_resolver_.get());
526 socket_.reset(s);
527 return socket_->Connect(&io_callback_);
528 }
529
530 int SocketStream::DoSOCKSConnectComplete(int result) {
531 DCHECK_EQ(kSOCKSProxy, proxy_mode_);
532
533 if (result == OK) {
534 if (is_secure())
535 next_state_ = STATE_SSL_CONNECT;
536 else
537 DidEstablishConnection();
538 }
539 return result;
540 }
541
542 int SocketStream::DoSSLConnect() {
543 DCHECK(factory_);
544 socket_.reset(factory_->CreateSSLClientSocket(
545 socket_.release(), url_.HostNoBrackets(), ssl_config_));
546 next_state_ = STATE_SSL_CONNECT_COMPLETE;
547 return socket_->Connect(&io_callback_);
548 }
549
550 int SocketStream::DoSSLConnectComplete(int result) {
551 if (IsCertificateError(result))
552 result = HandleCertificateError(result);
553
554 if (result == OK)
555 DidEstablishConnection();
556 return result;
557 }
558
559 int SocketStream::DoReadWrite(int result) {
560 if (result < OK) {
561 Finish();
562 return result;
563 }
564 if (!socket_.get() || !socket_->IsConnected()) {
565 Finish();
566 return ERR_CONNECTION_CLOSED;
567 }
568
569 next_state_ = STATE_READ_WRITE;
570
571 if (!read_buf_) {
572 read_buf_ = new IOBuffer(kReadBufferSize);
573 result = socket_->Read(read_buf_, kReadBufferSize, &read_callback_);
574 if (result > 0) {
575 DidReceiveData(result);
576 result = OK;
577 }
578 }
579 if (write_buf_ && !current_write_buf_) {
580 current_write_buf_ = new ReusedIOBuffer(write_buf_, write_buf_size_);
581 current_write_buf_->SetOffset(write_buf_offset_);
582 result = socket_->Write(current_write_buf_,
583 write_buf_size_ - write_buf_offset_,
584 &write_callback_);
585 if (result > 0) {
586 DidSendData(result);
587 result = OK;
588 }
589 }
590
591 // We arrived here when Write is performed and finished.
592 if (result == OK)
593 return ERR_IO_PENDING;
594 return result;
595 }
596
597 int SocketStream::HandleCertificateError(int result) {
598 // TODO(ukai): handle cert error properly.
599 switch (result) {
600 case ERR_CERT_COMMON_NAME_INVALID:
601 case ERR_CERT_DATE_INVALID:
602 case ERR_CERT_AUTHORITY_INVALID:
603 result = OK;
604 break;
605 default:
606 break;
607 }
608 return result;
609 }
610
611 bool SocketStream::is_secure() const {
612 return url_.SchemeIs("wss");
613 }
614
615 SSLConfigService* SocketStream::ssl_config_service() const {
616 return context_->ssl_config_service();
617 }
618
619 ProxyService* SocketStream::proxy_service() const {
620 return context_->proxy_service();
621 }
622
623 } // namespace net
OLDNEW
« net/socket_stream/socket_stream.h ('K') | « net/socket_stream/socket_stream.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698