Index: net/websockets/websocket_job.cc |
diff --git a/net/websockets/websocket_job.cc b/net/websockets/websocket_job.cc |
index a1e2dde4513f504fa78b66581eede230e7546c69..be3a6142fe5340688ab703dac024fbb7cc41eeb3 100644 |
--- a/net/websockets/websocket_job.cc |
+++ b/net/websockets/websocket_job.cc |
@@ -13,7 +13,11 @@ |
#include "net/base/cookie_policy.h" |
#include "net/base/cookie_store.h" |
#include "net/base/io_buffer.h" |
+#include "net/http/http_network_session.h" |
+#include "net/http/http_transaction_factory.h" |
#include "net/http/http_util.h" |
+#include "net/spdy/spdy_session.h" |
+#include "net/spdy/spdy_session_pool.h" |
#include "net/url_request/url_request_context.h" |
#include "net/websockets/websocket_frame_handler.h" |
#include "net/websockets/websocket_handshake_handler.h" |
@@ -112,8 +116,9 @@ bool WebSocketJob::SendData(const char* data, int len) { |
current_buffer_ = new DrainableIOBuffer( |
send_frame_handler_->GetCurrentBuffer(), |
send_frame_handler_->GetCurrentBufferSize()); |
- return socket_->SendData( |
+ SendDataInternal( |
current_buffer_->data(), current_buffer_->BytesRemaining()); |
+ return true; |
} |
return err >= 0; |
} |
@@ -132,7 +137,7 @@ void WebSocketJob::Close() { |
return; |
} |
state_ = CLOSED; |
- socket_->Close(); |
+ CloseInternal(); |
} |
void WebSocketJob::RestartWithAuth( |
@@ -166,11 +171,14 @@ int WebSocketJob::OnStartOpenConnection( |
state_ = CONNECTING; |
addresses_.Copy(socket->address_list().head(), true); |
Singleton<WebSocketThrottle>::get()->PutInQueue(this); |
- if (!waiting_) |
- return OK; |
+ if (!waiting_) { |
+ int result = TrySpdyStream(); |
+ if (result != ERR_IO_PENDING) |
+ return result; |
+ } |
callback_ = callback; |
AddRef(); // Balanced when callback_ becomes NULL. |
- return ERR_IO_PENDING; |
+ return ERR_IO_PENDING; // Wakeup will be called later. |
} |
void WebSocketJob::OnConnected( |
@@ -265,6 +273,83 @@ void WebSocketJob::OnError(const SocketStream* socket, int error) { |
delegate_->OnError(socket, error); |
} |
+void WebSocketJob::OnCreatedSpdyStream( |
+ SpdyWebSocketStream* spdy_websocket_stream, int result) { |
+ if (state_ == CLOSED) { |
+ spdy_websocket_stream_.reset(); |
+ DoCallback(ERR_ABORTED); |
+ return; |
+ } |
+ DCHECK(spdy_websocket_stream_.get()); |
+ DCHECK(socket_.get()); |
+ if (result == OK) { |
+ socket_->SwitchToSpdy(); |
+ if (callback_) |
+ DoCallback(OK); |
+ return; |
+ } |
+ DCHECK_NE(ERR_IO_PENDING, result); |
+ spdy_websocket_stream_.reset(); |
+ if (callback_) |
+ DoCallback(result); |
+} |
+ |
+void WebSocketJob::OnSentSpdyHeaders( |
+ SpdyWebSocketStream* spdy_websocket_stream) { |
+ if (state_ != CONNECTING) |
+ return; |
+ if (delegate_) |
+ delegate_->OnSentData( |
+ socket_, |
+ handshake_request_->original_length()); |
+ handshake_request_.reset(); |
+} |
+ |
+int WebSocketJob::OnReceivedSpdyResponseHeader( |
+ SpdyWebSocketStream* spdy_websocket_stream, |
+ const spdy::SpdyHeaderBlock& headers, |
+ int status) { |
+ DCHECK_NE(INITIALIZED, state_); |
+ if (state_ != CONNECTING) |
+ return status; |
+ if (status != OK) |
+ return status; |
+ // TODO(ukai): fallback to non-spdy connection? |
+ handshake_response_->ParseResponseHeaderBlock(headers, challenge_); |
+ |
+ SaveCookiesAndNotifyHeaderComplete(); |
+ return OK; |
+} |
+ |
+void WebSocketJob::OnSentSpdyData( |
+ SpdyWebSocketStream* spdy_websocket_stream, int amount_sent) { |
+ DCHECK_NE(INITIALIZED, state_); |
+ DCHECK_NE(CONNECTING, state_); |
+ if (state_ == CLOSED) |
+ return; |
+ if (!spdy_websocket_stream_.get()) |
+ return; |
+ OnSentData(socket_, amount_sent); |
+} |
+ |
+void WebSocketJob::OnReceivedSpdyData( |
+ SpdyWebSocketStream* spdy_websocket_stream, |
+ const char* data, int length) { |
+ DCHECK_NE(INITIALIZED, state_); |
+ DCHECK_NE(CONNECTING, state_); |
+ if (state_ == CLOSED) |
+ return; |
+ if (!spdy_websocket_stream_.get()) |
+ return; |
+ OnReceivedData(socket_, data, length); |
+} |
+ |
+void WebSocketJob::OnCloseSpdyStream( |
+ SpdyWebSocketStream* spdy_websocket_stream) { |
+ spdy_websocket_stream_.reset(); |
+ OnClose(socket_); |
+} |
+ |
bool WebSocketJob::SendHandshakeRequest(const char* data, int len) { |
DCHECK_EQ(state_, CONNECTING); |
if (!handshake_request_->ParseRequest(data, len)) |
@@ -309,14 +394,22 @@ void WebSocketJob::OnCanGetCookiesCompleted(int policy) { |
} |
} |
- const std::string& handshake_request = handshake_request_->GetRawRequest(); |
- handshake_request_sent_ = 0; |
- socket_->net_log()->AddEvent( |
- NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS, |
- make_scoped_refptr( |
- new NetLogWebSocketHandshakeParameter(handshake_request))); |
- socket_->SendData(handshake_request.data(), |
- handshake_request.size()); |
+ if (spdy_websocket_stream_.get()) { |
+ linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock); |
+ handshake_request_->GetRequestHeaderBlock( |
+ socket_->url(), headers.get(), &challenge_); |
+ spdy_websocket_stream_->SendRequest(headers); |
+ } else { |
+ const std::string& handshake_request = |
+ handshake_request_->GetRawRequest(); |
+ handshake_request_sent_ = 0; |
+ socket_->net_log()->AddEvent( |
+ NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS, |
+ make_scoped_refptr( |
+ new NetLogWebSocketHandshakeParameter(handshake_request))); |
+ socket_->SendData(handshake_request.data(), |
+ handshake_request.size()); |
+ } |
} |
Release(); // Balance AddRef taken in AddCookieHeaderAndSend |
} |
@@ -459,6 +552,46 @@ const AddressList& WebSocketJob::address_list() const { |
return addresses_; |
} |
+int WebSocketJob::TrySpdyStream() { |
+ if (!socket_.get()) |
+ return ERR_FAILED; |
+ |
+ // Check if we have a SPDY session available. |
+ // If so, use it to create the websocket stream. |
+ HttpTransactionFactory* factory = |
+ socket_->context()->http_transaction_factory(); |
+ if (factory) { |
+ scoped_refptr<HttpNetworkSession> session = |
+ factory->GetSession(); |
+ if (session.get()) { |
+ SpdySessionPool* spdy_pool = session->spdy_session_pool(); |
+ const HostPortProxyPair pair(socket_->GetHostPortPair(), |
+ socket_->proxy_server()); |
+ if (spdy_pool->HasSession(pair)) { |
+ scoped_refptr<SpdySession> spdy_session = |
+ spdy_pool->Get(pair, |
+ session->mutable_spdy_settings(), |
+ *socket_->net_log()); |
+ |
+ spdy_websocket_stream_.reset( |
+ new SpdyWebSocketStream(spdy_session, this)); |
+ |
+ int result = spdy_websocket_stream_->InitializeStream( |
+ socket_->url(), MEDIUM, *socket_->net_log()); |
+ if (result != ERR_IO_PENDING && callback_) |
+ DoCallback(result); |
+ return result; |
+ } |
+ } |
+ } |
+ // No SPDY session was available. |
+ // Fallback to connecting a new socket. |
+ DCHECK(!spdy_websocket_stream_.get()); |
+ if (callback_) |
+ DoCallback(OK); |
+ return OK; |
+} |
+ |
void WebSocketJob::SetWaiting() { |
waiting_ = true; |
} |
@@ -475,19 +608,33 @@ void WebSocketJob::Wakeup() { |
MessageLoopForIO::current()->PostTask( |
FROM_HERE, |
NewRunnableMethod(this, |
- &WebSocketJob::DoCallback)); |
+ &WebSocketJob::TrySpdyStream)); |
} |
-void WebSocketJob::DoCallback() { |
+void WebSocketJob::DoCallback(int result) { |
// |callback_| may be NULL if OnClose() or DetachDelegate() was called. |
if (callback_) { |
net::CompletionCallback* callback = callback_; |
callback_ = NULL; |
- callback->Run(net::OK); |
+ callback->Run(result); |
Release(); // Balanced with OnStartOpenConnection(). |
} |
} |
+void WebSocketJob::SendDataInternal(const char* data, int length) { |
+ if (spdy_websocket_stream_.get()) |
+ spdy_websocket_stream_->SendData(data, length); |
+ else |
+ socket_->SendData(data, length); |
+} |
+ |
+void WebSocketJob::CloseInternal() { |
+ if (spdy_websocket_stream_.get()) |
+ spdy_websocket_stream_->Close(); |
+ else |
+ socket_->Close(); |
+} |
+ |
void WebSocketJob::SendPending() { |
if (current_buffer_) |
return; |
@@ -496,13 +643,13 @@ void WebSocketJob::SendPending() { |
if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) { |
// No more data to send. |
if (state_ == CLOSING) |
- socket_->Close(); |
+ CloseInternal(); |
return; |
} |
current_buffer_ = new DrainableIOBuffer( |
send_frame_handler_->GetCurrentBuffer(), |
send_frame_handler_->GetCurrentBufferSize()); |
- socket_->SendData(current_buffer_->data(), current_buffer_->BytesRemaining()); |
+ SendDataInternal(current_buffer_->data(), current_buffer_->BytesRemaining()); |
} |
} // namespace net |