Index: mojo/services/network/http_connection_impl.cc |
diff --git a/mojo/services/network/http_connection_impl.cc b/mojo/services/network/http_connection_impl.cc |
index 47449ecd557894cf67ea108407706f7a8e78990f..24fc7c5de90c326d88650910fba7aa99854bb8ae 100644 |
--- a/mojo/services/network/http_connection_impl.cc |
+++ b/mojo/services/network/http_connection_impl.cc |
@@ -6,14 +6,113 @@ |
#include <limits> |
+#include "base/bind_helpers.h" |
+#include "base/callback.h" |
+#include "base/logging.h" |
+#include "base/strings/string_util.h" |
+#include "mojo/common/handle_watcher.h" |
#include "mojo/services/network/http_server_impl.h" |
#include "mojo/services/network/net_adapters.h" |
#include "net/base/net_errors.h" |
+#include "net/http/http_request_headers.h" |
+#include "net/http/http_status_code.h" |
#include "net/server/http_server.h" |
#include "net/server/http_server_request_info.h" |
+#include "net/server/http_server_response_info.h" |
+#include "third_party/mojo/src/mojo/public/cpp/bindings/type_converter.h" |
+#include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h" |
namespace mojo { |
+// SimpleDataPipeReader reads till end-of-file, stores the data in a string and |
+// notifies completion. |
+class HttpConnectionImpl::SimpleDataPipeReader { |
+ public: |
+ using CompletionCallback = |
+ base::Callback<void(SimpleDataPipeReader*, scoped_ptr<std::string>)>; |
+ |
+ SimpleDataPipeReader() {} |
+ ~SimpleDataPipeReader() {} |
+ |
+ void Start(ScopedDataPipeConsumerHandle consumer, |
+ const CompletionCallback& completion_callback) { |
+ DCHECK(consumer.is_valid() && !consumer_.is_valid()); |
+ consumer_ = consumer.Pass(); |
+ completion_callback_ = completion_callback; |
+ buffer_.reset(new std::string); |
+ ReadMore(); |
+ } |
+ |
+ private: |
+ void ReadMore() { |
+ const void* buf; |
+ uint32_t buf_size; |
+ MojoResult rv = BeginReadDataRaw(consumer_.get(), &buf, &buf_size, |
+ MOJO_READ_DATA_FLAG_NONE); |
+ if (rv == MOJO_RESULT_OK) { |
+ buffer_->append(static_cast<const char*>(buf), buf_size); |
+ EndReadDataRaw(consumer_.get(), buf_size); |
+ WaitToReadMore(); |
+ } else if (rv == MOJO_RESULT_SHOULD_WAIT) { |
+ WaitToReadMore(); |
+ } else if (rv == MOJO_RESULT_FAILED_PRECONDITION) { |
+ // We reached end-of-file. |
+ completion_callback_.Run(this, buffer_.Pass()); |
+ // Note: This object may have been destroyed in the callback. |
+ } else { |
+ CHECK(false); |
+ } |
+ } |
+ |
+ void WaitToReadMore() { |
+ watcher_.Start(consumer_.get(), MOJO_HANDLE_SIGNAL_READABLE, |
+ MOJO_DEADLINE_INDEFINITE, |
+ base::Bind(&SimpleDataPipeReader::OnHandleReady, |
+ base::Unretained(this))); |
+ } |
+ |
+ void OnHandleReady(MojoResult result) { ReadMore(); } |
+ |
+ ScopedDataPipeConsumerHandle consumer_; |
+ common::HandleWatcher watcher_; |
+ CompletionCallback completion_callback_; |
+ scoped_ptr<std::string> buffer_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader); |
+}; |
+ |
+template <> |
+struct TypeConverter<URLRequestPtr, net::HttpServerRequestInfo> { |
+ static URLRequestPtr Convert(const net::HttpServerRequestInfo& obj) { |
+ URLRequestPtr request(URLRequest::New()); |
+ request->url = obj.path; |
+ request->method = obj.method; |
+ request->headers.resize(obj.headers.size()); |
+ size_t index = 0; |
+ for (const auto& item : obj.headers) { |
+ HTTPHeaderPtr header(HTTPHeader::New()); |
+ header->name = item.first; |
+ header->value = item.second; |
+ request->headers[index++] = header.Pass(); |
+ } |
+ if (!obj.data.empty()) { |
+ uint32_t num_bytes = static_cast<uint32_t>(obj.data.size()); |
+ MojoCreateDataPipeOptions options; |
+ options.struct_size = sizeof(MojoCreateDataPipeOptions); |
+ options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; |
+ options.element_num_bytes = 1; |
+ options.capacity_num_bytes = num_bytes; |
+ DataPipe data_pipe(options); |
+ request->body.push_back(data_pipe.consumer_handle.Pass()); |
+ MojoResult result = |
+ WriteDataRaw(data_pipe.producer_handle.get(), obj.data.data(), |
+ &num_bytes, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); |
+ DCHECK_EQ(MOJO_RESULT_OK, result); |
jam
2015/05/15 06:46:43
nit: perhaps CHECK so that we know if we have to h
yzshen1
2015/05/15 17:22:54
Done.
|
+ } |
+ return request.Pass(); |
+ } |
+}; |
+ |
HttpConnectionImpl::HttpConnectionImpl(int connection_id, |
HttpServerImpl* owner, |
HttpConnectionDelegatePtr delegate, |
@@ -21,16 +120,36 @@ HttpConnectionImpl::HttpConnectionImpl(int connection_id, |
: connection_id_(connection_id), |
owner_(owner), |
delegate_(delegate.Pass()), |
- binding_(this, connection) { |
+ binding_(this, connection), |
+ encountered_connection_error_(false) { |
+ |
binding_.set_error_handler(this); |
delegate_.set_error_handler(this); |
} |
-HttpConnectionImpl::~HttpConnectionImpl() {} |
+HttpConnectionImpl::~HttpConnectionImpl() { |
+ for (const auto& reader : response_body_readers_) |
jam
2015/05/15 06:46:43
nit: STLDeleteElements
yzshen1
2015/05/15 17:22:54
Done.
|
+ delete reader; |
+} |
void HttpConnectionImpl::OnReceivedHttpRequest( |
const net::HttpServerRequestInfo& info) { |
- // TODO(yzshen): implement it. |
+ if (!delegate_) |
+ return; |
+ |
+ delegate_->OnReceivedRequest( |
+ URLRequest::From(info), [this](URLResponsePtr response) { |
+ if (response->body.is_valid()) { |
+ SimpleDataPipeReader* reader = new SimpleDataPipeReader; |
+ response_body_readers_.insert(reader); |
+ reader->Start( |
+ response->body.Pass(), |
+ base::Bind(&HttpConnectionImpl::OnFinishedReadingResponseBody, |
+ base::Unretained(this), base::Passed(&response))); |
+ } else { |
+ OnFinishedReadingResponseBody(response.Pass(), nullptr, nullptr); |
+ } |
+ }); |
} |
void HttpConnectionImpl::OnReceivedWebSocketRequest( |
@@ -65,9 +184,60 @@ void HttpConnectionImpl::SetReceiveBufferSize( |
} |
void HttpConnectionImpl::OnConnectionError() { |
- // The proxy side of |binding_| or the impl side of |delegate_| has closed the |
- // pipe. The connection is not needed anymore. |
- owner_->server()->Close(connection_id_); |
+ if (!encountered_connection_error_) { |
jam
2015/05/15 06:46:43
why is this boolean needed? perhaps document why i
yzshen1
2015/05/15 17:22:54
I realized that based on the current code it won't
|
+ // The proxy side of |binding_| or the impl side of |delegate_| has closed |
+ // the pipe. |
+ encountered_connection_error_ = true; |
+ binding_.Close(); |
+ delegate_.reset(); |
+ |
+ // Don't close the connection until all pending responses are sent. |
+ if (response_body_readers_.empty()) |
+ owner_->server()->Close(connection_id_); |
+ } |
+} |
+ |
+void HttpConnectionImpl::OnFinishedReadingResponseBody( |
+ URLResponsePtr response, |
+ SimpleDataPipeReader* reader, |
+ scoped_ptr<std::string> body) { |
+ if (reader) { |
+ delete reader; |
+ response_body_readers_.erase(reader); |
+ } |
+ |
+ net::HttpServerResponseInfo info( |
+ static_cast<net::HttpStatusCode>(response->status_code)); |
+ |
+ std::string content_type; |
+ for (size_t i = 0; i < response->headers.size(); ++i) { |
+ const HTTPHeader& header = *(response->headers[i]); |
+ |
+ if (body) { |
+ // net::HttpServerResponseInfo::SetBody() automatically sets |
+ // Content-Length and Content-Types, so skip the two here. |
+ // |
+ // TODO(yzshen): Consider adding to net::HttpServerResponseInfo a simple |
+ // setter for body which doesn't fiddle with headers. |
+ if (base::strcasecmp(header.name.data(), |
+ net::HttpRequestHeaders::kContentLength) == 0) { |
+ continue; |
+ } else if (base::strcasecmp(header.name.data(), |
+ net::HttpRequestHeaders::kContentType) == 0) { |
+ content_type = header.value; |
+ continue; |
+ } |
+ } |
+ info.AddHeader(header.name, header.value); |
+ } |
+ |
+ if (body) |
+ info.SetBody(*body, content_type); |
+ |
+ owner_->server()->SendResponse(connection_id_, info); |
+ |
+ if (response_body_readers_.empty() && encountered_connection_error_) |
+ owner_->server()->Close(connection_id_); |
} |
} // namespace mojo |