Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(218)

Unified Diff: mojo/services/network/http_connection_impl.cc

Issue 1144843002: Mojo service implementation for HTTP server - part 3 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase & resolve Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/services/network/http_connection_impl.h ('k') | mojo/services/network/http_server_apptest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/services/network/http_connection_impl.cc
diff --git a/mojo/services/network/http_connection_impl.cc b/mojo/services/network/http_connection_impl.cc
index 0831d1d7a888bad97dcfad5faaaa7553acb0cfd9..77d7a9165c61c3cc87ed79448d75993fa157dda5 100644
--- a/mojo/services/network/http_connection_impl.cc
+++ b/mojo/services/network/http_connection_impl.cc
@@ -14,6 +14,9 @@
#include "mojo/common/handle_watcher.h"
#include "mojo/services/network/http_server_impl.h"
#include "mojo/services/network/net_adapters.h"
+#include "mojo/services/network/public/cpp/web_socket_read_queue.h"
+#include "mojo/services/network/public/cpp/web_socket_write_queue.h"
+#include "mojo/services/network/public/interfaces/web_socket.mojom.h"
#include "net/base/net_errors.h"
#include "net/http/http_request_headers.h"
#include "net/http/http_status_code.h"
@@ -82,6 +85,143 @@ class HttpConnectionImpl::SimpleDataPipeReader {
DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader);
};
+class HttpConnectionImpl::WebSocketImpl : public WebSocket,
+ public ErrorHandler {
+ public:
+ // |connection| must outlive this object.
+ WebSocketImpl(HttpConnectionImpl* connection,
+ InterfaceRequest<WebSocket> request,
+ ScopedDataPipeConsumerHandle send_stream,
+ WebSocketClientPtr client)
+ : connection_(connection),
+ binding_(this, request.Pass()),
+ client_(client.Pass()),
+ send_stream_(send_stream.Pass()),
+ read_send_stream_(new WebSocketReadQueue(send_stream_.get())),
+ pending_send_count_(0) {
+ DCHECK(binding_.is_bound());
+ DCHECK(client_);
+ DCHECK(send_stream_.is_valid());
+
+ binding_.set_error_handler(this);
+ client_.set_error_handler(this);
+
+ DataPipe data_pipe;
+ receive_stream_ = data_pipe.producer_handle.Pass();
+ write_receive_stream_.reset(new WebSocketWriteQueue(receive_stream_.get()));
+
+ client_->DidConnect("", "", data_pipe.consumer_handle.Pass());
+ }
+
+ ~WebSocketImpl() override {}
+
+ void Close() {
+ DCHECK(!IsClosing());
+
+ binding_.Close();
+ client_.reset();
+
+ NotifyOwnerCloseIfAllDone();
+ }
+
+ void OnReceivedWebSocketMessage(const std::string& data) {
+ if (IsClosing())
+ return;
+
+ // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
+ // WebSocket{Read,Write}Queue doesn't handle that correctly.
+ if (data.empty())
+ return;
+
+ uint32_t size = static_cast<uint32_t>(data.size());
+ write_receive_stream_->Write(
+ &data[0], size,
+ base::Bind(&WebSocketImpl::OnFinishedWritingReceiveStream,
+ base::Unretained(this), size));
+ }
+
+ private:
+ // WebSocket implementation.
+ void Connect(const String& url,
+ Array<String> protocols,
+ const String& origin,
+ ScopedDataPipeConsumerHandle send_stream,
+ WebSocketClientPtr client) override {
+ NOTREACHED();
+ }
+
+ void Send(bool fin, MessageType type, uint32_t num_bytes) override {
+ if (!fin || type != MESSAGE_TYPE_TEXT) {
+ NOTIMPLEMENTED();
+ Close();
+ }
+
+ // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
+ // WebSocket{Read,Write}Queue doesn't handle that correctly.
+ if (num_bytes == 0)
+ return;
+
+ pending_send_count_++;
+ read_send_stream_->Read(
+ num_bytes, base::Bind(&WebSocketImpl::OnFinishedReadingSendStream,
+ base::Unretained(this), num_bytes));
+ }
+
+ void FlowControl(int64_t quota) override { NOTIMPLEMENTED(); }
+
+ void Close(uint16_t code, const String& reason) override {
+ Close();
+ }
+
+ // ErrorHandler implementation.
+ void OnConnectionError() override { Close(); }
+
+ void OnFinishedReadingSendStream(uint32_t num_bytes, const char* data) {
+ DCHECK_GT(pending_send_count_, 0u);
+ pending_send_count_--;
+
+ if (data) {
+ connection_->server_->server()->SendOverWebSocket(
+ connection_->connection_id_, std::string(data, num_bytes));
+ }
+
+ if (IsClosing())
+ NotifyOwnerCloseIfAllDone();
+ }
+
+ void OnFinishedWritingReceiveStream(uint32_t num_bytes, const char* buffer) {
+ if (IsClosing())
+ return;
+
+ if (buffer)
+ client_->DidReceiveData(true, MESSAGE_TYPE_TEXT, num_bytes);
+ }
+
+ // Checks whether Close() has been called.
+ bool IsClosing() const { return !binding_.is_bound(); }
+
+ void NotifyOwnerCloseIfAllDone() {
+ DCHECK(IsClosing());
+
+ if (pending_send_count_ == 0)
+ connection_->OnWebSocketClosed();
+ }
+
+ HttpConnectionImpl* const connection_;
+
+ Binding<WebSocket> binding_;
+ WebSocketClientPtr client_;
+
+ ScopedDataPipeConsumerHandle send_stream_;
+ scoped_ptr<WebSocketReadQueue> read_send_stream_;
+ size_t pending_send_count_;
+
+ ScopedDataPipeProducerHandle receive_stream_;
+ scoped_ptr<WebSocketWriteQueue> write_receive_stream_;
+
+ DISALLOW_COPY_AND_ASSIGN(WebSocketImpl);
+};
+
template <>
struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> {
static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) {
@@ -115,11 +255,11 @@ struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> {
};
HttpConnectionImpl::HttpConnectionImpl(int connection_id,
- HttpServerImpl* owner,
+ HttpServerImpl* server,
HttpConnectionDelegatePtr delegate,
HttpConnectionPtr* connection)
: connection_id_(connection_id),
- owner_(owner),
+ server_(server),
delegate_(delegate.Pass()),
binding_(this, connection) {
DCHECK(delegate_);
@@ -133,7 +273,7 @@ HttpConnectionImpl::~HttpConnectionImpl() {
void HttpConnectionImpl::OnReceivedHttpRequest(
const net::HttpServerRequestInfo& info) {
- if (EncounteredConnectionError())
+ if (IsClosing())
return;
delegate_->OnReceivedRequest(
@@ -154,11 +294,32 @@ void HttpConnectionImpl::OnReceivedHttpRequest(
void HttpConnectionImpl::OnReceivedWebSocketRequest(
const net::HttpServerRequestInfo& info) {
- // TODO(yzshen): implement it.
+ if (IsClosing())
+ return;
+
+ delegate_->OnReceivedWebSocketRequest(
+ HttpRequest::From(info),
+ [this, info](InterfaceRequest<WebSocket> web_socket,
+ ScopedDataPipeConsumerHandle send_stream,
+ WebSocketClientPtr web_socket_client) {
+ if (!web_socket.is_pending() || !send_stream.is_valid() ||
+ !web_socket_client) {
+ Close();
+ return;
+ }
+
+ web_socket_.reset(new WebSocketImpl(this, web_socket.Pass(),
+ send_stream.Pass(),
+ web_socket_client.Pass()));
+ server_->server()->AcceptWebSocket(connection_id_, info);
+ });
}
void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) {
- // TODO(yzshen): implement it.
+ if (IsClosing())
+ return;
+
+ web_socket_->OnReceivedWebSocketMessage(data);
}
void HttpConnectionImpl::SetSendBufferSize(
@@ -167,8 +328,8 @@ void HttpConnectionImpl::SetSendBufferSize(
if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
size = std::numeric_limits<int32_t>::max();
- owner_->server()->SetSendBufferSize(
- connection_id_, static_cast<int32_t>(size));
+ server_->server()->SetSendBufferSize(connection_id_,
+ static_cast<int32_t>(size));
callback.Run(MakeNetworkError(net::OK));
}
@@ -178,8 +339,8 @@ void HttpConnectionImpl::SetReceiveBufferSize(
if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
size = std::numeric_limits<int32_t>::max();
- owner_->server()->SetReceiveBufferSize(
- connection_id_, static_cast<int32_t>(size));
+ server_->server()->SetReceiveBufferSize(connection_id_,
+ static_cast<int32_t>(size));
callback.Run(MakeNetworkError(net::OK));
}
@@ -188,14 +349,7 @@ void HttpConnectionImpl::OnConnectionError() {
// |delegate_| has closed the pipe. Although it is set as error handler for
// both |binding_| and |delegate_|, it will only be called at most once
// because when called it closes/resets |binding_| and |delegate_|.
- DCHECK(!EncounteredConnectionError());
-
- binding_.Close();
- delegate_.reset();
-
- // Don't close the connection until all pending responses are sent.
- if (response_body_readers_.empty())
- owner_->server()->Close(connection_id_);
+ Close();
}
void HttpConnectionImpl::OnFinishedReadingResponseBody(
@@ -235,10 +389,44 @@ void HttpConnectionImpl::OnFinishedReadingResponseBody(
if (body)
info.SetBody(*body, content_type);
- owner_->server()->SendResponse(connection_id_, info);
+ server_->server()->SendResponse(connection_id_, info);
+
+ if (IsClosing())
+ NotifyOwnerCloseIfAllDone();
+}
+
+void HttpConnectionImpl::Close() {
+ DCHECK(!IsClosing());
+
+ binding_.Close();
+ delegate_.reset();
+
+ if (web_socket_)
+ web_socket_->Close();
+
+ NotifyOwnerCloseIfAllDone();
+}
+
+void HttpConnectionImpl::NotifyOwnerCloseIfAllDone() {
+ DCHECK(IsClosing());
- if (response_body_readers_.empty() && EncounteredConnectionError())
- owner_->server()->Close(connection_id_);
+ // Don't close the connection until all pending sends are done.
+ bool should_wait = !response_body_readers_.empty() || web_socket_;
+ if (!should_wait)
+ server_->server()->Close(connection_id_);
+}
+
+void HttpConnectionImpl::OnWebSocketClosed() {
+ web_socket_.reset();
+
+ if (IsClosing()) {
+ // The close operation is initiated by this object.
+ NotifyOwnerCloseIfAllDone();
+ } else {
+ // The close operation is initiated by |web_socket_|; start closing this
+ // object.
+ Close();
+ }
}
} // namespace mojo
« no previous file with comments | « mojo/services/network/http_connection_impl.h ('k') | mojo/services/network/http_server_apptest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698