Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(147)

Side by Side Diff: net/http/http_pipelined_connection_impl.cc

Issue 7289006: Basic HTTP pipelining support (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix races Created 9 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/http/http_pipelined_connection_impl.h"
6
7 #include "base/message_loop.h"
8 #include "base/stl_util.h"
9 #include "net/base/io_buffer.h"
10 #include "net/http/http_pipelined_stream.h"
11 #include "net/http/http_request_info.h"
12 #include "net/http/http_stream_parser.h"
13 #include "net/socket/client_socket_handle.h"
14
15 namespace net {
16
17 HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl(
18 ClientSocketHandle* connection,
19 HttpPipelinedConnection::Delegate* delegate,
20 const SSLConfig& used_ssl_config,
21 const ProxyInfo& used_proxy_info,
22 const BoundNetLog& net_log,
23 bool was_npn_negotiated)
24 : delegate_(delegate),
25 connection_(connection),
26 used_ssl_config_(used_ssl_config),
27 used_proxy_info_(used_proxy_info),
28 net_log_(net_log),
29 was_npn_negotiated_(was_npn_negotiated),
30 read_buf_(new GrowableIOBuffer()),
31 next_pipeline_id_(1),
32 active_(false),
33 usable_(true),
34 completed_one_request_(false),
35 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
36 send_next_state_(SEND_STATE_NONE),
37 ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_(
38 this, &HttpPipelinedConnectionImpl::OnSendIOCallback)),
39 send_user_callback_(NULL),
40 read_next_state_(READ_STATE_NONE),
41 ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_(
42 this, &HttpPipelinedConnectionImpl::OnReadIOCallback)),
43 read_user_callback_(NULL) {
44 CHECK(connection_.get());
45 }
46
47 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
48 CHECK_EQ(depth(), 0);
49 CHECK(stream_info_map_.empty());
50 CHECK(deferred_request_queue_.empty());
51 CHECK(request_order_.empty());
52 CHECK_EQ(send_next_state_, SEND_STATE_NONE);
53 CHECK_EQ(read_next_state_, READ_STATE_NONE);
54 CHECK(!send_user_callback_);
55 CHECK(!read_user_callback_);
56 if (!usable_) {
57 connection_->socket()->Disconnect();
58 }
59 connection_->Reset();
60 }
61
62 HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() {
63 int pipeline_id = next_pipeline_id_++;
64 CHECK(pipeline_id);
65 HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id);
66 stream_info_map_.insert(std::make_pair(pipeline_id, StreamInfo()));
67 return stream;
68 }
69
70 void HttpPipelinedConnectionImpl::InitializeParser(
71 int pipeline_id,
72 const HttpRequestInfo* request,
73 const BoundNetLog& net_log) {
74 CHECK(ContainsKey(stream_info_map_, pipeline_id));
75 CHECK(!stream_info_map_[pipeline_id].parser);
76 stream_info_map_[pipeline_id].state = STREAM_BOUND;
77 stream_info_map_[pipeline_id].parser = new HttpStreamParser(
78 connection_.get(), request, read_buf_.get(), net_log);
79
80 // In case our first stream doesn't SendRequest() immediately, we should still
81 // allow others to use this pipeline.
82 if (pipeline_id == 1) {
83 MessageLoop::current()->PostTask(
84 FROM_HERE,
85 method_factory_.NewRunnableMethod(
86 &HttpPipelinedConnectionImpl::ActivatePipeline));
87 }
88 }
89
90 void HttpPipelinedConnectionImpl::ActivatePipeline() {
91 if (!active_) {
92 active_ = true;
93 delegate_->OnPipelineHasCapacity(this);
94 }
95 }
96
97 void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) {
98 CHECK(ContainsKey(stream_info_map_, pipeline_id));
99 Close(pipeline_id, false);
100
101 if (stream_info_map_[pipeline_id].state != STREAM_CREATED &&
102 stream_info_map_[pipeline_id].state != STREAM_UNUSED) {
103 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
104 CHECK(stream_info_map_[pipeline_id].parser);
105 delete stream_info_map_[pipeline_id].parser;
106 stream_info_map_[pipeline_id].parser = NULL;
107 }
108 CHECK(!stream_info_map_[pipeline_id].parser);
109 CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
110 stream_info_map_.erase(pipeline_id);
111
112 delegate_->OnPipelineHasCapacity(this);
113 }
114
115 int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id,
116 const std::string& request_line,
117 const HttpRequestHeaders& headers,
118 UploadDataStream* request_body,
119 HttpResponseInfo* response,
120 OldCompletionCallback* callback) {
121 CHECK(ContainsKey(stream_info_map_, pipeline_id));
122 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND);
123 if (!usable_) {
124 return ERR_PIPELINE_EVICTION;
125 }
126
127 DeferredSendRequest deferred_request;
128 deferred_request.pipeline_id = pipeline_id;
129 deferred_request.request_line = request_line;
130 deferred_request.headers = headers;
131 deferred_request.request_body = request_body;
132 deferred_request.response = response;
133 deferred_request.callback = callback;
134 deferred_request_queue_.push(deferred_request);
135
136 int rv;
137 if (send_next_state_ == SEND_STATE_NONE) {
138 send_next_state_ = SEND_STATE_NEXT_REQUEST;
139 rv = DoSendRequestLoop(OK);
140 } else {
141 rv = ERR_IO_PENDING;
142 }
143 ActivatePipeline();
144 return rv;
145 }
146
147 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
148 int rv = result;
149 do {
150 SendRequestState state = send_next_state_;
151 send_next_state_ = SEND_STATE_NONE;
152 switch (state) {
153 case SEND_STATE_NEXT_REQUEST:
154 rv = DoSendNextRequest(rv);
155 break;
156 case SEND_STATE_COMPLETE:
157 rv = DoSendComplete(rv);
158 break;
159 case SEND_STATE_UNUSABLE:
160 rv = DoEvictPendingSendRequests(rv);
161 break;
162 default:
163 NOTREACHED() << "bad send state: " << state;
164 rv = ERR_FAILED;
165 break;
166 }
167 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
168 return rv;
169 }
170
171 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
172 CHECK(send_user_callback_);
173 DoSendRequestLoop(result);
174 }
175
176 int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) {
177 CHECK(!deferred_request_queue_.empty());
178 const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
179 CHECK(ContainsKey(stream_info_map_, deferred_request.pipeline_id));
180 if (stream_info_map_[deferred_request.pipeline_id].state == STREAM_CLOSED) {
181 deferred_request_queue_.pop();
182 if (deferred_request_queue_.empty()) {
183 send_next_state_ = SEND_STATE_NONE;
184 } else {
185 send_next_state_ = SEND_STATE_NEXT_REQUEST;
186 }
187 return OK;
188 }
189 CHECK(stream_info_map_[deferred_request.pipeline_id].parser);
190 int rv = stream_info_map_[deferred_request.pipeline_id].parser->SendRequest(
191 deferred_request.request_line,
192 deferred_request.headers,
193 deferred_request.request_body,
194 deferred_request.response,
195 &send_io_callback_);
196 // |result| == ERR_IO_PENDING means this function was *not* called on the same
197 // stack as SendRequest(). That means we returned ERR_IO_PENDING to
198 // SendRequest() earlier and will need to invoke its callback.
199 if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) {
200 send_user_callback_ = deferred_request.callback;
201 }
202 stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENDING;
203 send_next_state_ = SEND_STATE_COMPLETE;
204 return rv;
205 }
206
207 int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
208 CHECK(!deferred_request_queue_.empty());
209 const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
210 CHECK_EQ(stream_info_map_[deferred_request.pipeline_id].state,
211 STREAM_SENDING);
212 request_order_.push(deferred_request.pipeline_id);
213 stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENT;
214 deferred_request_queue_.pop();
215 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
216 result = ERR_PIPELINE_EVICTION;
217 }
218 if (result < OK) {
219 send_next_state_ = SEND_STATE_UNUSABLE;
220 usable_ = false;
221 }
222 if (send_user_callback_) {
223 MessageLoop::current()->PostTask(
224 FROM_HERE,
225 method_factory_.NewRunnableMethod(
226 &HttpPipelinedConnectionImpl::FireUserCallback,
227 send_user_callback_,
228 result));
229 send_user_callback_ = NULL;
230 }
231 if (result < OK) {
232 return result;
233 }
234 if (deferred_request_queue_.empty()) {
235 send_next_state_ = SEND_STATE_NONE;
236 return OK;
237 }
238 send_next_state_ = SEND_STATE_NEXT_REQUEST;
239 return OK;
240 }
241
242 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
243 send_next_state_ = SEND_STATE_NONE;
244 while (!deferred_request_queue_.empty()) {
245 const DeferredSendRequest& evicted_send = deferred_request_queue_.front();
246 if (stream_info_map_[evicted_send.pipeline_id].state != STREAM_CLOSED) {
247 evicted_send.callback->Run(ERR_PIPELINE_EVICTION);
248 }
249 deferred_request_queue_.pop();
250 }
251 return result;
252 }
253
254 int HttpPipelinedConnectionImpl::ReadResponseHeaders(
255 int pipeline_id,
256 OldCompletionCallback* callback) {
257 CHECK(ContainsKey(stream_info_map_, pipeline_id));
258 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_SENT);
259 CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
260 if (!usable_) {
261 return ERR_PIPELINE_EVICTION;
262 }
263 stream_info_map_[pipeline_id].state = STREAM_READ_PENDING;
264 stream_info_map_[pipeline_id].read_headers_callback = callback;
265 if (read_next_state_ == READ_STATE_NONE) {
266 read_next_state_ = READ_STATE_NEXT_HEADERS;
267 return DoReadHeadersLoop(OK);
268 } else {
269 return ERR_IO_PENDING;
270 }
271 }
272
273 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
274 int rv = result;
275 do {
276 ReadHeadersState state = read_next_state_;
277 read_next_state_ = READ_STATE_NONE;
278 switch (state) {
279 case READ_STATE_NEXT_HEADERS:
280 rv = DoReadNextHeaders(rv);
281 break;
282 case READ_STATE_COMPLETE:
283 rv = DoReadHeadersComplete(rv);
284 break;
285 case READ_STATE_WAITING_FOR_CLOSE:
286 rv = DoReadWaitingForClose(rv);
287 return rv;
288 case READ_STATE_STREAM_CLOSED:
289 rv = DoReadStreamClosed();
290 break;
291 case READ_STATE_UNUSABLE:
292 rv = DoEvictPendingReadHeaders(rv);
293 break;
294 case READ_STATE_NONE:
295 break;
296 default:
297 NOTREACHED() << "bad read state";
298 rv = ERR_FAILED;
299 break;
300 }
301 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
302 return rv;
303 }
304
305 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
306 CHECK(read_user_callback_);
307 DoReadHeadersLoop(result);
308 }
309
310 int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) {
311 CHECK(!request_order_.empty());
312 int pipeline_id = request_order_.front();
313 CHECK(ContainsKey(stream_info_map_, pipeline_id));
314 if (stream_info_map_[pipeline_id].state == STREAM_CLOSED) {
315 // Since nobody will read whatever data is on the pipeline associated with
316 // this closed request, we must shut down the rest of the pipeline.
317 read_next_state_ = READ_STATE_UNUSABLE;
318 return OK;
319 }
320 if (stream_info_map_[pipeline_id].read_headers_callback == NULL) {
321 return ERR_IO_PENDING;
322 }
323 CHECK(stream_info_map_[pipeline_id].parser);
324
325 if (result == ERR_IO_PENDING) {
326 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_ACTIVE);
327 } else {
328 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_READ_PENDING);
329 stream_info_map_[pipeline_id].state = STREAM_ACTIVE;
330 }
331
332 int rv = stream_info_map_[pipeline_id].parser->ReadResponseHeaders(
333 &read_io_callback_);
334 if (rv == ERR_IO_PENDING) {
335 read_next_state_ = READ_STATE_COMPLETE;
336 read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
337 } else if (rv < OK) {
338 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
339 if (rv == ERR_SOCKET_NOT_CONNECTED && completed_one_request_)
340 rv = ERR_PIPELINE_EVICTION;
341 } else {
342 CHECK_LE(OK, rv);
343 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
344 }
345
346 // |result| == ERR_IO_PENDING means this function was *not* called on the same
347 // stack as ReadResponseHeaders(). That means we returned ERR_IO_PENDING to
348 // ReadResponseHeaders() earlier and now need to invoke its callback.
349 if (rv != ERR_IO_PENDING && result == ERR_IO_PENDING) {
350 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
351 read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
352 MessageLoop::current()->PostTask(
353 FROM_HERE,
354 method_factory_.NewRunnableMethod(
355 &HttpPipelinedConnectionImpl::FireUserCallback,
356 read_user_callback_,
357 rv));
358 }
359 return rv;
360 }
361
362 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
363 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
364 if (read_user_callback_) {
365 MessageLoop::current()->PostTask(
366 FROM_HERE,
367 method_factory_.NewRunnableMethod(
368 &HttpPipelinedConnectionImpl::FireUserCallback,
369 read_user_callback_,
370 result));
371 read_user_callback_ = NULL;
372 }
373 return result;
374 }
375
376 int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) {
377 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
378 return result;
379 }
380
381 int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
382 CHECK(!request_order_.empty());
383 int pipeline_id = request_order_.front();
384 CHECK(ContainsKey(stream_info_map_, pipeline_id));
385 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
386 CHECK(stream_info_map_[pipeline_id].read_headers_callback);
387 stream_info_map_[pipeline_id].read_headers_callback = NULL;
388 request_order_.pop();
389 if (!usable_) {
390 read_next_state_ = READ_STATE_UNUSABLE;
391 return OK;
392 } else {
393 completed_one_request_ = true;
394 if (!request_order_.empty()) {
395 int next_pipeline_id = request_order_.front();
396 CHECK(ContainsKey(stream_info_map_, next_pipeline_id));
397 if (stream_info_map_[next_pipeline_id].read_headers_callback) {
398 stream_info_map_[next_pipeline_id].state = STREAM_ACTIVE;
399 read_next_state_ = READ_STATE_NEXT_HEADERS;
400 MessageLoop::current()->PostTask(
401 FROM_HERE,
402 method_factory_.NewRunnableMethod(
403 &HttpPipelinedConnectionImpl::DoReadHeadersLoop,
404 ERR_IO_PENDING));
405 return ERR_IO_PENDING; // Wait for the task to fire.
406 }
407 }
408 read_next_state_ = READ_STATE_NONE;
409 return OK;
410 }
411 }
412
413 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
414 while (!request_order_.empty()) {
415 int evicted_id = request_order_.front();
416 request_order_.pop();
417 if (stream_info_map_[evicted_id].read_headers_callback == NULL) {
418 continue;
419 }
420 if (stream_info_map_[evicted_id].state != STREAM_CLOSED) {
421 stream_info_map_[evicted_id].read_headers_callback->Run(
422 ERR_PIPELINE_EVICTION);
423 }
424 stream_info_map_[evicted_id].read_headers_callback = NULL;
425 }
426 read_next_state_ = READ_STATE_NONE;
427 return result;
428 }
429
430 void HttpPipelinedConnectionImpl::Close(int pipeline_id,
431 bool not_reusable) {
432 CHECK(ContainsKey(stream_info_map_, pipeline_id));
433 switch (stream_info_map_[pipeline_id].state) {
434 case STREAM_CREATED:
435 stream_info_map_[pipeline_id].state = STREAM_UNUSED;
436 break;
437
438 case STREAM_BOUND:
439 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
440 break;
441
442 case STREAM_SENDING:
443 usable_ = false;
444 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
445 send_user_callback_ = NULL;
446 send_next_state_ = SEND_STATE_UNUSABLE;
447 DoSendRequestLoop(OK);
448 break;
449
450 case STREAM_SENT:
451 case STREAM_READ_PENDING:
452 usable_ = false;
453 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
454 if (read_next_state_ == READ_STATE_NONE) {
455 read_next_state_ = READ_STATE_UNUSABLE;
456 DoReadHeadersLoop(OK);
457 }
458 break;
459
460 case STREAM_ACTIVE:
461 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
462 if (not_reusable) {
463 usable_ = false;
464 }
465 read_next_state_ = READ_STATE_STREAM_CLOSED;
466 read_user_callback_ = NULL;
467 DoReadHeadersLoop(OK);
468 break;
469
470 case STREAM_CLOSED:
471 case STREAM_UNUSED:
472 // TODO(simonjam): Why is Close() sometimes called twice?
473 break;
474
475 default:
476 NOTREACHED();
477 break;
478 }
479 }
480
481 int HttpPipelinedConnectionImpl::ReadResponseBody(
482 int pipeline_id,
483 IOBuffer* buf,
484 int buf_len,
485 OldCompletionCallback* callback) {
486 CHECK(ContainsKey(stream_info_map_, pipeline_id));
487 CHECK(!request_order_.empty());
488 CHECK_EQ(pipeline_id, request_order_.front());
489 CHECK(stream_info_map_[pipeline_id].parser);
490 return stream_info_map_[pipeline_id].parser->ReadResponseBody(
491 buf, buf_len, callback);
492 }
493
494 uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const {
495 CHECK(ContainsKey(stream_info_map_, pipeline_id));
496 CHECK(stream_info_map_.find(pipeline_id)->second.parser);
497 return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress();
498 }
499
500 HttpResponseInfo* HttpPipelinedConnectionImpl::GetResponseInfo(
501 int pipeline_id) {
502 CHECK(ContainsKey(stream_info_map_, pipeline_id));
503 CHECK(stream_info_map_.find(pipeline_id)->second.parser);
504 return stream_info_map_.find(pipeline_id)->second.parser->GetResponseInfo();
505 }
506
507 bool HttpPipelinedConnectionImpl::IsResponseBodyComplete(
508 int pipeline_id) const {
509 CHECK(ContainsKey(stream_info_map_, pipeline_id));
510 CHECK(stream_info_map_.find(pipeline_id)->second.parser);
511 return stream_info_map_.find(pipeline_id)->second.parser->
512 IsResponseBodyComplete();
513 }
514
515 bool HttpPipelinedConnectionImpl::CanFindEndOfResponse(int pipeline_id) const {
516 CHECK(ContainsKey(stream_info_map_, pipeline_id));
517 CHECK(stream_info_map_.find(pipeline_id)->second.parser);
518 return stream_info_map_.find(pipeline_id)->second.parser->
519 CanFindEndOfResponse();
520 }
521
522 bool HttpPipelinedConnectionImpl::IsMoreDataBuffered(int pipeline_id) const {
523 CHECK(ContainsKey(stream_info_map_, pipeline_id));
524 return read_buf_->offset();
525 }
526
527 bool HttpPipelinedConnectionImpl::IsConnectionReused(int pipeline_id) const {
528 CHECK(ContainsKey(stream_info_map_, pipeline_id));
529 if (pipeline_id > 1) {
530 return true;
531 }
532 ClientSocketHandle::SocketReuseType reuse_type = connection_->reuse_type();
533 return connection_->is_reused() ||
534 reuse_type == ClientSocketHandle::UNUSED_IDLE;
535 }
536
537 void HttpPipelinedConnectionImpl::SetConnectionReused(int pipeline_id) {
538 CHECK(ContainsKey(stream_info_map_, pipeline_id));
539 connection_->set_is_reused(true);
540 }
541
542 void HttpPipelinedConnectionImpl::GetSSLInfo(int pipeline_id,
543 SSLInfo* ssl_info) {
544 CHECK(ContainsKey(stream_info_map_, pipeline_id));
545 CHECK(stream_info_map_[pipeline_id].parser);
546 return stream_info_map_[pipeline_id].parser->GetSSLInfo(ssl_info);
547 }
548
549 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
550 int pipeline_id,
551 SSLCertRequestInfo* cert_request_info) {
552 CHECK(ContainsKey(stream_info_map_, pipeline_id));
553 CHECK(stream_info_map_[pipeline_id].parser);
554 return stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo(
555 cert_request_info);
556 }
557
558 void HttpPipelinedConnectionImpl::FireUserCallback(
559 OldCompletionCallback* callback,
560 int result) {
561 CHECK(callback);
562 callback->Run(result);
563 }
564
565 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698