| Index: mojo/services/network/http_connection_impl.cc
|
| diff --git a/mojo/services/network/http_connection_impl.cc b/mojo/services/network/http_connection_impl.cc
|
| index 0831d1d7a888bad97dcfad5faaaa7553acb0cfd9..77d7a9165c61c3cc87ed79448d75993fa157dda5 100644
|
| --- a/mojo/services/network/http_connection_impl.cc
|
| +++ b/mojo/services/network/http_connection_impl.cc
|
| @@ -14,6 +14,9 @@
|
| #include "mojo/common/handle_watcher.h"
|
| #include "mojo/services/network/http_server_impl.h"
|
| #include "mojo/services/network/net_adapters.h"
|
| +#include "mojo/services/network/public/cpp/web_socket_read_queue.h"
|
| +#include "mojo/services/network/public/cpp/web_socket_write_queue.h"
|
| +#include "mojo/services/network/public/interfaces/web_socket.mojom.h"
|
| #include "net/base/net_errors.h"
|
| #include "net/http/http_request_headers.h"
|
| #include "net/http/http_status_code.h"
|
| @@ -82,6 +85,143 @@ class HttpConnectionImpl::SimpleDataPipeReader {
|
| DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader);
|
| };
|
|
|
| +class HttpConnectionImpl::WebSocketImpl : public WebSocket,
|
| + public ErrorHandler {
|
| + public:
|
| + // |connection| must outlive this object.
|
| + WebSocketImpl(HttpConnectionImpl* connection,
|
| + InterfaceRequest<WebSocket> request,
|
| + ScopedDataPipeConsumerHandle send_stream,
|
| + WebSocketClientPtr client)
|
| + : connection_(connection),
|
| + binding_(this, request.Pass()),
|
| + client_(client.Pass()),
|
| + send_stream_(send_stream.Pass()),
|
| + read_send_stream_(new WebSocketReadQueue(send_stream_.get())),
|
| + pending_send_count_(0) {
|
| + DCHECK(binding_.is_bound());
|
| + DCHECK(client_);
|
| + DCHECK(send_stream_.is_valid());
|
| +
|
| + binding_.set_error_handler(this);
|
| + client_.set_error_handler(this);
|
| +
|
| + DataPipe data_pipe;
|
| + receive_stream_ = data_pipe.producer_handle.Pass();
|
| + write_receive_stream_.reset(new WebSocketWriteQueue(receive_stream_.get()));
|
| +
|
| + client_->DidConnect("", "", data_pipe.consumer_handle.Pass());
|
| + }
|
| +
|
| + ~WebSocketImpl() override {}
|
| +
|
| + void Close() {
|
| + DCHECK(!IsClosing());
|
| +
|
| + binding_.Close();
|
| + client_.reset();
|
| +
|
| + NotifyOwnerCloseIfAllDone();
|
| + }
|
| +
|
| + void OnReceivedWebSocketMessage(const std::string& data) {
|
| + if (IsClosing())
|
| + return;
|
| +
|
| + // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
|
| + // WebSocket{Read,Write}Queue doesn't handle that correctly.
|
| + if (data.empty())
|
| + return;
|
| +
|
| + uint32_t size = static_cast<uint32_t>(data.size());
|
| + write_receive_stream_->Write(
|
| + &data[0], size,
|
| + base::Bind(&WebSocketImpl::OnFinishedWritingReceiveStream,
|
| + base::Unretained(this), size));
|
| + }
|
| +
|
| + private:
|
| + // WebSocket implementation.
|
| + void Connect(const String& url,
|
| + Array<String> protocols,
|
| + const String& origin,
|
| + ScopedDataPipeConsumerHandle send_stream,
|
| + WebSocketClientPtr client) override {
|
| + NOTREACHED();
|
| + }
|
| +
|
| + void Send(bool fin, MessageType type, uint32_t num_bytes) override {
|
| + if (!fin || type != MESSAGE_TYPE_TEXT) {
|
| + NOTIMPLEMENTED();
|
| + Close();
|
| + }
|
| +
|
| + // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
|
| + // WebSocket{Read,Write}Queue doesn't handle that correctly.
|
| + if (num_bytes == 0)
|
| + return;
|
| +
|
| + pending_send_count_++;
|
| + read_send_stream_->Read(
|
| + num_bytes, base::Bind(&WebSocketImpl::OnFinishedReadingSendStream,
|
| + base::Unretained(this), num_bytes));
|
| + }
|
| +
|
| + void FlowControl(int64_t quota) override { NOTIMPLEMENTED(); }
|
| +
|
| + void Close(uint16_t code, const String& reason) override {
|
| + Close();
|
| + }
|
| +
|
| + // ErrorHandler implementation.
|
| + void OnConnectionError() override { Close(); }
|
| +
|
| + void OnFinishedReadingSendStream(uint32_t num_bytes, const char* data) {
|
| + DCHECK_GT(pending_send_count_, 0u);
|
| + pending_send_count_--;
|
| +
|
| + if (data) {
|
| + connection_->server_->server()->SendOverWebSocket(
|
| + connection_->connection_id_, std::string(data, num_bytes));
|
| + }
|
| +
|
| + if (IsClosing())
|
| + NotifyOwnerCloseIfAllDone();
|
| + }
|
| +
|
| + void OnFinishedWritingReceiveStream(uint32_t num_bytes, const char* buffer) {
|
| + if (IsClosing())
|
| + return;
|
| +
|
| + if (buffer)
|
| + client_->DidReceiveData(true, MESSAGE_TYPE_TEXT, num_bytes);
|
| + }
|
| +
|
| + // Checks whether Close() has been called.
|
| + bool IsClosing() const { return !binding_.is_bound(); }
|
| +
|
| + void NotifyOwnerCloseIfAllDone() {
|
| + DCHECK(IsClosing());
|
| +
|
| + if (pending_send_count_ == 0)
|
| + connection_->OnWebSocketClosed();
|
| + }
|
| +
|
| + HttpConnectionImpl* const connection_;
|
| +
|
| + Binding<WebSocket> binding_;
|
| + WebSocketClientPtr client_;
|
| +
|
| + ScopedDataPipeConsumerHandle send_stream_;
|
| + scoped_ptr<WebSocketReadQueue> read_send_stream_;
|
| + size_t pending_send_count_;
|
| +
|
| + ScopedDataPipeProducerHandle receive_stream_;
|
| + scoped_ptr<WebSocketWriteQueue> write_receive_stream_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(WebSocketImpl);
|
| +};
|
| +
|
| template <>
|
| struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> {
|
| static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) {
|
| @@ -115,11 +255,11 @@ struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> {
|
| };
|
|
|
| HttpConnectionImpl::HttpConnectionImpl(int connection_id,
|
| - HttpServerImpl* owner,
|
| + HttpServerImpl* server,
|
| HttpConnectionDelegatePtr delegate,
|
| HttpConnectionPtr* connection)
|
| : connection_id_(connection_id),
|
| - owner_(owner),
|
| + server_(server),
|
| delegate_(delegate.Pass()),
|
| binding_(this, connection) {
|
| DCHECK(delegate_);
|
| @@ -133,7 +273,7 @@ HttpConnectionImpl::~HttpConnectionImpl() {
|
|
|
| void HttpConnectionImpl::OnReceivedHttpRequest(
|
| const net::HttpServerRequestInfo& info) {
|
| - if (EncounteredConnectionError())
|
| + if (IsClosing())
|
| return;
|
|
|
| delegate_->OnReceivedRequest(
|
| @@ -154,11 +294,32 @@ void HttpConnectionImpl::OnReceivedHttpRequest(
|
|
|
| void HttpConnectionImpl::OnReceivedWebSocketRequest(
|
| const net::HttpServerRequestInfo& info) {
|
| - // TODO(yzshen): implement it.
|
| + if (IsClosing())
|
| + return;
|
| +
|
| + delegate_->OnReceivedWebSocketRequest(
|
| + HttpRequest::From(info),
|
| + [this, info](InterfaceRequest<WebSocket> web_socket,
|
| + ScopedDataPipeConsumerHandle send_stream,
|
| + WebSocketClientPtr web_socket_client) {
|
| + if (!web_socket.is_pending() || !send_stream.is_valid() ||
|
| + !web_socket_client) {
|
| + Close();
|
| + return;
|
| + }
|
| +
|
| + web_socket_.reset(new WebSocketImpl(this, web_socket.Pass(),
|
| + send_stream.Pass(),
|
| + web_socket_client.Pass()));
|
| + server_->server()->AcceptWebSocket(connection_id_, info);
|
| + });
|
| }
|
|
|
| void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) {
|
| - // TODO(yzshen): implement it.
|
| + if (IsClosing())
|
| + return;
|
| +
|
| + web_socket_->OnReceivedWebSocketMessage(data);
|
| }
|
|
|
| void HttpConnectionImpl::SetSendBufferSize(
|
| @@ -167,8 +328,8 @@ void HttpConnectionImpl::SetSendBufferSize(
|
| if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
|
| size = std::numeric_limits<int32_t>::max();
|
|
|
| - owner_->server()->SetSendBufferSize(
|
| - connection_id_, static_cast<int32_t>(size));
|
| + server_->server()->SetSendBufferSize(connection_id_,
|
| + static_cast<int32_t>(size));
|
| callback.Run(MakeNetworkError(net::OK));
|
| }
|
|
|
| @@ -178,8 +339,8 @@ void HttpConnectionImpl::SetReceiveBufferSize(
|
| if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
|
| size = std::numeric_limits<int32_t>::max();
|
|
|
| - owner_->server()->SetReceiveBufferSize(
|
| - connection_id_, static_cast<int32_t>(size));
|
| + server_->server()->SetReceiveBufferSize(connection_id_,
|
| + static_cast<int32_t>(size));
|
| callback.Run(MakeNetworkError(net::OK));
|
| }
|
|
|
| @@ -188,14 +349,7 @@ void HttpConnectionImpl::OnConnectionError() {
|
| // |delegate_| has closed the pipe. Although it is set as error handler for
|
| // both |binding_| and |delegate_|, it will only be called at most once
|
| // because when called it closes/resets |binding_| and |delegate_|.
|
| - DCHECK(!EncounteredConnectionError());
|
| -
|
| - binding_.Close();
|
| - delegate_.reset();
|
| -
|
| - // Don't close the connection until all pending responses are sent.
|
| - if (response_body_readers_.empty())
|
| - owner_->server()->Close(connection_id_);
|
| + Close();
|
| }
|
|
|
| void HttpConnectionImpl::OnFinishedReadingResponseBody(
|
| @@ -235,10 +389,44 @@ void HttpConnectionImpl::OnFinishedReadingResponseBody(
|
| if (body)
|
| info.SetBody(*body, content_type);
|
|
|
| - owner_->server()->SendResponse(connection_id_, info);
|
| + server_->server()->SendResponse(connection_id_, info);
|
| +
|
| + if (IsClosing())
|
| + NotifyOwnerCloseIfAllDone();
|
| +}
|
| +
|
| +void HttpConnectionImpl::Close() {
|
| + DCHECK(!IsClosing());
|
| +
|
| + binding_.Close();
|
| + delegate_.reset();
|
| +
|
| + if (web_socket_)
|
| + web_socket_->Close();
|
| +
|
| + NotifyOwnerCloseIfAllDone();
|
| +}
|
| +
|
| +void HttpConnectionImpl::NotifyOwnerCloseIfAllDone() {
|
| + DCHECK(IsClosing());
|
|
|
| - if (response_body_readers_.empty() && EncounteredConnectionError())
|
| - owner_->server()->Close(connection_id_);
|
| + // Don't close the connection until all pending sends are done.
|
| + bool should_wait = !response_body_readers_.empty() || web_socket_;
|
| + if (!should_wait)
|
| + server_->server()->Close(connection_id_);
|
| +}
|
| +
|
| +void HttpConnectionImpl::OnWebSocketClosed() {
|
| + web_socket_.reset();
|
| +
|
| + if (IsClosing()) {
|
| + // The close operation is initiated by this object.
|
| + NotifyOwnerCloseIfAllDone();
|
| + } else {
|
| + // The close operation is initiated by |web_socket_|; start closing this
|
| + // object.
|
| + Close();
|
| + }
|
| }
|
|
|
| } // namespace mojo
|
|
|