Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(34)

Side by Side Diff: net/http/http_pipelined_connection_impl.cc

Issue 7289006: Basic HTTP pipelining support (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fixed transaction. Created 9 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/http/http_pipelined_connection_impl.h"
6
7 #include "base/message_loop.h"
8 #include "base/stl_util.h"
9 #include "net/base/io_buffer.h"
10 #include "net/http/http_pipelined_stream.h"
11 #include "net/http/http_request_info.h"
12 #include "net/http/http_stream_parser.h"
13 #include "net/socket/client_socket_handle.h"
14
15 namespace net {
16
17 HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl(
18 ClientSocketHandle* connection,
19 HttpPipelinedConnection::Delegate* delegate,
20 const SSLConfig& used_ssl_config,
21 const ProxyInfo& used_proxy_info,
22 const BoundNetLog& net_log,
23 bool was_npn_negotiated)
24 : delegate_(delegate),
25 connection_(connection),
26 used_ssl_config_(used_ssl_config),
27 used_proxy_info_(used_proxy_info),
28 net_log_(net_log),
29 was_npn_negotiated_(was_npn_negotiated),
30 read_buf_(new GrowableIOBuffer()),
31 next_pipeline_id_(1),
32 active_(false),
33 usable_(true),
34 completed_one_request_(false),
35 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
36 send_next_state_(SEND_STATE_NONE),
37 ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_(
38 this, &HttpPipelinedConnectionImpl::OnSendIOCallback)),
39 send_user_callback_(NULL),
40 read_next_state_(READ_STATE_NONE),
41 ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_(
42 this, &HttpPipelinedConnectionImpl::OnReadIOCallback)),
43 read_user_callback_(NULL) {
44 DCHECK(connection_.get());
45 }
46
47 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
48 DCHECK(depth() == 0);
49 DCHECK(parser_map_.empty());
50 DCHECK(callback_map_.empty());
51 DCHECK(deferred_request_queue_.empty());
52 DCHECK(request_order_.empty());
53 DCHECK_EQ(send_next_state_, SEND_STATE_NONE);
54 DCHECK_EQ(read_next_state_, READ_STATE_NONE);
55 DCHECK(!send_user_callback_);
56 DCHECK(!read_user_callback_);
57 if (!usable_) {
58 connection_->socket()->Disconnect();
59 }
60 connection_->Reset();
61 }
62
63 HttpStream* HttpPipelinedConnectionImpl::CreateNewStream() {
64 int pipeline_id = next_pipeline_id_++;
65 DCHECK(pipeline_id);
66 HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id);
67 stream_state_map_.insert(std::make_pair(pipeline_id, STREAM_CREATED));
68 return stream;
69 }
70
71 void HttpPipelinedConnectionImpl::InitializeParser(
72 int pipeline_id,
73 const HttpRequestInfo* request,
74 const BoundNetLog& net_log) {
75 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
76 DCHECK(!ContainsKey(parser_map_, pipeline_id));
77 stream_state_map_[pipeline_id] = STREAM_BOUND;
78 HttpStreamParser* parser = new HttpStreamParser(connection_.get(),
79 request,
80 read_buf_.get(),
81 net_log);
82 parser_map_.insert(std::make_pair(pipeline_id, parser));
83 if (!active_) {
84 active_ = true;
85 MessageLoop::current()->PostTask(
86 FROM_HERE,
87 method_factory_.NewRunnableMethod(
88 &HttpPipelinedConnectionImpl::FillPipeline));
89 }
90 }
91
92 void HttpPipelinedConnectionImpl::FillPipeline() {
93 delegate_->OnPipelineHasCapacity(this);
94 }
95
96 void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) {
97 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
98 Close(pipeline_id, false);
99 DCHECK(!ContainsKey(callback_map_, pipeline_id));
100
101 if (stream_state_map_[pipeline_id] != STREAM_CREATED &&
102 stream_state_map_[pipeline_id] != STREAM_UNUSED) {
103 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_CLOSED);
104 DCHECK(ContainsKey(parser_map_, pipeline_id));
105
106 ParserMap::iterator it = parser_map_.find(pipeline_id);
107 delete it->second;
108 parser_map_.erase(it);
109 }
110 DCHECK(!ContainsKey(parser_map_, pipeline_id));
111 stream_state_map_.erase(pipeline_id);
112
113 MessageLoop::current()->PostTask(
114 FROM_HERE,
115 method_factory_.NewRunnableMethod(
116 &HttpPipelinedConnectionImpl::FillPipeline));
117 }
118
119 int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id,
120 const std::string& request_line,
121 const HttpRequestHeaders& headers,
122 UploadDataStream* request_body,
123 HttpResponseInfo* response,
124 CompletionCallback* callback) {
125 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
126 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_BOUND);
127 if (!usable_) {
128 return ERR_PIPELINE_EVICTION;
129 }
130
131 DeferredSendRequest deferred_request;
132 deferred_request.pipeline_id = pipeline_id;
133 deferred_request.request_line = request_line;
134 deferred_request.headers = headers;
135 deferred_request.request_body = request_body;
136 deferred_request.response = response;
137 deferred_request.callback = callback;
138 deferred_request_queue_.push(deferred_request);
139
140 if (send_next_state_ == SEND_STATE_NONE) {
141 send_next_state_ = SEND_STATE_NEXT_REQUEST;
142 return DoSendRequestLoop(OK);
143 } else {
144 return ERR_IO_PENDING;
145 }
146 }
147
148 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
149 int rv = result;
150 do {
151 SendRequestState state = send_next_state_;
152 send_next_state_ = SEND_STATE_NONE;
153 switch (state) {
154 case SEND_STATE_NEXT_REQUEST:
155 rv = DoSendNextRequest(rv);
156 break;
157 case SEND_STATE_COMPLETE:
158 rv = DoSendComplete(rv);
159 break;
160 case SEND_STATE_UNUSABLE:
161 rv = DoEvictPendingSendRequests(rv);
162 break;
163 default:
164 NOTREACHED() << "bad send state: " << state;
165 rv = ERR_FAILED;
166 break;
167 }
168 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
169 return rv;
170 }
171
172 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
173 DCHECK(send_user_callback_);
174 DoSendRequestLoop(result);
175 }
176
177 int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) {
178 DCHECK(!deferred_request_queue_.empty());
179 const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
180 DCHECK(ContainsKey(stream_state_map_, deferred_request.pipeline_id));
181 if (stream_state_map_[deferred_request.pipeline_id] == STREAM_CLOSED) {
182 deferred_request_queue_.pop();
183 if (deferred_request_queue_.empty()) {
184 send_next_state_ = SEND_STATE_NONE;
185 } else {
186 send_next_state_ = SEND_STATE_NEXT_REQUEST;
187 }
188 return OK;
189 }
190 DCHECK(ContainsKey(parser_map_, deferred_request.pipeline_id));
191 int rv = parser_map_[deferred_request.pipeline_id]->SendRequest(
192 deferred_request.request_line,
193 deferred_request.headers,
194 deferred_request.request_body,
195 deferred_request.response,
196 &send_io_callback_);
197 // |result| == ERR_IO_PENDING means this function was *not* called on the same
198 // stack as SendRequest(). That means we returned ERR_IO_PENDING to
199 // SendRequest() earlier and will need to invoke its callback.
200 if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) {
201 send_user_callback_ = deferred_request.callback;
202 }
203 stream_state_map_[deferred_request.pipeline_id] = STREAM_SENDING;
204 send_next_state_ = SEND_STATE_COMPLETE;
205 return rv;
206 }
207
208 int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
209 DCHECK(!deferred_request_queue_.empty());
210 const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
211 DCHECK_EQ(stream_state_map_[deferred_request.pipeline_id], STREAM_SENDING);
212 request_order_.push(deferred_request.pipeline_id);
213 stream_state_map_[deferred_request.pipeline_id] = STREAM_SENT;
214 deferred_request_queue_.pop();
215 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
216 result = ERR_PIPELINE_EVICTION;
217 }
218 if (send_user_callback_) {
219 CompletionCallback* callback = send_user_callback_;
220 send_user_callback_ = NULL;
221 callback->Run(result);
mmenke 2011/09/15 19:28:16 Maybe we should mark this as unusable before runni
James Simonsen 2011/09/17 01:23:02 Yeah, that sounds good.
mmenke 2011/09/17 01:41:22 Yea, I was just thinking that we don't test reassi
222 }
223 if (result < OK) {
224 send_next_state_ = SEND_STATE_UNUSABLE;
225 usable_ = false;
226 return result;
227 }
228 if (deferred_request_queue_.empty()) {
229 send_next_state_ = SEND_STATE_NONE;
230 return OK;
231 }
232 send_next_state_ = SEND_STATE_NEXT_REQUEST;
233 MessageLoop::current()->PostTask(
234 FROM_HERE,
235 method_factory_.NewRunnableMethod(
236 &HttpPipelinedConnectionImpl::DoSendRequestLoop,
237 ERR_IO_PENDING));
238 return ERR_IO_PENDING; // Wait for the task to fire.
239 }
240
241 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
242 send_next_state_ = SEND_STATE_NONE;
243 while (!deferred_request_queue_.empty()) {
244 const DeferredSendRequest& evicted_send = deferred_request_queue_.front();
245 if (stream_state_map_[evicted_send.pipeline_id] != STREAM_CLOSED) {
246 evicted_send.callback->Run(ERR_PIPELINE_EVICTION);
247 }
248 deferred_request_queue_.pop();
249 }
250 return result;
251 }
252
253 int HttpPipelinedConnectionImpl::ReadResponseHeaders(
254 int pipeline_id,
255 CompletionCallback* callback) {
256 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
257 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_SENT);
258 DCHECK(!ContainsKey(callback_map_, pipeline_id));
259 if (!usable_) {
260 return ERR_PIPELINE_EVICTION;
261 }
262 stream_state_map_[pipeline_id] = STREAM_READ_PENDING;
263 callback_map_[pipeline_id] = callback;
264 if (read_next_state_ == READ_STATE_NONE) {
265 read_next_state_ = READ_STATE_NEXT_HEADERS;
266 return DoReadHeadersLoop(OK);
267 } else {
268 return ERR_IO_PENDING;
269 }
270 }
271
272 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
273 int rv = result;
274 do {
275 ReadHeadersState state = read_next_state_;
276 read_next_state_ = READ_STATE_NONE;
277 switch (state) {
278 case READ_STATE_NEXT_HEADERS:
279 rv = DoReadNextHeaders(rv);
280 break;
281 case READ_STATE_COMPLETE:
282 rv = DoReadHeadersComplete(rv);
283 break;
284 case READ_STATE_WAITING_FOR_CLOSE:
285 rv = DoReadWaitingForClose(rv);
286 return rv;
287 case READ_STATE_STREAM_CLOSED:
288 rv = DoReadStreamClosed();
289 break;
290 case READ_STATE_UNUSABLE:
291 rv = DoEvictPendingReadHeaders(rv);
292 break;
293 case READ_STATE_NONE:
294 break;
295 default:
296 NOTREACHED() << "bad read state";
297 rv = ERR_FAILED;
298 break;
299 }
300 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
301 return rv;
302 }
303
304 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
305 DCHECK(read_user_callback_ != NULL);
306 DoReadHeadersLoop(result);
307 }
308
309 int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) {
310 DCHECK(!request_order_.empty());
311 int pipeline_id = request_order_.front();
312 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
313 if (stream_state_map_[pipeline_id] == STREAM_CLOSED) {
314 // Since nobody will read whatever data is on the pipeline associated with
315 // this closed request, we must shut down the rest of the pipeline.
316 read_next_state_ = READ_STATE_UNUSABLE;
317 return OK;
318 }
319 CallbackMap::iterator it = callback_map_.find(pipeline_id);
320 if (it == callback_map_.end()) {
321 return ERR_IO_PENDING;
322 }
323 DCHECK(ContainsKey(parser_map_, pipeline_id));
324
325 if (result == ERR_IO_PENDING) {
326 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_ACTIVE);
327 } else {
328 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_READ_PENDING);
329 stream_state_map_[pipeline_id] = STREAM_ACTIVE;
330 }
331
332 int rv = parser_map_[pipeline_id]->ReadResponseHeaders(&read_io_callback_);
333 if (rv == ERR_IO_PENDING) {
334 read_next_state_ = READ_STATE_COMPLETE;
335 read_user_callback_ = it->second;
336 } else if (rv < OK) {
337 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
338 if (rv == ERR_SOCKET_NOT_CONNECTED && completed_one_request_)
339 rv = ERR_PIPELINE_EVICTION;
340 } else {
341 DCHECK_LE(OK, rv);
342 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
343 }
344
345 // |result| == ERR_IO_PENDING means this function was *not* called on the same
346 // stack as ReadResponseHeaders(). That means we returned ERR_IO_PENDING to
347 // ReadResponseHeaders() earlier and now need to invoke its callback.
348 if (rv != ERR_IO_PENDING && result == ERR_IO_PENDING) {
349 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
350 read_user_callback_ = it->second;
351 MessageLoop::current()->PostTask(
352 FROM_HERE,
353 method_factory_.NewRunnableMethod(
354 &HttpPipelinedConnectionImpl::FireReadUserCallback,
355 rv));
356 }
357 return rv;
358 }
359
360 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
361 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
362 if (read_user_callback_) {
363 MessageLoop::current()->PostTask(
364 FROM_HERE,
365 method_factory_.NewRunnableMethod(
366 &HttpPipelinedConnectionImpl::FireReadUserCallback,
367 result));
368 }
369 return result;
370 }
371
372 int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) {
373 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
374 return result;
375 }
376
377 int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
378 DCHECK(!request_order_.empty());
379 int pipeline_id = request_order_.front();
380 DCHECK(ContainsKey(callback_map_, pipeline_id));
381 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_CLOSED);
382 callback_map_.erase(pipeline_id);
383 request_order_.pop();
384 if (!usable_) {
385 read_next_state_ = READ_STATE_UNUSABLE;
386 return OK;
387 } else {
388 completed_one_request_ = true;
389 if (request_order_.empty() ||
390 !ContainsKey(callback_map_, request_order_.front())) {
391 read_next_state_ = READ_STATE_NONE;
392 return OK;
393 } else {
394 stream_state_map_[request_order_.front()] = STREAM_ACTIVE;
395 read_next_state_ = READ_STATE_NEXT_HEADERS;
396 MessageLoop::current()->PostTask(
397 FROM_HERE,
398 method_factory_.NewRunnableMethod(
399 &HttpPipelinedConnectionImpl::DoReadHeadersLoop,
400 ERR_IO_PENDING));
401 return ERR_IO_PENDING; // Wait for the task to fire.
402 }
403 }
404 }
405
406 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
407 while (!request_order_.empty()) {
408 int evicted_id = request_order_.front();
409 request_order_.pop();
410 CallbackMap::iterator cb_it = callback_map_.find(evicted_id);
411 if (cb_it == callback_map_.end()) {
412 continue;
413 }
414 if (stream_state_map_[evicted_id] != STREAM_CLOSED) {
415 cb_it->second->Run(ERR_PIPELINE_EVICTION);
416 }
417 callback_map_.erase(cb_it);
418 }
419 DCHECK(callback_map_.empty());
420 read_next_state_ = READ_STATE_NONE;
421 return result;
422 }
423
424 void HttpPipelinedConnectionImpl::Close(int pipeline_id,
425 bool not_reusable) {
426 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
427 switch (stream_state_map_[pipeline_id]) {
428 case STREAM_CREATED:
429 stream_state_map_[pipeline_id] = STREAM_UNUSED;
430 break;
431
432 case STREAM_BOUND:
433 stream_state_map_[pipeline_id] = STREAM_CLOSED;
434 break;
435
436 case STREAM_SENDING:
437 usable_ = false;
438 stream_state_map_[pipeline_id] = STREAM_CLOSED;
439 send_user_callback_ = NULL;
440 send_next_state_ = SEND_STATE_UNUSABLE;
441 DoSendRequestLoop(OK);
442 break;
443
444 case STREAM_SENT:
445 case STREAM_READ_PENDING:
446 usable_ = false;
447 stream_state_map_[pipeline_id] = STREAM_CLOSED;
448 if (read_next_state_ == READ_STATE_NONE) {
449 read_next_state_ = READ_STATE_UNUSABLE;
450 DoReadHeadersLoop(OK);
451 }
452 break;
453
454 case STREAM_ACTIVE:
455 stream_state_map_[pipeline_id] = STREAM_CLOSED;
456 if (not_reusable) {
457 usable_ = false;
458 }
459 read_next_state_ = READ_STATE_STREAM_CLOSED;
460 read_user_callback_ = NULL;
461 DoReadHeadersLoop(OK);
462 break;
463
464 case STREAM_CLOSED:
465 case STREAM_UNUSED:
466 // TODO(simonjam): Why is Close() sometimes called twice?
467 break;
468
469 default:
470 NOTREACHED();
471 break;
472 }
473 }
474
475 int HttpPipelinedConnectionImpl::ReadResponseBody(
476 int pipeline_id,
477 IOBuffer* buf,
478 int buf_len,
479 CompletionCallback* callback) {
480 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
481 DCHECK(!request_order_.empty());
482 DCHECK(pipeline_id == request_order_.front());
483 DCHECK(ContainsKey(parser_map_, pipeline_id));
484 return parser_map_[pipeline_id]->ReadResponseBody(buf, buf_len, callback);
485 }
486
487 uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const {
488 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
489 DCHECK(ContainsKey(parser_map_, pipeline_id));
490 return parser_map_.find(pipeline_id)->second->GetUploadProgress();
491 }
492
493 HttpResponseInfo* HttpPipelinedConnectionImpl::GetResponseInfo(
494 int pipeline_id) {
495 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
496 DCHECK(ContainsKey(parser_map_, pipeline_id));
497 return parser_map_[pipeline_id]->GetResponseInfo();
498 }
499
500 bool HttpPipelinedConnectionImpl::IsResponseBodyComplete(
501 int pipeline_id) const {
502 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
503 DCHECK(ContainsKey(parser_map_, pipeline_id));
504 return parser_map_.find(pipeline_id)->second->IsResponseBodyComplete();
505 }
506
507 bool HttpPipelinedConnectionImpl::CanFindEndOfResponse(int pipeline_id) const {
508 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
509 DCHECK(ContainsKey(parser_map_, pipeline_id));
510 return parser_map_.find(pipeline_id)->second->CanFindEndOfResponse();
511 }
512
513 bool HttpPipelinedConnectionImpl::IsMoreDataBuffered(int pipeline_id) const {
514 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
515 return read_buf_->offset();
516 }
517
518 bool HttpPipelinedConnectionImpl::IsConnectionReused(int pipeline_id) const {
519 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
520 if (pipeline_id > 1) {
521 return true;
522 }
523 ClientSocketHandle::SocketReuseType reuse_type = connection_->reuse_type();
524 return connection_->is_reused() ||
525 reuse_type == ClientSocketHandle::UNUSED_IDLE;
526 }
527
528 void HttpPipelinedConnectionImpl::SetConnectionReused(int pipeline_id) {
529 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
530 connection_->set_is_reused(true);
531 }
532
533 void HttpPipelinedConnectionImpl::GetSSLInfo(int pipeline_id,
534 SSLInfo* ssl_info) {
535 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
536 DCHECK(ContainsKey(parser_map_, pipeline_id));
537 return parser_map_[pipeline_id]->GetSSLInfo(ssl_info);
538 }
539
540 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
541 int pipeline_id,
542 SSLCertRequestInfo* cert_request_info) {
543 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
544 DCHECK(ContainsKey(parser_map_, pipeline_id));
545 return parser_map_[pipeline_id]->GetSSLCertRequestInfo(cert_request_info);
546 }
547
548 void HttpPipelinedConnectionImpl::FireReadUserCallback(
549 int result) {
550 if (read_user_callback_) {
551 read_user_callback_->Run(result);
552 read_user_callback_ = NULL;
553 }
554 }
555
556 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698