OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/services/network/http_connection_impl.h" | 5 #include "mojo/services/network/http_connection_impl.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 | 8 |
9 #include "base/bind_helpers.h" | 9 #include "base/bind_helpers.h" |
10 #include "base/callback.h" | 10 #include "base/callback.h" |
11 #include "base/logging.h" | 11 #include "base/logging.h" |
12 #include "base/stl_util.h" | 12 #include "base/stl_util.h" |
13 #include "base/strings/string_util.h" | 13 #include "base/strings/string_util.h" |
14 #include "mojo/common/handle_watcher.h" | 14 #include "mojo/common/handle_watcher.h" |
15 #include "mojo/services/network/http_server_impl.h" | 15 #include "mojo/services/network/http_server_impl.h" |
16 #include "mojo/services/network/net_adapters.h" | 16 #include "mojo/services/network/net_adapters.h" |
| 17 #include "mojo/services/network/public/cpp/web_socket_read_queue.h" |
| 18 #include "mojo/services/network/public/cpp/web_socket_write_queue.h" |
| 19 #include "mojo/services/network/public/interfaces/web_socket.mojom.h" |
17 #include "net/base/net_errors.h" | 20 #include "net/base/net_errors.h" |
18 #include "net/http/http_request_headers.h" | 21 #include "net/http/http_request_headers.h" |
19 #include "net/http/http_status_code.h" | 22 #include "net/http/http_status_code.h" |
20 #include "net/server/http_server.h" | 23 #include "net/server/http_server.h" |
21 #include "net/server/http_server_request_info.h" | 24 #include "net/server/http_server_request_info.h" |
22 #include "net/server/http_server_response_info.h" | 25 #include "net/server/http_server_response_info.h" |
23 #include "third_party/mojo/src/mojo/public/cpp/bindings/type_converter.h" | 26 #include "third_party/mojo/src/mojo/public/cpp/bindings/type_converter.h" |
24 #include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h" | 27 #include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h" |
25 | 28 |
26 namespace mojo { | 29 namespace mojo { |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
75 void OnHandleReady(MojoResult result) { ReadMore(); } | 78 void OnHandleReady(MojoResult result) { ReadMore(); } |
76 | 79 |
77 ScopedDataPipeConsumerHandle consumer_; | 80 ScopedDataPipeConsumerHandle consumer_; |
78 common::HandleWatcher watcher_; | 81 common::HandleWatcher watcher_; |
79 CompletionCallback completion_callback_; | 82 CompletionCallback completion_callback_; |
80 scoped_ptr<std::string> buffer_; | 83 scoped_ptr<std::string> buffer_; |
81 | 84 |
82 DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader); | 85 DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader); |
83 }; | 86 }; |
84 | 87 |
| 88 class HttpConnectionImpl::WebSocketImpl : public WebSocket, |
| 89 public ErrorHandler { |
| 90 public: |
| 91 // |connection| must outlive this object. |
| 92 WebSocketImpl(HttpConnectionImpl* connection, |
| 93 InterfaceRequest<WebSocket> request, |
| 94 ScopedDataPipeConsumerHandle send_stream, |
| 95 WebSocketClientPtr client) |
| 96 : connection_(connection), |
| 97 binding_(this, request.Pass()), |
| 98 client_(client.Pass()), |
| 99 send_stream_(send_stream.Pass()), |
| 100 read_send_stream_(new WebSocketReadQueue(send_stream_.get())), |
| 101 pending_send_count_(0) { |
| 102 DCHECK(binding_.is_bound()); |
| 103 DCHECK(client_); |
| 104 DCHECK(send_stream_.is_valid()); |
| 105 |
| 106 binding_.set_error_handler(this); |
| 107 client_.set_error_handler(this); |
| 108 |
| 109 DataPipe data_pipe; |
| 110 receive_stream_ = data_pipe.producer_handle.Pass(); |
| 111 write_receive_stream_.reset(new WebSocketWriteQueue(receive_stream_.get())); |
| 112 |
| 113 client_->DidConnect("", "", data_pipe.consumer_handle.Pass()); |
| 114 } |
| 115 |
| 116 ~WebSocketImpl() override {} |
| 117 |
| 118 void Close() { |
| 119 DCHECK(!IsClosing()); |
| 120 |
| 121 binding_.Close(); |
| 122 client_.reset(); |
| 123 |
| 124 NotifyOwnerCloseIfAllDone(); |
| 125 } |
| 126 |
| 127 void OnReceivedWebSocketMessage(const std::string& data) { |
| 128 if (IsClosing()) |
| 129 return; |
| 130 |
| 131 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However, |
| 132 // WebSocket{Read,Write}Queue doesn't handle that correctly. |
| 133 if (data.empty()) |
| 134 return; |
| 135 |
| 136 uint32_t size = static_cast<uint32_t>(data.size()); |
| 137 write_receive_stream_->Write( |
| 138 &data[0], size, |
| 139 base::Bind(&WebSocketImpl::OnFinishedWritingReceiveStream, |
| 140 base::Unretained(this), size)); |
| 141 } |
| 142 |
| 143 private: |
| 144 // WebSocket implementation. |
| 145 void Connect(const String& url, |
| 146 Array<String> protocols, |
| 147 const String& origin, |
| 148 ScopedDataPipeConsumerHandle send_stream, |
| 149 WebSocketClientPtr client) override { |
| 150 NOTREACHED(); |
| 151 } |
| 152 |
| 153 void Send(bool fin, MessageType type, uint32_t num_bytes) override { |
| 154 if (!fin || type != MESSAGE_TYPE_TEXT) { |
| 155 NOTIMPLEMENTED(); |
| 156 Close(); |
| 157 } |
| 158 |
| 159 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However, |
| 160 // WebSocket{Read,Write}Queue doesn't handle that correctly. |
| 161 if (num_bytes == 0) |
| 162 return; |
| 163 |
| 164 pending_send_count_++; |
| 165 read_send_stream_->Read( |
| 166 num_bytes, base::Bind(&WebSocketImpl::OnFinishedReadingSendStream, |
| 167 base::Unretained(this), num_bytes)); |
| 168 } |
| 169 |
| 170 void FlowControl(int64_t quota) override { NOTIMPLEMENTED(); } |
| 171 |
| 172 void Close(uint16_t code, const String& reason) override { |
| 173 Close(); |
| 174 } |
| 175 |
| 176 // ErrorHandler implementation. |
| 177 void OnConnectionError() override { Close(); } |
| 178 |
| 179 void OnFinishedReadingSendStream(uint32_t num_bytes, const char* data) { |
| 180 DCHECK_GT(pending_send_count_, 0u); |
| 181 pending_send_count_--; |
| 182 |
| 183 if (data) { |
| 184 connection_->server_->server()->SendOverWebSocket( |
| 185 connection_->connection_id_, std::string(data, num_bytes)); |
| 186 } |
| 187 |
| 188 if (IsClosing()) |
| 189 NotifyOwnerCloseIfAllDone(); |
| 190 } |
| 191 |
| 192 void OnFinishedWritingReceiveStream(uint32_t num_bytes, const char* buffer) { |
| 193 if (IsClosing()) |
| 194 return; |
| 195 |
| 196 if (buffer) |
| 197 client_->DidReceiveData(true, MESSAGE_TYPE_TEXT, num_bytes); |
| 198 } |
| 199 |
| 200 // Checks whether Close() has been called. |
| 201 bool IsClosing() const { return !binding_.is_bound(); } |
| 202 |
| 203 void NotifyOwnerCloseIfAllDone() { |
| 204 DCHECK(IsClosing()); |
| 205 |
| 206 if (pending_send_count_ == 0) |
| 207 connection_->OnWebSocketClosed(); |
| 208 } |
| 209 |
| 210 HttpConnectionImpl* const connection_; |
| 211 |
| 212 Binding<WebSocket> binding_; |
| 213 WebSocketClientPtr client_; |
| 214 |
| 215 ScopedDataPipeConsumerHandle send_stream_; |
| 216 scoped_ptr<WebSocketReadQueue> read_send_stream_; |
| 217 size_t pending_send_count_; |
| 218 |
| 219 ScopedDataPipeProducerHandle receive_stream_; |
| 220 scoped_ptr<WebSocketWriteQueue> write_receive_stream_; |
| 221 |
| 222 DISALLOW_COPY_AND_ASSIGN(WebSocketImpl); |
| 223 }; |
| 224 |
85 template <> | 225 template <> |
86 struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> { | 226 struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> { |
87 static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) { | 227 static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) { |
88 HttpRequestPtr request(HttpRequest::New()); | 228 HttpRequestPtr request(HttpRequest::New()); |
89 request->method = obj.method; | 229 request->method = obj.method; |
90 request->url = obj.path; | 230 request->url = obj.path; |
91 request->headers.resize(obj.headers.size()); | 231 request->headers.resize(obj.headers.size()); |
92 size_t index = 0; | 232 size_t index = 0; |
93 for (const auto& item : obj.headers) { | 233 for (const auto& item : obj.headers) { |
94 HttpHeaderPtr header(HttpHeader::New()); | 234 HttpHeaderPtr header(HttpHeader::New()); |
(...skipping 13 matching lines...) Expand all Loading... |
108 MojoResult result = | 248 MojoResult result = |
109 WriteDataRaw(data_pipe.producer_handle.get(), obj.data.data(), | 249 WriteDataRaw(data_pipe.producer_handle.get(), obj.data.data(), |
110 &num_bytes, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); | 250 &num_bytes, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); |
111 CHECK_EQ(MOJO_RESULT_OK, result); | 251 CHECK_EQ(MOJO_RESULT_OK, result); |
112 } | 252 } |
113 return request.Pass(); | 253 return request.Pass(); |
114 } | 254 } |
115 }; | 255 }; |
116 | 256 |
117 HttpConnectionImpl::HttpConnectionImpl(int connection_id, | 257 HttpConnectionImpl::HttpConnectionImpl(int connection_id, |
118 HttpServerImpl* owner, | 258 HttpServerImpl* server, |
119 HttpConnectionDelegatePtr delegate, | 259 HttpConnectionDelegatePtr delegate, |
120 HttpConnectionPtr* connection) | 260 HttpConnectionPtr* connection) |
121 : connection_id_(connection_id), | 261 : connection_id_(connection_id), |
122 owner_(owner), | 262 server_(server), |
123 delegate_(delegate.Pass()), | 263 delegate_(delegate.Pass()), |
124 binding_(this, connection) { | 264 binding_(this, connection) { |
125 DCHECK(delegate_); | 265 DCHECK(delegate_); |
126 binding_.set_error_handler(this); | 266 binding_.set_error_handler(this); |
127 delegate_.set_error_handler(this); | 267 delegate_.set_error_handler(this); |
128 } | 268 } |
129 | 269 |
130 HttpConnectionImpl::~HttpConnectionImpl() { | 270 HttpConnectionImpl::~HttpConnectionImpl() { |
131 STLDeleteElements(&response_body_readers_); | 271 STLDeleteElements(&response_body_readers_); |
132 } | 272 } |
133 | 273 |
134 void HttpConnectionImpl::OnReceivedHttpRequest( | 274 void HttpConnectionImpl::OnReceivedHttpRequest( |
135 const net::HttpServerRequestInfo& info) { | 275 const net::HttpServerRequestInfo& info) { |
136 if (EncounteredConnectionError()) | 276 if (IsClosing()) |
137 return; | 277 return; |
138 | 278 |
139 delegate_->OnReceivedRequest( | 279 delegate_->OnReceivedRequest( |
140 HttpRequest::From(info), [this](HttpResponsePtr response) { | 280 HttpRequest::From(info), [this](HttpResponsePtr response) { |
141 if (response->body.is_valid()) { | 281 if (response->body.is_valid()) { |
142 SimpleDataPipeReader* reader = new SimpleDataPipeReader; | 282 SimpleDataPipeReader* reader = new SimpleDataPipeReader; |
143 response_body_readers_.insert(reader); | 283 response_body_readers_.insert(reader); |
144 ScopedDataPipeConsumerHandle body = response->body.Pass(); | 284 ScopedDataPipeConsumerHandle body = response->body.Pass(); |
145 reader->Start( | 285 reader->Start( |
146 body.Pass(), | 286 body.Pass(), |
147 base::Bind(&HttpConnectionImpl::OnFinishedReadingResponseBody, | 287 base::Bind(&HttpConnectionImpl::OnFinishedReadingResponseBody, |
148 base::Unretained(this), base::Passed(&response))); | 288 base::Unretained(this), base::Passed(&response))); |
149 } else { | 289 } else { |
150 OnFinishedReadingResponseBody(response.Pass(), nullptr, nullptr); | 290 OnFinishedReadingResponseBody(response.Pass(), nullptr, nullptr); |
151 } | 291 } |
152 }); | 292 }); |
153 } | 293 } |
154 | 294 |
155 void HttpConnectionImpl::OnReceivedWebSocketRequest( | 295 void HttpConnectionImpl::OnReceivedWebSocketRequest( |
156 const net::HttpServerRequestInfo& info) { | 296 const net::HttpServerRequestInfo& info) { |
157 // TODO(yzshen): implement it. | 297 if (IsClosing()) |
| 298 return; |
| 299 |
| 300 delegate_->OnReceivedWebSocketRequest( |
| 301 HttpRequest::From(info), |
| 302 [this, info](InterfaceRequest<WebSocket> web_socket, |
| 303 ScopedDataPipeConsumerHandle send_stream, |
| 304 WebSocketClientPtr web_socket_client) { |
| 305 if (!web_socket.is_pending() || !send_stream.is_valid() || |
| 306 !web_socket_client) { |
| 307 Close(); |
| 308 return; |
| 309 } |
| 310 |
| 311 web_socket_.reset(new WebSocketImpl(this, web_socket.Pass(), |
| 312 send_stream.Pass(), |
| 313 web_socket_client.Pass())); |
| 314 server_->server()->AcceptWebSocket(connection_id_, info); |
| 315 }); |
158 } | 316 } |
159 | 317 |
160 void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) { | 318 void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) { |
161 // TODO(yzshen): implement it. | 319 if (IsClosing()) |
| 320 return; |
| 321 |
| 322 web_socket_->OnReceivedWebSocketMessage(data); |
162 } | 323 } |
163 | 324 |
164 void HttpConnectionImpl::SetSendBufferSize( | 325 void HttpConnectionImpl::SetSendBufferSize( |
165 uint32_t size, | 326 uint32_t size, |
166 const SetSendBufferSizeCallback& callback) { | 327 const SetSendBufferSizeCallback& callback) { |
167 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max())) | 328 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max())) |
168 size = std::numeric_limits<int32_t>::max(); | 329 size = std::numeric_limits<int32_t>::max(); |
169 | 330 |
170 owner_->server()->SetSendBufferSize( | 331 server_->server()->SetSendBufferSize(connection_id_, |
171 connection_id_, static_cast<int32_t>(size)); | 332 static_cast<int32_t>(size)); |
172 callback.Run(MakeNetworkError(net::OK)); | 333 callback.Run(MakeNetworkError(net::OK)); |
173 } | 334 } |
174 | 335 |
175 void HttpConnectionImpl::SetReceiveBufferSize( | 336 void HttpConnectionImpl::SetReceiveBufferSize( |
176 uint32_t size, | 337 uint32_t size, |
177 const SetReceiveBufferSizeCallback& callback) { | 338 const SetReceiveBufferSizeCallback& callback) { |
178 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max())) | 339 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max())) |
179 size = std::numeric_limits<int32_t>::max(); | 340 size = std::numeric_limits<int32_t>::max(); |
180 | 341 |
181 owner_->server()->SetReceiveBufferSize( | 342 server_->server()->SetReceiveBufferSize(connection_id_, |
182 connection_id_, static_cast<int32_t>(size)); | 343 static_cast<int32_t>(size)); |
183 callback.Run(MakeNetworkError(net::OK)); | 344 callback.Run(MakeNetworkError(net::OK)); |
184 } | 345 } |
185 | 346 |
186 void HttpConnectionImpl::OnConnectionError() { | 347 void HttpConnectionImpl::OnConnectionError() { |
187 // This method is called when the proxy side of |binding_| or the impl side of | 348 // This method is called when the proxy side of |binding_| or the impl side of |
188 // |delegate_| has closed the pipe. Although it is set as error handler for | 349 // |delegate_| has closed the pipe. Although it is set as error handler for |
189 // both |binding_| and |delegate_|, it will only be called at most once | 350 // both |binding_| and |delegate_|, it will only be called at most once |
190 // because when called it closes/resets |binding_| and |delegate_|. | 351 // because when called it closes/resets |binding_| and |delegate_|. |
191 DCHECK(!EncounteredConnectionError()); | 352 Close(); |
192 | |
193 binding_.Close(); | |
194 delegate_.reset(); | |
195 | |
196 // Don't close the connection until all pending responses are sent. | |
197 if (response_body_readers_.empty()) | |
198 owner_->server()->Close(connection_id_); | |
199 } | 353 } |
200 | 354 |
201 void HttpConnectionImpl::OnFinishedReadingResponseBody( | 355 void HttpConnectionImpl::OnFinishedReadingResponseBody( |
202 HttpResponsePtr response, | 356 HttpResponsePtr response, |
203 SimpleDataPipeReader* reader, | 357 SimpleDataPipeReader* reader, |
204 scoped_ptr<std::string> body) { | 358 scoped_ptr<std::string> body) { |
205 if (reader) { | 359 if (reader) { |
206 delete reader; | 360 delete reader; |
207 response_body_readers_.erase(reader); | 361 response_body_readers_.erase(reader); |
208 } | 362 } |
(...skipping 19 matching lines...) Expand all Loading... |
228 content_type = header.value; | 382 content_type = header.value; |
229 continue; | 383 continue; |
230 } | 384 } |
231 } | 385 } |
232 info.AddHeader(header.name, header.value); | 386 info.AddHeader(header.name, header.value); |
233 } | 387 } |
234 | 388 |
235 if (body) | 389 if (body) |
236 info.SetBody(*body, content_type); | 390 info.SetBody(*body, content_type); |
237 | 391 |
238 owner_->server()->SendResponse(connection_id_, info); | 392 server_->server()->SendResponse(connection_id_, info); |
239 | 393 |
240 if (response_body_readers_.empty() && EncounteredConnectionError()) | 394 if (IsClosing()) |
241 owner_->server()->Close(connection_id_); | 395 NotifyOwnerCloseIfAllDone(); |
| 396 } |
| 397 |
| 398 void HttpConnectionImpl::Close() { |
| 399 DCHECK(!IsClosing()); |
| 400 |
| 401 binding_.Close(); |
| 402 delegate_.reset(); |
| 403 |
| 404 if (web_socket_) |
| 405 web_socket_->Close(); |
| 406 |
| 407 NotifyOwnerCloseIfAllDone(); |
| 408 } |
| 409 |
| 410 void HttpConnectionImpl::NotifyOwnerCloseIfAllDone() { |
| 411 DCHECK(IsClosing()); |
| 412 |
| 413 // Don't close the connection until all pending sends are done. |
| 414 bool should_wait = !response_body_readers_.empty() || web_socket_; |
| 415 if (!should_wait) |
| 416 server_->server()->Close(connection_id_); |
| 417 } |
| 418 |
| 419 void HttpConnectionImpl::OnWebSocketClosed() { |
| 420 web_socket_.reset(); |
| 421 |
| 422 if (IsClosing()) { |
| 423 // The close operation is initiated by this object. |
| 424 NotifyOwnerCloseIfAllDone(); |
| 425 } else { |
| 426 // The close operation is initiated by |web_socket_|; start closing this |
| 427 // object. |
| 428 Close(); |
| 429 } |
242 } | 430 } |
243 | 431 |
244 } // namespace mojo | 432 } // namespace mojo |
OLD | NEW |