Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(264)

Side by Side Diff: mojo/services/network/http_connection_impl.cc

Issue 1144843002: Mojo service implementation for HTTP server - part 3 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase & resolve Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/services/network/http_connection_impl.h" 5 #include "mojo/services/network/http_connection_impl.h"
6 6
7 #include <limits> 7 #include <limits>
8 8
9 #include "base/bind_helpers.h" 9 #include "base/bind_helpers.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/logging.h" 11 #include "base/logging.h"
12 #include "base/stl_util.h" 12 #include "base/stl_util.h"
13 #include "base/strings/string_util.h" 13 #include "base/strings/string_util.h"
14 #include "mojo/common/handle_watcher.h" 14 #include "mojo/common/handle_watcher.h"
15 #include "mojo/services/network/http_server_impl.h" 15 #include "mojo/services/network/http_server_impl.h"
16 #include "mojo/services/network/net_adapters.h" 16 #include "mojo/services/network/net_adapters.h"
17 #include "mojo/services/network/public/cpp/web_socket_read_queue.h"
18 #include "mojo/services/network/public/cpp/web_socket_write_queue.h"
19 #include "mojo/services/network/public/interfaces/web_socket.mojom.h"
17 #include "net/base/net_errors.h" 20 #include "net/base/net_errors.h"
18 #include "net/http/http_request_headers.h" 21 #include "net/http/http_request_headers.h"
19 #include "net/http/http_status_code.h" 22 #include "net/http/http_status_code.h"
20 #include "net/server/http_server.h" 23 #include "net/server/http_server.h"
21 #include "net/server/http_server_request_info.h" 24 #include "net/server/http_server_request_info.h"
22 #include "net/server/http_server_response_info.h" 25 #include "net/server/http_server_response_info.h"
23 #include "third_party/mojo/src/mojo/public/cpp/bindings/type_converter.h" 26 #include "third_party/mojo/src/mojo/public/cpp/bindings/type_converter.h"
24 #include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h" 27 #include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h"
25 28
26 namespace mojo { 29 namespace mojo {
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
75 void OnHandleReady(MojoResult result) { ReadMore(); } 78 void OnHandleReady(MojoResult result) { ReadMore(); }
76 79
77 ScopedDataPipeConsumerHandle consumer_; 80 ScopedDataPipeConsumerHandle consumer_;
78 common::HandleWatcher watcher_; 81 common::HandleWatcher watcher_;
79 CompletionCallback completion_callback_; 82 CompletionCallback completion_callback_;
80 scoped_ptr<std::string> buffer_; 83 scoped_ptr<std::string> buffer_;
81 84
82 DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader); 85 DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader);
83 }; 86 };
84 87
88 class HttpConnectionImpl::WebSocketImpl : public WebSocket,
89 public ErrorHandler {
90 public:
91 // |connection| must outlive this object.
92 WebSocketImpl(HttpConnectionImpl* connection,
93 InterfaceRequest<WebSocket> request,
94 ScopedDataPipeConsumerHandle send_stream,
95 WebSocketClientPtr client)
96 : connection_(connection),
97 binding_(this, request.Pass()),
98 client_(client.Pass()),
99 send_stream_(send_stream.Pass()),
100 read_send_stream_(new WebSocketReadQueue(send_stream_.get())),
101 pending_send_count_(0) {
102 DCHECK(binding_.is_bound());
103 DCHECK(client_);
104 DCHECK(send_stream_.is_valid());
105
106 binding_.set_error_handler(this);
107 client_.set_error_handler(this);
108
109 DataPipe data_pipe;
110 receive_stream_ = data_pipe.producer_handle.Pass();
111 write_receive_stream_.reset(new WebSocketWriteQueue(receive_stream_.get()));
112
113 client_->DidConnect("", "", data_pipe.consumer_handle.Pass());
114 }
115
116 ~WebSocketImpl() override {}
117
118 void Close() {
119 DCHECK(!IsClosing());
120
121 binding_.Close();
122 client_.reset();
123
124 NotifyOwnerCloseIfAllDone();
125 }
126
127 void OnReceivedWebSocketMessage(const std::string& data) {
128 if (IsClosing())
129 return;
130
131 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
132 // WebSocket{Read,Write}Queue doesn't handle that correctly.
133 if (data.empty())
134 return;
135
136 uint32_t size = static_cast<uint32_t>(data.size());
137 write_receive_stream_->Write(
138 &data[0], size,
139 base::Bind(&WebSocketImpl::OnFinishedWritingReceiveStream,
140 base::Unretained(this), size));
141 }
142
143 private:
144 // WebSocket implementation.
145 void Connect(const String& url,
146 Array<String> protocols,
147 const String& origin,
148 ScopedDataPipeConsumerHandle send_stream,
149 WebSocketClientPtr client) override {
150 NOTREACHED();
151 }
152
153 void Send(bool fin, MessageType type, uint32_t num_bytes) override {
154 if (!fin || type != MESSAGE_TYPE_TEXT) {
155 NOTIMPLEMENTED();
156 Close();
157 }
158
159 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
160 // WebSocket{Read,Write}Queue doesn't handle that correctly.
161 if (num_bytes == 0)
162 return;
163
164 pending_send_count_++;
165 read_send_stream_->Read(
166 num_bytes, base::Bind(&WebSocketImpl::OnFinishedReadingSendStream,
167 base::Unretained(this), num_bytes));
168 }
169
170 void FlowControl(int64_t quota) override { NOTIMPLEMENTED(); }
171
172 void Close(uint16_t code, const String& reason) override {
173 Close();
174 }
175
176 // ErrorHandler implementation.
177 void OnConnectionError() override { Close(); }
178
179 void OnFinishedReadingSendStream(uint32_t num_bytes, const char* data) {
180 DCHECK_GT(pending_send_count_, 0u);
181 pending_send_count_--;
182
183 if (data) {
184 connection_->server_->server()->SendOverWebSocket(
185 connection_->connection_id_, std::string(data, num_bytes));
186 }
187
188 if (IsClosing())
189 NotifyOwnerCloseIfAllDone();
190 }
191
192 void OnFinishedWritingReceiveStream(uint32_t num_bytes, const char* buffer) {
193 if (IsClosing())
194 return;
195
196 if (buffer)
197 client_->DidReceiveData(true, MESSAGE_TYPE_TEXT, num_bytes);
198 }
199
200 // Checks whether Close() has been called.
201 bool IsClosing() const { return !binding_.is_bound(); }
202
203 void NotifyOwnerCloseIfAllDone() {
204 DCHECK(IsClosing());
205
206 if (pending_send_count_ == 0)
207 connection_->OnWebSocketClosed();
208 }
209
210 HttpConnectionImpl* const connection_;
211
212 Binding<WebSocket> binding_;
213 WebSocketClientPtr client_;
214
215 ScopedDataPipeConsumerHandle send_stream_;
216 scoped_ptr<WebSocketReadQueue> read_send_stream_;
217 size_t pending_send_count_;
218
219 ScopedDataPipeProducerHandle receive_stream_;
220 scoped_ptr<WebSocketWriteQueue> write_receive_stream_;
221
222 DISALLOW_COPY_AND_ASSIGN(WebSocketImpl);
223 };
224
85 template <> 225 template <>
86 struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> { 226 struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> {
87 static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) { 227 static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) {
88 HttpRequestPtr request(HttpRequest::New()); 228 HttpRequestPtr request(HttpRequest::New());
89 request->method = obj.method; 229 request->method = obj.method;
90 request->url = obj.path; 230 request->url = obj.path;
91 request->headers.resize(obj.headers.size()); 231 request->headers.resize(obj.headers.size());
92 size_t index = 0; 232 size_t index = 0;
93 for (const auto& item : obj.headers) { 233 for (const auto& item : obj.headers) {
94 HttpHeaderPtr header(HttpHeader::New()); 234 HttpHeaderPtr header(HttpHeader::New());
(...skipping 13 matching lines...) Expand all
108 MojoResult result = 248 MojoResult result =
109 WriteDataRaw(data_pipe.producer_handle.get(), obj.data.data(), 249 WriteDataRaw(data_pipe.producer_handle.get(), obj.data.data(),
110 &num_bytes, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); 250 &num_bytes, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
111 CHECK_EQ(MOJO_RESULT_OK, result); 251 CHECK_EQ(MOJO_RESULT_OK, result);
112 } 252 }
113 return request.Pass(); 253 return request.Pass();
114 } 254 }
115 }; 255 };
116 256
117 HttpConnectionImpl::HttpConnectionImpl(int connection_id, 257 HttpConnectionImpl::HttpConnectionImpl(int connection_id,
118 HttpServerImpl* owner, 258 HttpServerImpl* server,
119 HttpConnectionDelegatePtr delegate, 259 HttpConnectionDelegatePtr delegate,
120 HttpConnectionPtr* connection) 260 HttpConnectionPtr* connection)
121 : connection_id_(connection_id), 261 : connection_id_(connection_id),
122 owner_(owner), 262 server_(server),
123 delegate_(delegate.Pass()), 263 delegate_(delegate.Pass()),
124 binding_(this, connection) { 264 binding_(this, connection) {
125 DCHECK(delegate_); 265 DCHECK(delegate_);
126 binding_.set_error_handler(this); 266 binding_.set_error_handler(this);
127 delegate_.set_error_handler(this); 267 delegate_.set_error_handler(this);
128 } 268 }
129 269
130 HttpConnectionImpl::~HttpConnectionImpl() { 270 HttpConnectionImpl::~HttpConnectionImpl() {
131 STLDeleteElements(&response_body_readers_); 271 STLDeleteElements(&response_body_readers_);
132 } 272 }
133 273
134 void HttpConnectionImpl::OnReceivedHttpRequest( 274 void HttpConnectionImpl::OnReceivedHttpRequest(
135 const net::HttpServerRequestInfo& info) { 275 const net::HttpServerRequestInfo& info) {
136 if (EncounteredConnectionError()) 276 if (IsClosing())
137 return; 277 return;
138 278
139 delegate_->OnReceivedRequest( 279 delegate_->OnReceivedRequest(
140 HttpRequest::From(info), [this](HttpResponsePtr response) { 280 HttpRequest::From(info), [this](HttpResponsePtr response) {
141 if (response->body.is_valid()) { 281 if (response->body.is_valid()) {
142 SimpleDataPipeReader* reader = new SimpleDataPipeReader; 282 SimpleDataPipeReader* reader = new SimpleDataPipeReader;
143 response_body_readers_.insert(reader); 283 response_body_readers_.insert(reader);
144 ScopedDataPipeConsumerHandle body = response->body.Pass(); 284 ScopedDataPipeConsumerHandle body = response->body.Pass();
145 reader->Start( 285 reader->Start(
146 body.Pass(), 286 body.Pass(),
147 base::Bind(&HttpConnectionImpl::OnFinishedReadingResponseBody, 287 base::Bind(&HttpConnectionImpl::OnFinishedReadingResponseBody,
148 base::Unretained(this), base::Passed(&response))); 288 base::Unretained(this), base::Passed(&response)));
149 } else { 289 } else {
150 OnFinishedReadingResponseBody(response.Pass(), nullptr, nullptr); 290 OnFinishedReadingResponseBody(response.Pass(), nullptr, nullptr);
151 } 291 }
152 }); 292 });
153 } 293 }
154 294
155 void HttpConnectionImpl::OnReceivedWebSocketRequest( 295 void HttpConnectionImpl::OnReceivedWebSocketRequest(
156 const net::HttpServerRequestInfo& info) { 296 const net::HttpServerRequestInfo& info) {
157 // TODO(yzshen): implement it. 297 if (IsClosing())
298 return;
299
300 delegate_->OnReceivedWebSocketRequest(
301 HttpRequest::From(info),
302 [this, info](InterfaceRequest<WebSocket> web_socket,
303 ScopedDataPipeConsumerHandle send_stream,
304 WebSocketClientPtr web_socket_client) {
305 if (!web_socket.is_pending() || !send_stream.is_valid() ||
306 !web_socket_client) {
307 Close();
308 return;
309 }
310
311 web_socket_.reset(new WebSocketImpl(this, web_socket.Pass(),
312 send_stream.Pass(),
313 web_socket_client.Pass()));
314 server_->server()->AcceptWebSocket(connection_id_, info);
315 });
158 } 316 }
159 317
160 void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) { 318 void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) {
161 // TODO(yzshen): implement it. 319 if (IsClosing())
320 return;
321
322 web_socket_->OnReceivedWebSocketMessage(data);
162 } 323 }
163 324
164 void HttpConnectionImpl::SetSendBufferSize( 325 void HttpConnectionImpl::SetSendBufferSize(
165 uint32_t size, 326 uint32_t size,
166 const SetSendBufferSizeCallback& callback) { 327 const SetSendBufferSizeCallback& callback) {
167 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max())) 328 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
168 size = std::numeric_limits<int32_t>::max(); 329 size = std::numeric_limits<int32_t>::max();
169 330
170 owner_->server()->SetSendBufferSize( 331 server_->server()->SetSendBufferSize(connection_id_,
171 connection_id_, static_cast<int32_t>(size)); 332 static_cast<int32_t>(size));
172 callback.Run(MakeNetworkError(net::OK)); 333 callback.Run(MakeNetworkError(net::OK));
173 } 334 }
174 335
175 void HttpConnectionImpl::SetReceiveBufferSize( 336 void HttpConnectionImpl::SetReceiveBufferSize(
176 uint32_t size, 337 uint32_t size,
177 const SetReceiveBufferSizeCallback& callback) { 338 const SetReceiveBufferSizeCallback& callback) {
178 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max())) 339 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
179 size = std::numeric_limits<int32_t>::max(); 340 size = std::numeric_limits<int32_t>::max();
180 341
181 owner_->server()->SetReceiveBufferSize( 342 server_->server()->SetReceiveBufferSize(connection_id_,
182 connection_id_, static_cast<int32_t>(size)); 343 static_cast<int32_t>(size));
183 callback.Run(MakeNetworkError(net::OK)); 344 callback.Run(MakeNetworkError(net::OK));
184 } 345 }
185 346
186 void HttpConnectionImpl::OnConnectionError() { 347 void HttpConnectionImpl::OnConnectionError() {
187 // This method is called when the proxy side of |binding_| or the impl side of 348 // This method is called when the proxy side of |binding_| or the impl side of
188 // |delegate_| has closed the pipe. Although it is set as error handler for 349 // |delegate_| has closed the pipe. Although it is set as error handler for
189 // both |binding_| and |delegate_|, it will only be called at most once 350 // both |binding_| and |delegate_|, it will only be called at most once
190 // because when called it closes/resets |binding_| and |delegate_|. 351 // because when called it closes/resets |binding_| and |delegate_|.
191 DCHECK(!EncounteredConnectionError()); 352 Close();
192
193 binding_.Close();
194 delegate_.reset();
195
196 // Don't close the connection until all pending responses are sent.
197 if (response_body_readers_.empty())
198 owner_->server()->Close(connection_id_);
199 } 353 }
200 354
201 void HttpConnectionImpl::OnFinishedReadingResponseBody( 355 void HttpConnectionImpl::OnFinishedReadingResponseBody(
202 HttpResponsePtr response, 356 HttpResponsePtr response,
203 SimpleDataPipeReader* reader, 357 SimpleDataPipeReader* reader,
204 scoped_ptr<std::string> body) { 358 scoped_ptr<std::string> body) {
205 if (reader) { 359 if (reader) {
206 delete reader; 360 delete reader;
207 response_body_readers_.erase(reader); 361 response_body_readers_.erase(reader);
208 } 362 }
(...skipping 19 matching lines...) Expand all
228 content_type = header.value; 382 content_type = header.value;
229 continue; 383 continue;
230 } 384 }
231 } 385 }
232 info.AddHeader(header.name, header.value); 386 info.AddHeader(header.name, header.value);
233 } 387 }
234 388
235 if (body) 389 if (body)
236 info.SetBody(*body, content_type); 390 info.SetBody(*body, content_type);
237 391
238 owner_->server()->SendResponse(connection_id_, info); 392 server_->server()->SendResponse(connection_id_, info);
239 393
240 if (response_body_readers_.empty() && EncounteredConnectionError()) 394 if (IsClosing())
241 owner_->server()->Close(connection_id_); 395 NotifyOwnerCloseIfAllDone();
396 }
397
398 void HttpConnectionImpl::Close() {
399 DCHECK(!IsClosing());
400
401 binding_.Close();
402 delegate_.reset();
403
404 if (web_socket_)
405 web_socket_->Close();
406
407 NotifyOwnerCloseIfAllDone();
408 }
409
410 void HttpConnectionImpl::NotifyOwnerCloseIfAllDone() {
411 DCHECK(IsClosing());
412
413 // Don't close the connection until all pending sends are done.
414 bool should_wait = !response_body_readers_.empty() || web_socket_;
415 if (!should_wait)
416 server_->server()->Close(connection_id_);
417 }
418
419 void HttpConnectionImpl::OnWebSocketClosed() {
420 web_socket_.reset();
421
422 if (IsClosing()) {
423 // The close operation is initiated by this object.
424 NotifyOwnerCloseIfAllDone();
425 } else {
426 // The close operation is initiated by |web_socket_|; start closing this
427 // object.
428 Close();
429 }
242 } 430 }
243 431
244 } // namespace mojo 432 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/services/network/http_connection_impl.h ('k') | mojo/services/network/http_server_apptest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698