Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(507)

Side by Side Diff: net/websockets/websocket_job.cc

Issue 7185032: WebSocket over SPDY: WebSocketJob handling SpdyWebSocketStream (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: last change cause deadlock on SocketStreamTest Created 9 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/websockets/websocket_job.h" 5 #include "net/websockets/websocket_job.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/lazy_instance.h" 9 #include "base/lazy_instance.h"
10 #include "base/string_tokenizer.h" 10 #include "base/string_tokenizer.h"
11 #include "googleurl/src/gurl.h" 11 #include "googleurl/src/gurl.h"
12 #include "net/base/net_errors.h" 12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h" 13 #include "net/base/net_log.h"
14 #include "net/base/cookie_store.h" 14 #include "net/base/cookie_store.h"
15 #include "net/base/io_buffer.h" 15 #include "net/base/io_buffer.h"
16 #include "net/http/http_network_session.h" 16 #include "net/http/http_network_session.h"
17 #include "net/http/http_transaction_factory.h" 17 #include "net/http/http_transaction_factory.h"
18 #include "net/http/http_util.h" 18 #include "net/http/http_util.h"
19 #include "net/spdy/spdy_session.h" 19 #include "net/spdy/spdy_session.h"
20 #include "net/spdy/spdy_session_pool.h" 20 #include "net/spdy/spdy_session_pool.h"
21 #include "net/url_request/url_request_context.h" 21 #include "net/url_request/url_request_context.h"
22 #include "net/websockets/websocket_frame_handler.h" 22 #include "net/websockets/websocket_frame_handler.h"
23 #include "net/websockets/websocket_handshake_handler.h" 23 #include "net/websockets/websocket_handshake_handler.h"
24 #include "net/websockets/websocket_net_log_params.h" 24 #include "net/websockets/websocket_net_log_params.h"
25 #include "net/websockets/websocket_throttle.h" 25 #include "net/websockets/websocket_throttle.h"
26 26
27 static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes.
28
27 namespace { 29 namespace {
28 30
29 // lower-case header names. 31 // lower-case header names.
30 const char* const kCookieHeaders[] = { 32 const char* const kCookieHeaders[] = {
31 "cookie", "cookie2" 33 "cookie", "cookie2"
32 }; 34 };
33 const char* const kSetCookieHeaders[] = { 35 const char* const kSetCookieHeaders[] = {
34 "set-cookie", "set-cookie2" 36 "set-cookie", "set-cookie2"
35 }; 37 };
36 38
(...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after
171 Release(); // Balanced with OnStartOpenConnection(). 173 Release(); // Balanced with OnStartOpenConnection().
172 } 174 }
173 } 175 }
174 176
175 int WebSocketJob::OnStartOpenConnection( 177 int WebSocketJob::OnStartOpenConnection(
176 SocketStream* socket, CompletionCallback* callback) { 178 SocketStream* socket, CompletionCallback* callback) {
177 DCHECK(!callback_); 179 DCHECK(!callback_);
178 state_ = CONNECTING; 180 state_ = CONNECTING;
179 addresses_ = socket->address_list(); 181 addresses_ = socket->address_list();
180 WebSocketThrottle::GetInstance()->PutInQueue(this); 182 WebSocketThrottle::GetInstance()->PutInQueue(this);
181 if (!waiting_) { 183 if (waiting_) {
182 int result = TrySpdyStream(); 184 // PutInQueue() may set |waiting_| true for throttling. In this case,
183 if (result != ERR_IO_PENDING) 185 // Wakeup() will be called later.
184 return result; 186 callback_ = callback;
187 AddRef(); // Balanced when callback_ becomes NULL.
188 return ERR_IO_PENDING;
185 } 189 }
186 callback_ = callback; 190 return TrySpdyStream();
187 AddRef(); // Balanced when callback_ becomes NULL.
188 return ERR_IO_PENDING; // Wakeup will be called later.
189 } 191 }
190 192
191 void WebSocketJob::OnConnected( 193 void WebSocketJob::OnConnected(
192 SocketStream* socket, int max_pending_send_allowed) { 194 SocketStream* socket, int max_pending_send_allowed) {
193 if (state_ == CLOSED) 195 if (state_ == CLOSED)
194 return; 196 return;
195 DCHECK_EQ(CONNECTING, state_); 197 DCHECK_EQ(CONNECTING, state_);
196 if (delegate_) 198 if (delegate_)
197 delegate_->OnConnected(socket, max_pending_send_allowed); 199 delegate_->OnConnected(socket, max_pending_send_allowed);
198 } 200 }
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
269 delegate->OnClose(socket); 271 delegate->OnClose(socket);
270 } 272 }
271 273
272 void WebSocketJob::OnAuthRequired( 274 void WebSocketJob::OnAuthRequired(
273 SocketStream* socket, AuthChallengeInfo* auth_info) { 275 SocketStream* socket, AuthChallengeInfo* auth_info) {
274 if (delegate_) 276 if (delegate_)
275 delegate_->OnAuthRequired(socket, auth_info); 277 delegate_->OnAuthRequired(socket, auth_info);
276 } 278 }
277 279
278 void WebSocketJob::OnError(const SocketStream* socket, int error) { 280 void WebSocketJob::OnError(const SocketStream* socket, int error) {
281 if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
282 delegate_->OnError(socket, error);
283 }
284
285 void WebSocketJob::OnCreatedSpdyStream(int result) {
286 DCHECK(spdy_websocket_stream_.get());
287 DCHECK(socket_.get());
288 DCHECK_NE(ERR_IO_PENDING, result);
289
290 if (state_ == CLOSED) {
291 result = ERR_ABORTED;
292 } else if (result == OK) {
293 state_ = CONNECTING;
294 result = ERR_PROTOCOL_SWITCHED;
295 } else {
296 spdy_websocket_stream_.reset();
297 }
298
299 CompleteIO(result);
300 }
301
302 void WebSocketJob::OnSentSpdyHeaders(int result) {
303 DCHECK_NE(INITIALIZED, state_);
304 if (state_ != CONNECTING)
305 return;
279 if (delegate_) 306 if (delegate_)
280 delegate_->OnError(socket, error); 307 delegate_->OnSentData(socket_, handshake_request_->original_length());
308 handshake_request_.reset();
309 }
310
311 int WebSocketJob::OnReceivedSpdyResponseHeader(
312 const spdy::SpdyHeaderBlock& headers, int status) {
313 DCHECK_NE(INITIALIZED, state_);
314 if (state_ != CONNECTING)
315 return status;
316 if (status != OK)
317 return status;
318 // TODO(toyoshim): Fallback to non-spdy connection?
319 handshake_response_->ParseResponseHeaderBlock(headers, challenge_);
320
321 SaveCookiesAndNotifyHeaderComplete();
322 return OK;
323 }
324
325 void WebSocketJob::OnSentSpdyData(int amount_sent) {
326 DCHECK_NE(INITIALIZED, state_);
327 DCHECK_NE(CONNECTING, state_);
328 if (state_ == CLOSED)
329 return;
330 if (!spdy_websocket_stream_.get())
331 return;
332 OnSentData(socket_, amount_sent);
333 }
334
335 void WebSocketJob::OnReceivedSpdyData(const char* data, int length) {
336 DCHECK_NE(INITIALIZED, state_);
337 DCHECK_NE(CONNECTING, state_);
338 if (state_ == CLOSED)
339 return;
340 if (!spdy_websocket_stream_.get())
341 return;
342 OnReceivedData(socket_, data, length);
343 }
344
345 void WebSocketJob::OnCloseSpdyStream() {
346 spdy_websocket_stream_.reset();
347 OnClose(socket_);
281 } 348 }
282 349
283 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) { 350 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
284 DCHECK_EQ(state_, CONNECTING); 351 DCHECK_EQ(state_, CONNECTING);
285 if (started_to_send_handshake_request_) 352 if (started_to_send_handshake_request_)
286 return false; 353 return false;
287 if (!handshake_request_->ParseRequest(data, len)) 354 if (!handshake_request_->ParseRequest(data, len))
288 return false; 355 return false;
289 356
290 // handshake message is completed. 357 // handshake message is completed.
(...skipping 19 matching lines...) Expand all
310 CookieOptions cookie_options; 377 CookieOptions cookie_options;
311 cookie_options.set_include_httponly(); 378 cookie_options.set_include_httponly();
312 std::string cookie = 379 std::string cookie =
313 socket_->context()->cookie_store()->GetCookiesWithOptions( 380 socket_->context()->cookie_store()->GetCookiesWithOptions(
314 GetURLForCookies(), cookie_options); 381 GetURLForCookies(), cookie_options);
315 if (!cookie.empty()) 382 if (!cookie.empty())
316 handshake_request_->AppendHeaderIfMissing("Cookie", cookie); 383 handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
317 } 384 }
318 } 385 }
319 386
320 const std::string& handshake_request = handshake_request_->GetRawRequest(); 387 if (spdy_websocket_stream_.get()) {
321 handshake_request_sent_ = 0; 388 linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock);
322 socket_->net_log()->AddEvent( 389 handshake_request_->GetRequestHeaderBlock(
323 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS, 390 socket_->url(), headers.get(), &challenge_);
324 make_scoped_refptr( 391 spdy_websocket_stream_->SendRequest(headers);
325 new NetLogWebSocketHandshakeParameter(handshake_request))); 392 } else {
326 socket_->SendData(handshake_request.data(), 393 const std::string& handshake_request =
327 handshake_request.size()); 394 handshake_request_->GetRawRequest();
395 handshake_request_sent_ = 0;
396 socket_->net_log()->AddEvent(
397 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
398 make_scoped_refptr(
399 new NetLogWebSocketHandshakeParameter(handshake_request)));
400 socket_->SendData(handshake_request.data(),
401 handshake_request.size());
402 }
328 } 403 }
329 } 404 }
330 405
331 void WebSocketJob::OnSentHandshakeRequest( 406 void WebSocketJob::OnSentHandshakeRequest(
332 SocketStream* socket, int amount_sent) { 407 SocketStream* socket, int amount_sent) {
333 DCHECK_EQ(state_, CONNECTING); 408 DCHECK_EQ(state_, CONNECTING);
334 handshake_request_sent_ += amount_sent; 409 handshake_request_sent_ += amount_sent;
335 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length()); 410 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
336 if (handshake_request_sent_ >= handshake_request_->raw_length()) { 411 if (handshake_request_sent_ >= handshake_request_->raw_length()) {
337 // handshake request has been sent. 412 // handshake request has been sent.
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after
457 // If so, use it to create the websocket stream. 532 // If so, use it to create the websocket stream.
458 HttpTransactionFactory* factory = 533 HttpTransactionFactory* factory =
459 socket_->context()->http_transaction_factory(); 534 socket_->context()->http_transaction_factory();
460 if (factory) { 535 if (factory) {
461 scoped_refptr<HttpNetworkSession> session = factory->GetSession(); 536 scoped_refptr<HttpNetworkSession> session = factory->GetSession();
462 if (session.get()) { 537 if (session.get()) {
463 SpdySessionPool* spdy_pool = session->spdy_session_pool(); 538 SpdySessionPool* spdy_pool = session->spdy_session_pool();
464 const HostPortProxyPair pair(HostPortPair::FromURL(socket_->url()), 539 const HostPortProxyPair pair(HostPortPair::FromURL(socket_->url()),
465 socket_->proxy_server()); 540 socket_->proxy_server());
466 if (spdy_pool->HasSession(pair)) { 541 if (spdy_pool->HasSession(pair)) {
467 // TODO(toyoshim): Switch to SpdyWebSocketStream here by returning 542 scoped_refptr<SpdySession> spdy_session =
468 // ERR_PROTOCOL_SWITCHED. 543 spdy_pool->Get(pair, *socket_->net_log());
544
545 SSLInfo ssl_info;
546 bool was_npn_negotiated;
547 bool use_ssl =
548 spdy_session->GetSSLInfo(&ssl_info, &was_npn_negotiated);
549
550 // Forbid wss downgrade to SPDY without SSL.
551 if (!socket_->is_secure() || use_ssl) {
Yuta Kitamura 2011/07/08 07:40:21 This seems right, but an identical expression "!(s
Takashi Toyoshima 2011/07/08 08:35:14 Done.
552 spdy_websocket_stream_.reset(
553 new SpdyWebSocketStream(spdy_session, this));
554
555 int result = spdy_websocket_stream_->InitializeStream(
556 socket_->url(), MEDIUM, *socket_->net_log());
557 if (result == OK) {
558 OnConnected(socket_, kMaxPendingSendAllowed);
559 return ERR_PROTOCOL_SWITCHED;
560 }
561 if (result == ERR_IO_PENDING)
562 return ERR_IO_PENDING;
563
564 // Fallback to WebSocket wire protocol.
565 spdy_websocket_stream_.reset();
566 }
Yuta Kitamura 2011/07/08 07:40:21 (Optional) These blocks seem nested too deeply. G
Takashi Toyoshima 2011/07/08 08:35:14 I love it. Done!
469 } 567 }
470 } 568 }
471 } 569 }
472 } 570 }
473 // No SPDY session was available. 571 // No SPDY session was available.
474 // Fallback to connecting a new socket. 572 // Fallback to connecting a new socket.
475 return OK; 573 return OK;
476 } 574 }
477 575
478 void WebSocketJob::SetWaiting() { 576 void WebSocketJob::SetWaiting() {
479 waiting_ = true; 577 waiting_ = true;
480 } 578 }
481 579
482 bool WebSocketJob::IsWaiting() const { 580 bool WebSocketJob::IsWaiting() const {
483 return waiting_; 581 return waiting_;
484 } 582 }
485 583
486 void WebSocketJob::Wakeup() { 584 void WebSocketJob::Wakeup() {
487 if (!waiting_) 585 if (!waiting_)
488 return; 586 return;
489 waiting_ = false; 587 waiting_ = false;
490 DCHECK(callback_); 588 DCHECK(callback_);
491 MessageLoopForIO::current()->PostTask( 589 MessageLoopForIO::current()->PostTask(
492 FROM_HERE, 590 FROM_HERE,
493 NewRunnableMethod(this, &WebSocketJob::RetryPendingIO)); 591 NewRunnableMethod(this, &WebSocketJob::RetryPendingIO));
494 } 592 }
495 593
496 void WebSocketJob::RetryPendingIO() { 594 void WebSocketJob::RetryPendingIO() {
497 int result = TrySpdyStream(); 595 CompleteIO(TrySpdyStream());
Yuta Kitamura 2011/07/08 07:40:21 TrySpdyStream() can return ERR_IO_PENDING. Is it o
Takashi Toyoshima 2011/07/08 08:35:14 Oh! It's a bug to cause DCHECK failure !! In that
596 }
597
598 void WebSocketJob::CompleteIO(int result) {
498 // |callback_| may be NULL if OnClose() or DetachDelegate() was called. 599 // |callback_| may be NULL if OnClose() or DetachDelegate() was called.
499 if (callback_) { 600 if (callback_) {
500 net::CompletionCallback* callback = callback_; 601 net::CompletionCallback* callback = callback_;
501 callback_ = NULL; 602 callback_ = NULL;
502 callback->Run(result); 603 callback->Run(result);
503 Release(); // Balanced with OnStartOpenConnection(). 604 Release(); // Balanced with OnStartOpenConnection().
504 } 605 }
505 } 606 }
506 607
507 bool WebSocketJob::SendDataInternal(const char* data, int length) { 608 bool WebSocketJob::SendDataInternal(const char* data, int length) {
508 // TODO(toyoshim): Call protocol specific SendData(). 609 if (spdy_websocket_stream_.get())
610 return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
509 return socket_->SendData(data, length); 611 return socket_->SendData(data, length);
510 } 612 }
511 613
512 void WebSocketJob::CloseInternal() { 614 void WebSocketJob::CloseInternal() {
513 // TODO(toyoshim): Call protocol specific Close(). 615 if (spdy_websocket_stream_.get())
616 spdy_websocket_stream_->Close();
514 socket_->Close(); 617 socket_->Close();
515 } 618 }
516 619
517 void WebSocketJob::SendPending() { 620 void WebSocketJob::SendPending() {
518 if (current_buffer_) 621 if (current_buffer_)
519 return; 622 return;
520 // Current buffer is done. Try next buffer if any. 623 // Current buffer is done. Try next buffer if any.
521 // Don't buffer sending data. See comment on case OPEN in SendData(). 624 // Don't buffer sending data. See comment on case OPEN in SendData().
522 if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) { 625 if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) {
523 // No more data to send. 626 // No more data to send.
524 if (state_ == CLOSING) 627 if (state_ == CLOSING)
525 CloseInternal(); 628 CloseInternal();
526 return; 629 return;
527 } 630 }
528 current_buffer_ = new DrainableIOBuffer( 631 current_buffer_ = new DrainableIOBuffer(
529 send_frame_handler_->GetCurrentBuffer(), 632 send_frame_handler_->GetCurrentBuffer(),
530 send_frame_handler_->GetCurrentBufferSize()); 633 send_frame_handler_->GetCurrentBufferSize());
531 SendDataInternal(current_buffer_->data(), current_buffer_->BytesRemaining()); 634 SendDataInternal(current_buffer_->data(), current_buffer_->BytesRemaining());
532 } 635 }
533 636
534 } // namespace net 637 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698