Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(302)

Side by Side Diff: mojo/services/network/http_connection_impl.cc

Issue 1873463003: Remove mojo network service. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/services/network/http_connection_impl.h ('k') | mojo/services/network/http_server_impl.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/services/network/http_connection_impl.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <limits>
11 #include <utility>
12
13 #include "base/bind.h"
14 #include "base/bind_helpers.h"
15 #include "base/callback.h"
16 #include "base/logging.h"
17 #include "base/macros.h"
18 #include "base/stl_util.h"
19 #include "base/strings/string_util.h"
20 #include "mojo/message_pump/handle_watcher.h"
21 #include "mojo/public/cpp/bindings/type_converter.h"
22 #include "mojo/public/cpp/system/data_pipe.h"
23 #include "mojo/services/network/http_server_impl.h"
24 #include "mojo/services/network/net_adapters.h"
25 #include "mojo/services/network/public/cpp/web_socket_read_queue.h"
26 #include "mojo/services/network/public/cpp/web_socket_write_queue.h"
27 #include "mojo/services/network/public/interfaces/web_socket.mojom.h"
28 #include "net/base/net_errors.h"
29 #include "net/http/http_request_headers.h"
30 #include "net/http/http_status_code.h"
31 #include "net/server/http_server.h"
32 #include "net/server/http_server_request_info.h"
33 #include "net/server/http_server_response_info.h"
34
35 namespace mojo {
36
37 // SimpleDataPipeReader reads till end-of-file, stores the data in a string and
38 // notifies completion.
39 class HttpConnectionImpl::SimpleDataPipeReader {
40 public:
41 using CompletionCallback =
42 base::Callback<void(SimpleDataPipeReader*, scoped_ptr<std::string>)>;
43
44 SimpleDataPipeReader() {}
45 ~SimpleDataPipeReader() {}
46
47 void Start(ScopedDataPipeConsumerHandle consumer,
48 const CompletionCallback& completion_callback) {
49 DCHECK(consumer.is_valid() && !consumer_.is_valid());
50 consumer_ = std::move(consumer);
51 completion_callback_ = completion_callback;
52 buffer_.reset(new std::string);
53 ReadMore();
54 }
55
56 private:
57 void ReadMore() {
58 const void* buf;
59 uint32_t buf_size;
60 MojoResult rv = BeginReadDataRaw(consumer_.get(), &buf, &buf_size,
61 MOJO_READ_DATA_FLAG_NONE);
62 if (rv == MOJO_RESULT_OK) {
63 buffer_->append(static_cast<const char*>(buf), buf_size);
64 EndReadDataRaw(consumer_.get(), buf_size);
65 WaitToReadMore();
66 } else if (rv == MOJO_RESULT_SHOULD_WAIT) {
67 WaitToReadMore();
68 } else if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
69 // We reached end-of-file.
70 completion_callback_.Run(this, std::move(buffer_));
71 // Note: This object may have been destroyed in the callback.
72 } else {
73 CHECK(false);
74 }
75 }
76
77 void WaitToReadMore() {
78 watcher_.Start(consumer_.get(), MOJO_HANDLE_SIGNAL_READABLE,
79 MOJO_DEADLINE_INDEFINITE,
80 base::Bind(&SimpleDataPipeReader::OnHandleReady,
81 base::Unretained(this)));
82 }
83
84 void OnHandleReady(MojoResult result) { ReadMore(); }
85
86 ScopedDataPipeConsumerHandle consumer_;
87 common::HandleWatcher watcher_;
88 CompletionCallback completion_callback_;
89 scoped_ptr<std::string> buffer_;
90
91 DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader);
92 };
93
94 class HttpConnectionImpl::WebSocketImpl : public WebSocket {
95 public:
96 // |connection| must outlive this object.
97 WebSocketImpl(HttpConnectionImpl* connection,
98 InterfaceRequest<WebSocket> request,
99 ScopedDataPipeConsumerHandle send_stream,
100 WebSocketClientPtr client)
101 : connection_(connection),
102 binding_(this, std::move(request)),
103 client_(std::move(client)),
104 send_stream_(std::move(send_stream)),
105 read_send_stream_(new WebSocketReadQueue(send_stream_.get())),
106 pending_send_count_(0) {
107 DCHECK(binding_.is_bound());
108 DCHECK(client_);
109 DCHECK(send_stream_.is_valid());
110
111 binding_.set_connection_error_handler([this]() { Close(); });
112 client_.set_connection_error_handler([this]() { Close(); });
113
114 DataPipe data_pipe;
115 receive_stream_ = std::move(data_pipe.producer_handle);
116 write_receive_stream_.reset(new WebSocketWriteQueue(receive_stream_.get()));
117
118 client_->DidConnect("", "", std::move(data_pipe.consumer_handle));
119 }
120
121 ~WebSocketImpl() override {}
122
123 void Close() {
124 DCHECK(!IsClosing());
125
126 binding_.Close();
127 client_.reset();
128
129 NotifyOwnerCloseIfAllDone();
130 }
131
132 void OnReceivedWebSocketMessage(const std::string& data) {
133 if (IsClosing())
134 return;
135
136 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
137 // WebSocket{Read,Write}Queue doesn't handle that correctly.
138 if (data.empty())
139 return;
140
141 uint32_t size = static_cast<uint32_t>(data.size());
142 write_receive_stream_->Write(
143 &data[0], size,
144 base::Bind(&WebSocketImpl::OnFinishedWritingReceiveStream,
145 base::Unretained(this), size));
146 }
147
148 private:
149 // WebSocket implementation.
150 void Connect(const String& url,
151 Array<String> protocols,
152 const String& origin,
153 ScopedDataPipeConsumerHandle send_stream,
154 WebSocketClientPtr client) override {
155 NOTREACHED();
156 }
157
158 void Send(bool fin, MessageType type, uint32_t num_bytes) override {
159 if (!fin || type != MessageType::TEXT) {
160 NOTIMPLEMENTED();
161 Close();
162 }
163
164 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
165 // WebSocket{Read,Write}Queue doesn't handle that correctly.
166 if (num_bytes == 0)
167 return;
168
169 pending_send_count_++;
170 read_send_stream_->Read(
171 num_bytes, base::Bind(&WebSocketImpl::OnFinishedReadingSendStream,
172 base::Unretained(this), num_bytes));
173 }
174
175 void FlowControl(int64_t quota) override { NOTIMPLEMENTED(); }
176
177 void Close(uint16_t code, const String& reason) override {
178 Close();
179 }
180
181 void OnFinishedReadingSendStream(uint32_t num_bytes, const char* data) {
182 DCHECK_GT(pending_send_count_, 0u);
183 pending_send_count_--;
184
185 if (data) {
186 connection_->server_->server()->SendOverWebSocket(
187 connection_->connection_id_, std::string(data, num_bytes));
188 }
189
190 if (IsClosing())
191 NotifyOwnerCloseIfAllDone();
192 }
193
194 void OnFinishedWritingReceiveStream(uint32_t num_bytes, const char* buffer) {
195 if (IsClosing())
196 return;
197
198 if (buffer)
199 client_->DidReceiveData(true, MessageType::TEXT, num_bytes);
200 }
201
202 // Checks whether Close() has been called.
203 bool IsClosing() const { return !binding_.is_bound(); }
204
205 void NotifyOwnerCloseIfAllDone() {
206 DCHECK(IsClosing());
207
208 if (pending_send_count_ == 0)
209 connection_->OnWebSocketClosed();
210 }
211
212 HttpConnectionImpl* const connection_;
213
214 Binding<WebSocket> binding_;
215 WebSocketClientPtr client_;
216
217 ScopedDataPipeConsumerHandle send_stream_;
218 scoped_ptr<WebSocketReadQueue> read_send_stream_;
219 size_t pending_send_count_;
220
221 ScopedDataPipeProducerHandle receive_stream_;
222 scoped_ptr<WebSocketWriteQueue> write_receive_stream_;
223
224 DISALLOW_COPY_AND_ASSIGN(WebSocketImpl);
225 };
226
227 template <>
228 struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> {
229 static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) {
230 HttpRequestPtr request(HttpRequest::New());
231 request->method = obj.method;
232 request->url = obj.path;
233 request->headers.resize(obj.headers.size());
234 size_t index = 0;
235 for (const auto& item : obj.headers) {
236 HttpHeaderPtr header(HttpHeader::New());
237 header->name = item.first;
238 header->value = item.second;
239 request->headers[index++] = std::move(header);
240 }
241 if (!obj.data.empty()) {
242 uint32_t num_bytes = static_cast<uint32_t>(obj.data.size());
243 MojoCreateDataPipeOptions options;
244 options.struct_size = sizeof(MojoCreateDataPipeOptions);
245 options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE;
246 options.element_num_bytes = 1;
247 options.capacity_num_bytes = num_bytes;
248 DataPipe data_pipe(options);
249 request->body = std::move(data_pipe.consumer_handle);
250 MojoResult result =
251 WriteDataRaw(data_pipe.producer_handle.get(), obj.data.data(),
252 &num_bytes, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
253 CHECK_EQ(MOJO_RESULT_OK, result);
254 }
255 return request;
256 }
257 };
258
259 HttpConnectionImpl::HttpConnectionImpl(int connection_id,
260 HttpServerImpl* server,
261 HttpConnectionDelegatePtr delegate,
262 HttpConnectionPtr* connection)
263 : connection_id_(connection_id),
264 server_(server),
265 delegate_(std::move(delegate)),
266 binding_(this, connection) {
267 DCHECK(delegate_);
268 binding_.set_connection_error_handler([this]() { Close(); });
269 delegate_.set_connection_error_handler([this]() { Close(); });
270 }
271
272 HttpConnectionImpl::~HttpConnectionImpl() {
273 STLDeleteElements(&response_body_readers_);
274 }
275
276 void HttpConnectionImpl::OnReceivedHttpRequest(
277 const net::HttpServerRequestInfo& info) {
278 if (IsClosing())
279 return;
280
281 delegate_->OnReceivedRequest(
282 HttpRequest::From(info), [this](HttpResponsePtr response) {
283 if (response->body.is_valid()) {
284 SimpleDataPipeReader* reader = new SimpleDataPipeReader;
285 response_body_readers_.insert(reader);
286 ScopedDataPipeConsumerHandle body = std::move(response->body);
287 reader->Start(
288 std::move(body),
289 base::Bind(&HttpConnectionImpl::OnFinishedReadingResponseBody,
290 base::Unretained(this), base::Passed(&response)));
291 } else {
292 OnFinishedReadingResponseBody(std::move(response), nullptr, nullptr);
293 }
294 });
295 }
296
297 void HttpConnectionImpl::OnReceivedWebSocketRequest(
298 const net::HttpServerRequestInfo& info) {
299 if (IsClosing())
300 return;
301
302 delegate_->OnReceivedWebSocketRequest(
303 HttpRequest::From(info),
304 [this, info](InterfaceRequest<WebSocket> web_socket,
305 ScopedDataPipeConsumerHandle send_stream,
306 WebSocketClientPtr web_socket_client) {
307 if (!web_socket.is_pending() || !send_stream.is_valid() ||
308 !web_socket_client) {
309 Close();
310 return;
311 }
312
313 web_socket_.reset(new WebSocketImpl(this, std::move(web_socket),
314 std::move(send_stream),
315 std::move(web_socket_client)));
316 server_->server()->AcceptWebSocket(connection_id_, info);
317 });
318 }
319
320 void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) {
321 if (IsClosing())
322 return;
323
324 web_socket_->OnReceivedWebSocketMessage(data);
325 }
326
327 void HttpConnectionImpl::SetSendBufferSize(
328 uint32_t size,
329 const SetSendBufferSizeCallback& callback) {
330 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
331 size = std::numeric_limits<int32_t>::max();
332
333 server_->server()->SetSendBufferSize(connection_id_,
334 static_cast<int32_t>(size));
335 callback.Run(MakeNetworkError(net::OK));
336 }
337
338 void HttpConnectionImpl::SetReceiveBufferSize(
339 uint32_t size,
340 const SetReceiveBufferSizeCallback& callback) {
341 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
342 size = std::numeric_limits<int32_t>::max();
343
344 server_->server()->SetReceiveBufferSize(connection_id_,
345 static_cast<int32_t>(size));
346 callback.Run(MakeNetworkError(net::OK));
347 }
348
349 void HttpConnectionImpl::OnFinishedReadingResponseBody(
350 HttpResponsePtr response,
351 SimpleDataPipeReader* reader,
352 scoped_ptr<std::string> body) {
353 if (reader) {
354 delete reader;
355 response_body_readers_.erase(reader);
356 }
357
358 net::HttpServerResponseInfo info(
359 static_cast<net::HttpStatusCode>(response->status_code));
360
361 std::string content_type;
362 for (size_t i = 0; i < response->headers.size(); ++i) {
363 const HttpHeader& header = *(response->headers[i]);
364
365 if (body) {
366 // net::HttpServerResponseInfo::SetBody() automatically sets
367 // Content-Length and Content-Types, so skip the two here.
368 //
369 // TODO(yzshen): Consider adding to net::HttpServerResponseInfo a simple
370 // setter for body which doesn't fiddle with headers.
371 base::StringPiece name_piece(header.name.data(), header.name.size());
372 if (base::EqualsCaseInsensitiveASCII(
373 name_piece, net::HttpRequestHeaders::kContentLength)) {
374 continue;
375 } else if (base::EqualsCaseInsensitiveASCII(
376 name_piece, net::HttpRequestHeaders::kContentType)) {
377 content_type = header.value;
378 continue;
379 }
380 }
381 info.AddHeader(header.name, header.value);
382 }
383
384 if (body)
385 info.SetBody(*body, content_type);
386
387 server_->server()->SendResponse(connection_id_, info);
388
389 if (IsClosing())
390 NotifyOwnerCloseIfAllDone();
391 }
392
393 void HttpConnectionImpl::Close() {
394 DCHECK(!IsClosing());
395
396 binding_.Close();
397 delegate_.reset();
398
399 if (web_socket_)
400 web_socket_->Close();
401
402 NotifyOwnerCloseIfAllDone();
403 }
404
405 void HttpConnectionImpl::NotifyOwnerCloseIfAllDone() {
406 DCHECK(IsClosing());
407
408 // Don't close the connection until all pending sends are done.
409 bool should_wait = !response_body_readers_.empty() || web_socket_;
410 if (!should_wait)
411 server_->server()->Close(connection_id_);
412 }
413
414 void HttpConnectionImpl::OnWebSocketClosed() {
415 web_socket_.reset();
416
417 if (IsClosing()) {
418 // The close operation is initiated by this object.
419 NotifyOwnerCloseIfAllDone();
420 } else {
421 // The close operation is initiated by |web_socket_|; start closing this
422 // object.
423 Close();
424 }
425 }
426
427 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/services/network/http_connection_impl.h ('k') | mojo/services/network/http_server_impl.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698