Index: mojo/services/network/http_connection_impl.cc |
diff --git a/mojo/services/network/http_connection_impl.cc b/mojo/services/network/http_connection_impl.cc |
index 0831d1d7a888bad97dcfad5faaaa7553acb0cfd9..77d7a9165c61c3cc87ed79448d75993fa157dda5 100644 |
--- a/mojo/services/network/http_connection_impl.cc |
+++ b/mojo/services/network/http_connection_impl.cc |
@@ -14,6 +14,9 @@ |
#include "mojo/common/handle_watcher.h" |
#include "mojo/services/network/http_server_impl.h" |
#include "mojo/services/network/net_adapters.h" |
+#include "mojo/services/network/public/cpp/web_socket_read_queue.h" |
+#include "mojo/services/network/public/cpp/web_socket_write_queue.h" |
+#include "mojo/services/network/public/interfaces/web_socket.mojom.h" |
#include "net/base/net_errors.h" |
#include "net/http/http_request_headers.h" |
#include "net/http/http_status_code.h" |
@@ -82,6 +85,143 @@ class HttpConnectionImpl::SimpleDataPipeReader { |
DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader); |
}; |
+class HttpConnectionImpl::WebSocketImpl : public WebSocket, |
+ public ErrorHandler { |
+ public: |
+ // |connection| must outlive this object. |
+ WebSocketImpl(HttpConnectionImpl* connection, |
+ InterfaceRequest<WebSocket> request, |
+ ScopedDataPipeConsumerHandle send_stream, |
+ WebSocketClientPtr client) |
+ : connection_(connection), |
+ binding_(this, request.Pass()), |
+ client_(client.Pass()), |
+ send_stream_(send_stream.Pass()), |
+ read_send_stream_(new WebSocketReadQueue(send_stream_.get())), |
+ pending_send_count_(0) { |
+ DCHECK(binding_.is_bound()); |
+ DCHECK(client_); |
+ DCHECK(send_stream_.is_valid()); |
+ |
+ binding_.set_error_handler(this); |
+ client_.set_error_handler(this); |
+ |
+ DataPipe data_pipe; |
+ receive_stream_ = data_pipe.producer_handle.Pass(); |
+ write_receive_stream_.reset(new WebSocketWriteQueue(receive_stream_.get())); |
+ |
+ client_->DidConnect("", "", data_pipe.consumer_handle.Pass()); |
+ } |
+ |
+ ~WebSocketImpl() override {} |
+ |
+ void Close() { |
+ DCHECK(!IsClosing()); |
+ |
+ binding_.Close(); |
+ client_.reset(); |
+ |
+ NotifyOwnerCloseIfAllDone(); |
+ } |
+ |
+ void OnReceivedWebSocketMessage(const std::string& data) { |
+ if (IsClosing()) |
+ return; |
+ |
+ // TODO(yzshen): It shouldn't be an issue to pass an empty message. However, |
+ // WebSocket{Read,Write}Queue doesn't handle that correctly. |
+ if (data.empty()) |
+ return; |
+ |
+ uint32_t size = static_cast<uint32_t>(data.size()); |
+ write_receive_stream_->Write( |
+ &data[0], size, |
+ base::Bind(&WebSocketImpl::OnFinishedWritingReceiveStream, |
+ base::Unretained(this), size)); |
+ } |
+ |
+ private: |
+ // WebSocket implementation. |
+ void Connect(const String& url, |
+ Array<String> protocols, |
+ const String& origin, |
+ ScopedDataPipeConsumerHandle send_stream, |
+ WebSocketClientPtr client) override { |
+ NOTREACHED(); |
+ } |
+ |
+ void Send(bool fin, MessageType type, uint32_t num_bytes) override { |
+ if (!fin || type != MESSAGE_TYPE_TEXT) { |
+ NOTIMPLEMENTED(); |
+ Close(); |
+ } |
+ |
+ // TODO(yzshen): It shouldn't be an issue to pass an empty message. However, |
+ // WebSocket{Read,Write}Queue doesn't handle that correctly. |
+ if (num_bytes == 0) |
+ return; |
+ |
+ pending_send_count_++; |
+ read_send_stream_->Read( |
+ num_bytes, base::Bind(&WebSocketImpl::OnFinishedReadingSendStream, |
+ base::Unretained(this), num_bytes)); |
+ } |
+ |
+ void FlowControl(int64_t quota) override { NOTIMPLEMENTED(); } |
+ |
+ void Close(uint16_t code, const String& reason) override { |
+ Close(); |
+ } |
+ |
+ // ErrorHandler implementation. |
+ void OnConnectionError() override { Close(); } |
+ |
+ void OnFinishedReadingSendStream(uint32_t num_bytes, const char* data) { |
+ DCHECK_GT(pending_send_count_, 0u); |
+ pending_send_count_--; |
+ |
+ if (data) { |
+ connection_->server_->server()->SendOverWebSocket( |
+ connection_->connection_id_, std::string(data, num_bytes)); |
+ } |
+ |
+ if (IsClosing()) |
+ NotifyOwnerCloseIfAllDone(); |
+ } |
+ |
+ void OnFinishedWritingReceiveStream(uint32_t num_bytes, const char* buffer) { |
+ if (IsClosing()) |
+ return; |
+ |
+ if (buffer) |
+ client_->DidReceiveData(true, MESSAGE_TYPE_TEXT, num_bytes); |
+ } |
+ |
+ // Checks whether Close() has been called. |
+ bool IsClosing() const { return !binding_.is_bound(); } |
+ |
+ void NotifyOwnerCloseIfAllDone() { |
+ DCHECK(IsClosing()); |
+ |
+ if (pending_send_count_ == 0) |
+ connection_->OnWebSocketClosed(); |
+ } |
+ |
+ HttpConnectionImpl* const connection_; |
+ |
+ Binding<WebSocket> binding_; |
+ WebSocketClientPtr client_; |
+ |
+ ScopedDataPipeConsumerHandle send_stream_; |
+ scoped_ptr<WebSocketReadQueue> read_send_stream_; |
+ size_t pending_send_count_; |
+ |
+ ScopedDataPipeProducerHandle receive_stream_; |
+ scoped_ptr<WebSocketWriteQueue> write_receive_stream_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(WebSocketImpl); |
+}; |
+ |
template <> |
struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> { |
static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) { |
@@ -115,11 +255,11 @@ struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> { |
}; |
HttpConnectionImpl::HttpConnectionImpl(int connection_id, |
- HttpServerImpl* owner, |
+ HttpServerImpl* server, |
HttpConnectionDelegatePtr delegate, |
HttpConnectionPtr* connection) |
: connection_id_(connection_id), |
- owner_(owner), |
+ server_(server), |
delegate_(delegate.Pass()), |
binding_(this, connection) { |
DCHECK(delegate_); |
@@ -133,7 +273,7 @@ HttpConnectionImpl::~HttpConnectionImpl() { |
void HttpConnectionImpl::OnReceivedHttpRequest( |
const net::HttpServerRequestInfo& info) { |
- if (EncounteredConnectionError()) |
+ if (IsClosing()) |
return; |
delegate_->OnReceivedRequest( |
@@ -154,11 +294,32 @@ void HttpConnectionImpl::OnReceivedHttpRequest( |
void HttpConnectionImpl::OnReceivedWebSocketRequest( |
const net::HttpServerRequestInfo& info) { |
- // TODO(yzshen): implement it. |
+ if (IsClosing()) |
+ return; |
+ |
+ delegate_->OnReceivedWebSocketRequest( |
+ HttpRequest::From(info), |
+ [this, info](InterfaceRequest<WebSocket> web_socket, |
+ ScopedDataPipeConsumerHandle send_stream, |
+ WebSocketClientPtr web_socket_client) { |
+ if (!web_socket.is_pending() || !send_stream.is_valid() || |
+ !web_socket_client) { |
+ Close(); |
+ return; |
+ } |
+ |
+ web_socket_.reset(new WebSocketImpl(this, web_socket.Pass(), |
+ send_stream.Pass(), |
+ web_socket_client.Pass())); |
+ server_->server()->AcceptWebSocket(connection_id_, info); |
+ }); |
} |
void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) { |
- // TODO(yzshen): implement it. |
+ if (IsClosing()) |
+ return; |
+ |
+ web_socket_->OnReceivedWebSocketMessage(data); |
} |
void HttpConnectionImpl::SetSendBufferSize( |
@@ -167,8 +328,8 @@ void HttpConnectionImpl::SetSendBufferSize( |
if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max())) |
size = std::numeric_limits<int32_t>::max(); |
- owner_->server()->SetSendBufferSize( |
- connection_id_, static_cast<int32_t>(size)); |
+ server_->server()->SetSendBufferSize(connection_id_, |
+ static_cast<int32_t>(size)); |
callback.Run(MakeNetworkError(net::OK)); |
} |
@@ -178,8 +339,8 @@ void HttpConnectionImpl::SetReceiveBufferSize( |
if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max())) |
size = std::numeric_limits<int32_t>::max(); |
- owner_->server()->SetReceiveBufferSize( |
- connection_id_, static_cast<int32_t>(size)); |
+ server_->server()->SetReceiveBufferSize(connection_id_, |
+ static_cast<int32_t>(size)); |
callback.Run(MakeNetworkError(net::OK)); |
} |
@@ -188,14 +349,7 @@ void HttpConnectionImpl::OnConnectionError() { |
// |delegate_| has closed the pipe. Although it is set as error handler for |
// both |binding_| and |delegate_|, it will only be called at most once |
// because when called it closes/resets |binding_| and |delegate_|. |
- DCHECK(!EncounteredConnectionError()); |
- |
- binding_.Close(); |
- delegate_.reset(); |
- |
- // Don't close the connection until all pending responses are sent. |
- if (response_body_readers_.empty()) |
- owner_->server()->Close(connection_id_); |
+ Close(); |
} |
void HttpConnectionImpl::OnFinishedReadingResponseBody( |
@@ -235,10 +389,44 @@ void HttpConnectionImpl::OnFinishedReadingResponseBody( |
if (body) |
info.SetBody(*body, content_type); |
- owner_->server()->SendResponse(connection_id_, info); |
+ server_->server()->SendResponse(connection_id_, info); |
+ |
+ if (IsClosing()) |
+ NotifyOwnerCloseIfAllDone(); |
+} |
+ |
+void HttpConnectionImpl::Close() { |
+ DCHECK(!IsClosing()); |
+ |
+ binding_.Close(); |
+ delegate_.reset(); |
+ |
+ if (web_socket_) |
+ web_socket_->Close(); |
+ |
+ NotifyOwnerCloseIfAllDone(); |
+} |
+ |
+void HttpConnectionImpl::NotifyOwnerCloseIfAllDone() { |
+ DCHECK(IsClosing()); |
- if (response_body_readers_.empty() && EncounteredConnectionError()) |
- owner_->server()->Close(connection_id_); |
+ // Don't close the connection until all pending sends are done. |
+ bool should_wait = !response_body_readers_.empty() || web_socket_; |
+ if (!should_wait) |
+ server_->server()->Close(connection_id_); |
+} |
+ |
+void HttpConnectionImpl::OnWebSocketClosed() { |
+ web_socket_.reset(); |
+ |
+ if (IsClosing()) { |
+ // The close operation is initiated by this object. |
+ NotifyOwnerCloseIfAllDone(); |
+ } else { |
+ // The close operation is initiated by |web_socket_|; start closing this |
+ // object. |
+ Close(); |
+ } |
} |
} // namespace mojo |