Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(245)

Side by Side Diff: net/websockets/websocket_job.cc

Issue 3020068: WebSocket over SPDY. (Closed)
Patch Set: fix unittests Created 10 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/websockets/websocket_job.h ('k') | net/websockets/websocket_throttle_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/websockets/websocket_job.h" 5 #include "net/websockets/websocket_job.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/string_tokenizer.h" 9 #include "base/string_tokenizer.h"
10 #include "googleurl/src/gurl.h" 10 #include "googleurl/src/gurl.h"
11 #include "net/base/net_errors.h" 11 #include "net/base/net_errors.h"
12 #include "net/base/net_log.h" 12 #include "net/base/net_log.h"
13 #include "net/base/cookie_policy.h" 13 #include "net/base/cookie_policy.h"
14 #include "net/base/cookie_store.h" 14 #include "net/base/cookie_store.h"
15 #include "net/base/io_buffer.h" 15 #include "net/base/io_buffer.h"
16 #include "net/http/http_network_session.h"
17 #include "net/http/http_transaction_factory.h"
16 #include "net/http/http_util.h" 18 #include "net/http/http_util.h"
19 #include "net/spdy/spdy_session.h"
20 #include "net/spdy/spdy_session_pool.h"
17 #include "net/url_request/url_request_context.h" 21 #include "net/url_request/url_request_context.h"
18 #include "net/websockets/websocket_frame_handler.h" 22 #include "net/websockets/websocket_frame_handler.h"
19 #include "net/websockets/websocket_handshake_handler.h" 23 #include "net/websockets/websocket_handshake_handler.h"
20 #include "net/websockets/websocket_net_log_params.h" 24 #include "net/websockets/websocket_net_log_params.h"
21 #include "net/websockets/websocket_throttle.h" 25 #include "net/websockets/websocket_throttle.h"
22 26
23 namespace { 27 namespace {
24 28
25 // lower-case header names. 29 // lower-case header names.
26 const char* const kCookieHeaders[] = { 30 const char* const kCookieHeaders[] = {
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
105 // If we don't call OnSentData, WebCore::SocketStreamHandle would stop 109 // If we don't call OnSentData, WebCore::SocketStreamHandle would stop
106 // sending more data when pending data reaches max_pending_send_allowed. 110 // sending more data when pending data reaches max_pending_send_allowed.
107 // TODO(ukai): Fix this to support compression for larger message. 111 // TODO(ukai): Fix this to support compression for larger message.
108 int err = 0; 112 int err = 0;
109 if (!send_frame_handler_->GetCurrentBuffer() && 113 if (!send_frame_handler_->GetCurrentBuffer() &&
110 (err = send_frame_handler_->UpdateCurrentBuffer(false)) > 0) { 114 (err = send_frame_handler_->UpdateCurrentBuffer(false)) > 0) {
111 DCHECK(!current_buffer_); 115 DCHECK(!current_buffer_);
112 current_buffer_ = new DrainableIOBuffer( 116 current_buffer_ = new DrainableIOBuffer(
113 send_frame_handler_->GetCurrentBuffer(), 117 send_frame_handler_->GetCurrentBuffer(),
114 send_frame_handler_->GetCurrentBufferSize()); 118 send_frame_handler_->GetCurrentBufferSize());
115 return socket_->SendData( 119 SendDataInternal(
116 current_buffer_->data(), current_buffer_->BytesRemaining()); 120 current_buffer_->data(), current_buffer_->BytesRemaining());
121 return true;
117 } 122 }
118 return err >= 0; 123 return err >= 0;
119 } 124 }
120 125
121 case CLOSING: 126 case CLOSING:
122 case CLOSED: 127 case CLOSED:
123 return false; 128 return false;
124 } 129 }
125 return false; 130 return false;
126 } 131 }
127 132
128 void WebSocketJob::Close() { 133 void WebSocketJob::Close() {
129 state_ = CLOSING; 134 state_ = CLOSING;
130 if (current_buffer_) { 135 if (current_buffer_) {
131 // Will close in SendPending. 136 // Will close in SendPending.
132 return; 137 return;
133 } 138 }
134 state_ = CLOSED; 139 state_ = CLOSED;
135 socket_->Close(); 140 CloseInternal();
136 } 141 }
137 142
138 void WebSocketJob::RestartWithAuth( 143 void WebSocketJob::RestartWithAuth(
139 const string16& username, 144 const string16& username,
140 const string16& password) { 145 const string16& password) {
141 state_ = CONNECTING; 146 state_ = CONNECTING;
142 socket_->RestartWithAuth(username, password); 147 socket_->RestartWithAuth(username, password);
143 } 148 }
144 149
145 void WebSocketJob::DetachDelegate() { 150 void WebSocketJob::DetachDelegate() {
(...skipping 13 matching lines...) Expand all
159 Release(); // Balanced with OnStartOpenConnection(). 164 Release(); // Balanced with OnStartOpenConnection().
160 } 165 }
161 } 166 }
162 167
163 int WebSocketJob::OnStartOpenConnection( 168 int WebSocketJob::OnStartOpenConnection(
164 SocketStream* socket, CompletionCallback* callback) { 169 SocketStream* socket, CompletionCallback* callback) {
165 DCHECK(!callback_); 170 DCHECK(!callback_);
166 state_ = CONNECTING; 171 state_ = CONNECTING;
167 addresses_.Copy(socket->address_list().head(), true); 172 addresses_.Copy(socket->address_list().head(), true);
168 Singleton<WebSocketThrottle>::get()->PutInQueue(this); 173 Singleton<WebSocketThrottle>::get()->PutInQueue(this);
169 if (!waiting_) 174 if (!waiting_) {
170 return OK; 175 int result = TrySpdyStream();
176 if (result != ERR_IO_PENDING)
177 return result;
178 }
171 callback_ = callback; 179 callback_ = callback;
172 AddRef(); // Balanced when callback_ becomes NULL. 180 AddRef(); // Balanced when callback_ becomes NULL.
173 return ERR_IO_PENDING; 181 return ERR_IO_PENDING; // Wakeup will be called later.
174 } 182 }
175 183
176 void WebSocketJob::OnConnected( 184 void WebSocketJob::OnConnected(
177 SocketStream* socket, int max_pending_send_allowed) { 185 SocketStream* socket, int max_pending_send_allowed) {
178 if (state_ == CLOSED) 186 if (state_ == CLOSED)
179 return; 187 return;
180 DCHECK_EQ(CONNECTING, state_); 188 DCHECK_EQ(CONNECTING, state_);
181 if (delegate_) 189 if (delegate_)
182 delegate_->OnConnected(socket, max_pending_send_allowed); 190 delegate_->OnConnected(socket, max_pending_send_allowed);
183 } 191 }
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
258 SocketStream* socket, AuthChallengeInfo* auth_info) { 266 SocketStream* socket, AuthChallengeInfo* auth_info) {
259 if (delegate_) 267 if (delegate_)
260 delegate_->OnAuthRequired(socket, auth_info); 268 delegate_->OnAuthRequired(socket, auth_info);
261 } 269 }
262 270
263 void WebSocketJob::OnError(const SocketStream* socket, int error) { 271 void WebSocketJob::OnError(const SocketStream* socket, int error) {
264 if (delegate_) 272 if (delegate_)
265 delegate_->OnError(socket, error); 273 delegate_->OnError(socket, error);
266 } 274 }
267 275
276 void WebSocketJob::OnCreatedSpdyStream(
277 SpdyWebSocketStream* spdy_websocket_stream, int result) {
278 if (state_ == CLOSED) {
279 spdy_websocket_stream_.reset();
280 DoCallback(ERR_ABORTED);
281 return;
282 }
283 DCHECK(spdy_websocket_stream_.get());
284 DCHECK(socket_.get());
285 if (result == OK) {
286 socket_->SwitchToSpdy();
287 if (callback_)
288 DoCallback(OK);
289 return;
290 }
291 DCHECK_NE(ERR_IO_PENDING, result);
292 spdy_websocket_stream_.reset();
293 if (callback_)
294 DoCallback(result);
295 }
296
297 void WebSocketJob::OnSentSpdyHeaders(
298 SpdyWebSocketStream* spdy_websocket_stream) {
299 if (state_ != CONNECTING)
300 return;
301 if (delegate_)
302 delegate_->OnSentData(
303 socket_,
304 handshake_request_->original_length());
305 handshake_request_.reset();
306 }
307
308 int WebSocketJob::OnReceivedSpdyResponseHeader(
309 SpdyWebSocketStream* spdy_websocket_stream,
310 const spdy::SpdyHeaderBlock& headers,
311 int status) {
312 DCHECK_NE(INITIALIZED, state_);
313 if (state_ != CONNECTING)
314 return status;
315 if (status != OK)
316 return status;
317 // TODO(ukai): fallback to non-spdy connection?
318 handshake_response_->ParseResponseHeaderBlock(headers, challenge_);
319
320 SaveCookiesAndNotifyHeaderComplete();
321 return OK;
322 }
323
324 void WebSocketJob::OnSentSpdyData(
325 SpdyWebSocketStream* spdy_websocket_stream, int amount_sent) {
326 DCHECK_NE(INITIALIZED, state_);
327 DCHECK_NE(CONNECTING, state_);
328 if (state_ == CLOSED)
329 return;
330 if (!spdy_websocket_stream_.get())
331 return;
332 OnSentData(socket_, amount_sent);
333 }
334
335 void WebSocketJob::OnReceivedSpdyData(
336 SpdyWebSocketStream* spdy_websocket_stream,
337 const char* data, int length) {
338 DCHECK_NE(INITIALIZED, state_);
339 DCHECK_NE(CONNECTING, state_);
340 if (state_ == CLOSED)
341 return;
342 if (!spdy_websocket_stream_.get())
343 return;
344 OnReceivedData(socket_, data, length);
345 }
346
347 void WebSocketJob::OnCloseSpdyStream(
348 SpdyWebSocketStream* spdy_websocket_stream) {
349 spdy_websocket_stream_.reset();
350 OnClose(socket_);
351 }
352
268 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) { 353 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
269 DCHECK_EQ(state_, CONNECTING); 354 DCHECK_EQ(state_, CONNECTING);
270 if (!handshake_request_->ParseRequest(data, len)) 355 if (!handshake_request_->ParseRequest(data, len))
271 return false; 356 return false;
272 357
273 // handshake message is completed. 358 // handshake message is completed.
274 AddCookieHeaderAndSend(); 359 AddCookieHeaderAndSend();
275 // Just buffered in |handshake_request_|. 360 // Just buffered in |handshake_request_|.
276 return true; 361 return true;
277 } 362 }
(...skipping 24 matching lines...) Expand all
302 CookieOptions cookie_options; 387 CookieOptions cookie_options;
303 cookie_options.set_include_httponly(); 388 cookie_options.set_include_httponly();
304 std::string cookie = 389 std::string cookie =
305 socket_->context()->cookie_store()->GetCookiesWithOptions( 390 socket_->context()->cookie_store()->GetCookiesWithOptions(
306 GetURLForCookies(), cookie_options); 391 GetURLForCookies(), cookie_options);
307 if (!cookie.empty()) 392 if (!cookie.empty())
308 handshake_request_->AppendHeaderIfMissing("Cookie", cookie); 393 handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
309 } 394 }
310 } 395 }
311 396
312 const std::string& handshake_request = handshake_request_->GetRawRequest(); 397 if (spdy_websocket_stream_.get()) {
313 handshake_request_sent_ = 0; 398 linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
314 socket_->net_log()->AddEvent( 399 handshake_request_->GetRequestHeaderBlock(
315 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS, 400 socket_->url(), headers.get(), &challenge_);
316 make_scoped_refptr( 401 spdy_websocket_stream_->SendRequest(headers);
317 new NetLogWebSocketHandshakeParameter(handshake_request))); 402 } else {
318 socket_->SendData(handshake_request.data(), 403 const std::string& handshake_request =
319 handshake_request.size()); 404 handshake_request_->GetRawRequest();
405 handshake_request_sent_ = 0;
406 socket_->net_log()->AddEvent(
407 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
408 make_scoped_refptr(
409 new NetLogWebSocketHandshakeParameter(handshake_request)));
410 socket_->SendData(handshake_request.data(),
411 handshake_request.size());
412 }
320 } 413 }
321 Release(); // Balance AddRef taken in AddCookieHeaderAndSend 414 Release(); // Balance AddRef taken in AddCookieHeaderAndSend
322 } 415 }
323 416
324 void WebSocketJob::OnSentHandshakeRequest( 417 void WebSocketJob::OnSentHandshakeRequest(
325 SocketStream* socket, int amount_sent) { 418 SocketStream* socket, int amount_sent) {
326 DCHECK_EQ(state_, CONNECTING); 419 DCHECK_EQ(state_, CONNECTING);
327 handshake_request_sent_ += amount_sent; 420 handshake_request_sent_ += amount_sent;
328 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length()); 421 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
329 if (handshake_request_sent_ >= handshake_request_->raw_length()) { 422 if (handshake_request_sent_ >= handshake_request_->raw_length()) {
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
452 url_canon::Replacements<char> replacements; 545 url_canon::Replacements<char> replacements;
453 replacements.SetScheme(scheme.c_str(), 546 replacements.SetScheme(scheme.c_str(),
454 url_parse::Component(0, scheme.length())); 547 url_parse::Component(0, scheme.length()));
455 return url.ReplaceComponents(replacements); 548 return url.ReplaceComponents(replacements);
456 } 549 }
457 550
458 const AddressList& WebSocketJob::address_list() const { 551 const AddressList& WebSocketJob::address_list() const {
459 return addresses_; 552 return addresses_;
460 } 553 }
461 554
555 int WebSocketJob::TrySpdyStream() {
556 if (!socket_.get())
557 return ERR_FAILED;
558
559 // Check if we have a SPDY session available.
560 // If so, use it to create the websocket stream.
561 HttpTransactionFactory* factory =
562 socket_->context()->http_transaction_factory();
563 if (factory) {
564 scoped_refptr<HttpNetworkSession> session =
565 factory->GetSession();
566 if (session.get()) {
567 SpdySessionPool* spdy_pool = session->spdy_session_pool();
568 const HostPortProxyPair pair(socket_->GetHostPortPair(),
569 socket_->proxy_server());
570 if (spdy_pool->HasSession(pair)) {
571 scoped_refptr<SpdySession> spdy_session =
572 spdy_pool->Get(pair,
573 session->mutable_spdy_settings(),
574 *socket_->net_log());
575
576 spdy_websocket_stream_.reset(
577 new SpdyWebSocketStream(spdy_session, this));
578
579 int result = spdy_websocket_stream_->InitializeStream(
580 socket_->url(), MEDIUM, *socket_->net_log());
581 if (result != ERR_IO_PENDING && callback_)
582 DoCallback(result);
583 return result;
584 }
585 }
586 }
587 // No SPDY session was available.
588 // Fallback to connecting a new socket.
589 DCHECK(!spdy_websocket_stream_.get());
590 if (callback_)
591 DoCallback(OK);
592 return OK;
593 }
594
462 void WebSocketJob::SetWaiting() { 595 void WebSocketJob::SetWaiting() {
463 waiting_ = true; 596 waiting_ = true;
464 } 597 }
465 598
466 bool WebSocketJob::IsWaiting() const { 599 bool WebSocketJob::IsWaiting() const {
467 return waiting_; 600 return waiting_;
468 } 601 }
469 602
470 void WebSocketJob::Wakeup() { 603 void WebSocketJob::Wakeup() {
471 if (!waiting_) 604 if (!waiting_)
472 return; 605 return;
473 waiting_ = false; 606 waiting_ = false;
474 DCHECK(callback_); 607 DCHECK(callback_);
475 MessageLoopForIO::current()->PostTask( 608 MessageLoopForIO::current()->PostTask(
476 FROM_HERE, 609 FROM_HERE,
477 NewRunnableMethod(this, 610 NewRunnableMethod(this,
478 &WebSocketJob::DoCallback)); 611 &WebSocketJob::TrySpdyStream));
479 } 612 }
480 613
481 void WebSocketJob::DoCallback() { 614 void WebSocketJob::DoCallback(int result) {
482 // |callback_| may be NULL if OnClose() or DetachDelegate() was called. 615 // |callback_| may be NULL if OnClose() or DetachDelegate() was called.
483 if (callback_) { 616 if (callback_) {
484 net::CompletionCallback* callback = callback_; 617 net::CompletionCallback* callback = callback_;
485 callback_ = NULL; 618 callback_ = NULL;
486 callback->Run(net::OK); 619 callback->Run(result);
487 Release(); // Balanced with OnStartOpenConnection(). 620 Release(); // Balanced with OnStartOpenConnection().
488 } 621 }
489 } 622 }
490 623
624 void WebSocketJob::SendDataInternal(const char* data, int length) {
625 if (spdy_websocket_stream_.get())
626 spdy_websocket_stream_->SendData(data, length);
627 else
628 socket_->SendData(data, length);
629 }
630
631 void WebSocketJob::CloseInternal() {
632 if (spdy_websocket_stream_.get())
633 spdy_websocket_stream_->Close();
634 else
635 socket_->Close();
636 }
637
491 void WebSocketJob::SendPending() { 638 void WebSocketJob::SendPending() {
492 if (current_buffer_) 639 if (current_buffer_)
493 return; 640 return;
494 // Current buffer is done. Try next buffer if any. 641 // Current buffer is done. Try next buffer if any.
495 // Don't buffer sending data. See comment on case OPEN in SendData(). 642 // Don't buffer sending data. See comment on case OPEN in SendData().
496 if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) { 643 if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) {
497 // No more data to send. 644 // No more data to send.
498 if (state_ == CLOSING) 645 if (state_ == CLOSING)
499 socket_->Close(); 646 CloseInternal();
500 return; 647 return;
501 } 648 }
502 current_buffer_ = new DrainableIOBuffer( 649 current_buffer_ = new DrainableIOBuffer(
503 send_frame_handler_->GetCurrentBuffer(), 650 send_frame_handler_->GetCurrentBuffer(),
504 send_frame_handler_->GetCurrentBufferSize()); 651 send_frame_handler_->GetCurrentBufferSize());
505 socket_->SendData(current_buffer_->data(), current_buffer_->BytesRemaining()); 652 SendDataInternal(current_buffer_->data(), current_buffer_->BytesRemaining());
506 } 653 }
507 654
508 } // namespace net 655 } // namespace net
OLDNEW
« no previous file with comments | « net/websockets/websocket_job.h ('k') | net/websockets/websocket_throttle_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698