Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(239)

Side by Side Diff: net/http/http_pipelined_connection_impl.cc

Issue 7289006: Basic HTTP pipelining support (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Added unit tests Created 9 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/http/http_pipelined_connection_impl.h"
6
7 #include "base/message_loop.h"
8 #include "base/stl_util.h"
9 #include "net/base/io_buffer.h"
10 #include "net/http/http_pipelined_host.h"
mmenke 2011/08/23 19:05:25 nit: No longer needed.
James Simonsen 2011/08/26 22:19:07 Done.
11 #include "net/http/http_pipelined_stream.h"
12 #include "net/http/http_request_info.h"
13 #include "net/http/http_stream_parser.h"
14 #include "net/socket/client_socket_handle.h"
15
16 namespace net {
17
18 HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl(
19 ClientSocketHandle* connection,
20 HttpPipelinedConnection::Owner* owner,
21 const SSLConfig& used_ssl_config,
22 const ProxyInfo& used_proxy_info,
23 const BoundNetLog& net_log,
24 bool was_npn_negotiated)
25 : owner_(owner),
26 connection_(connection),
27 used_ssl_config_(used_ssl_config),
28 used_proxy_info_(used_proxy_info),
29 net_log_(net_log),
30 was_npn_negotiated_(was_npn_negotiated),
31 read_buf_(new GrowableIOBuffer()),
32 next_pipeline_id_(1),
33 active_(false),
34 usable_(true),
35 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
36 send_next_state_(SEND_STATE_NONE),
37 ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_(
38 this, &HttpPipelinedConnectionImpl::OnSendIOCallback)),
39 send_user_callback_(NULL),
40 read_next_state_(READ_STATE_NONE),
41 ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_(
42 this, &HttpPipelinedConnectionImpl::OnReadIOCallback)),
43 read_user_callback_(NULL) {
44 DCHECK(connection_.get());
45 }
46
47 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
48 DCHECK(depth() == 0);
49 DCHECK(parser_map_.empty());
50 DCHECK(callback_map_.empty());
51 DCHECK(deferred_request_queue_.empty());
52 DCHECK(request_order_.empty());
53 DCHECK_EQ(send_next_state_, SEND_STATE_NONE);
54 DCHECK_EQ(read_next_state_, READ_STATE_NONE);
55 DCHECK(!send_user_callback_);
56 DCHECK(!read_user_callback_);
57 if (!usable_) {
58 connection_->socket()->Disconnect();
59 }
60 connection_->Reset();
61 }
62
63 HttpStream* HttpPipelinedConnectionImpl::CreateNewStream() {
64 int pipeline_id = next_pipeline_id_++;
65 DCHECK(pipeline_id);
66 HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id);
67 stream_state_map_.insert(std::make_pair(pipeline_id, STREAM_CREATED));
68 return stream;
69 }
70
71 void HttpPipelinedConnectionImpl::InitializeParser(
72 int pipeline_id,
73 const HttpRequestInfo* request,
74 const BoundNetLog& net_log) {
75 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
76 DCHECK(!ContainsKey(parser_map_, pipeline_id));
77 stream_state_map_[pipeline_id] = STREAM_BOUND;
78 HttpStreamParser* parser = new HttpStreamParser(connection_.get(),
79 request,
80 read_buf_.get(),
81 net_log);
82 parser_map_.insert(std::make_pair(pipeline_id, parser));
83 if (!active_) {
84 active_ = true;
85 MessageLoop::current()->PostTask(
86 FROM_HERE,
87 method_factory_.NewRunnableMethod(
88 &HttpPipelinedConnectionImpl::FillPipeline));
89 }
90 }
91
92 void HttpPipelinedConnectionImpl::FillPipeline() {
93 owner_->OnPipelineHasCapacity(this);
94 }
95
96 void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) {
97 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
98
99 if (stream_state_map_[pipeline_id] != STREAM_CREATED &&
100 stream_state_map_[pipeline_id] != STREAM_UNUSED) {
101 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_CLOSED);
mmenke 2011/08/23 19:05:25 A BOUND_STREAM can currently end up here on ERR_PI
James Simonsen 2011/08/26 22:19:07 Good catch! Hmm. I'd actually prefer that Close()
102 DCHECK(ContainsKey(parser_map_, pipeline_id));
103 DCHECK(!ContainsKey(callback_map_, pipeline_id));
104
105 ParserMap::iterator it = parser_map_.find(pipeline_id);
106 delete it->second;
107 parser_map_.erase(it);
108 }
109 stream_state_map_.erase(pipeline_id);
110
111 MessageLoop::current()->PostTask(
112 FROM_HERE,
113 method_factory_.NewRunnableMethod(
114 &HttpPipelinedConnectionImpl::FillPipeline));
115 }
116
117 int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id,
118 const std::string& request_line,
119 const HttpRequestHeaders& headers,
120 UploadDataStream* request_body,
121 HttpResponseInfo* response,
122 CompletionCallback* callback) {
123 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
124 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_BOUND);
125 if (!usable_) {
126 return ERR_PIPELINE_EVICTION;
127 }
128
129 DeferredSendRequest deferred_request;
130 deferred_request.pipeline_id = pipeline_id;
131 deferred_request.request_line = request_line;
132 deferred_request.headers = headers;
133 deferred_request.request_body = request_body;
134 deferred_request.response = response;
135 deferred_request.callback = callback;
136 deferred_request_queue_.push(deferred_request);
137
138 if (send_next_state_ == SEND_STATE_NONE) {
139 send_next_state_ = SEND_STATE_NEXT_REQUEST;
140 return DoSendRequestLoop(OK);
141 } else {
142 return ERR_IO_PENDING;
143 }
144 }
145
146 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
147 int rv = result;
148 do {
149 SendRequestState state = send_next_state_;
150 send_next_state_ = SEND_STATE_NONE;
151 switch (state) {
152 case SEND_STATE_NEXT_REQUEST:
153 rv = DoSendNextRequest(rv);
154 break;
155 case SEND_STATE_COMPLETE:
156 rv = DoSendComplete(rv);
157 break;
158 case SEND_STATE_UNUSABLE:
159 rv = DoEvictPendingSendRequests(rv);
160 break;
161 default:
162 NOTREACHED() << "bad send state: " << state;
163 rv = ERR_FAILED;
164 break;
165 }
166 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
167 return rv;
168 }
169
170 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
171 DCHECK(send_user_callback_);
172 DoSendRequestLoop(result);
173 }
174
175 int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) {
176 DCHECK(!deferred_request_queue_.empty());
177 const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
178 DCHECK(ContainsKey(stream_state_map_, deferred_request.pipeline_id));
179 if (stream_state_map_[deferred_request.pipeline_id] == STREAM_CLOSED) {
180 deferred_request_queue_.pop();
181 if (deferred_request_queue_.empty()) {
182 send_next_state_ = SEND_STATE_NONE;
183 } else {
184 send_next_state_ = SEND_STATE_NEXT_REQUEST;
185 }
186 return OK;
187 }
188 DCHECK(ContainsKey(parser_map_, deferred_request.pipeline_id));
189 int rv = parser_map_[deferred_request.pipeline_id]->SendRequest(
190 deferred_request.request_line,
191 deferred_request.headers,
192 deferred_request.request_body,
193 deferred_request.response,
194 &send_io_callback_);
195 if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) {
mmenke 2011/08/23 19:05:25 nit: Add a comment that |result| == ERR_IO_PENDIN
James Simonsen 2011/08/26 22:19:07 Done.
196 send_user_callback_ = deferred_request.callback;
197 }
198 request_order_.push(deferred_request.pipeline_id);
199 stream_state_map_[deferred_request.pipeline_id] = STREAM_SENT;
200 deferred_request_queue_.pop();
201 send_next_state_ = SEND_STATE_COMPLETE;
202 return rv;
203 }
204
205 int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
206 if (send_user_callback_) {
207 CompletionCallback* callback = send_user_callback_;
208 send_user_callback_ = NULL;
209 callback->Run(result);
210 }
211 if (result < OK) {
mmenke 2011/08/23 19:15:25 On ERR_SOCKET_NOT_CONNECTED, we might want to swit
James Simonsen 2011/08/26 22:19:07 Yeah, good idea. Done. And tested.
212 send_next_state_ = SEND_STATE_UNUSABLE;
213 usable_ = false;
214 return result;
215 }
216 if (deferred_request_queue_.empty()) {
217 send_next_state_ = SEND_STATE_NONE;
218 return OK;
219 } else {
220 send_next_state_ = SEND_STATE_NEXT_REQUEST;
221 MessageLoop::current()->PostTask(
222 FROM_HERE,
223 method_factory_.NewRunnableMethod(
224 &HttpPipelinedConnectionImpl::DoSendRequestLoop,
225 ERR_IO_PENDING));
226 return ERR_IO_PENDING; // Wait for the task to fire.
227 }
228 }
229
230 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
231 send_next_state_ = SEND_STATE_NONE;
232 while (!deferred_request_queue_.empty()) {
233 const DeferredSendRequest& evicted_send = deferred_request_queue_.front();
234 if (stream_state_map_[evicted_send.pipeline_id] != STREAM_CLOSED) {
235 evicted_send.callback->Run(ERR_PIPELINE_EVICTION);
236 }
237 deferred_request_queue_.pop();
238 }
239 return result;
240 }
241
242 int HttpPipelinedConnectionImpl::ReadResponseHeaders(
243 int pipeline_id,
244 CompletionCallback* callback) {
245 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
246 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_SENT);
247 DCHECK(!ContainsKey(callback_map_, pipeline_id));
248 if (!usable_) {
249 return ERR_PIPELINE_EVICTION;
250 }
251 callback_map_[pipeline_id] = callback;
252 if (read_next_state_ == READ_STATE_NONE) {
253 read_next_state_ = READ_STATE_NEXT_HEADERS;
254 return DoReadHeadersLoop(OK);
255 } else {
256 return ERR_IO_PENDING;
257 }
258 }
259
260 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
261 int rv = result;
262 do {
263 ReadHeadersState state = read_next_state_;
264 read_next_state_ = READ_STATE_NONE;
265 switch (state) {
266 case READ_STATE_NEXT_HEADERS:
267 rv = DoReadNextHeaders(rv);
268 break;
269 case READ_STATE_COMPLETE:
270 rv = DoReadHeadersComplete(rv);
271 break;
272 case READ_STATE_WAITING_FOR_CLOSE:
273 rv = DoReadWaitingForClose(rv);
274 return rv;
275 case READ_STATE_STREAM_CLOSED:
276 rv = DoReadStreamClosed();
277 break;
278 case READ_STATE_UNUSABLE:
279 rv = DoEvictPendingReadHeaders(rv);
280 break;
281 default:
282 NOTREACHED() << "bad read state";
283 rv = ERR_FAILED;
284 break;
285 }
286 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
287 return rv;
288 }
289
290 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
291 DCHECK(read_user_callback_ != NULL);
292 DoReadHeadersLoop(result);
293 }
294
295 int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) {
296 DCHECK(!request_order_.empty());
297 int pipeline_id = request_order_.front();
298 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
299 if (stream_state_map_[pipeline_id] == STREAM_CLOSED) {
300 // Since nobody will read whatever data is on the pipeline associated with
301 // this request, we must shut down the rest of the pipeline.
302 read_next_state_ = READ_STATE_UNUSABLE;
303 return OK;
304 }
305 CallbackMap::iterator it = callback_map_.find(pipeline_id);
306 if (it == callback_map_.end()) {
307 return ERR_IO_PENDING;
308 }
309 DCHECK(ContainsKey(parser_map_, pipeline_id));
310 int rv = parser_map_[pipeline_id]->ReadResponseHeaders(
311 &read_io_callback_);
312 if (rv == ERR_IO_PENDING) {
313 read_next_state_ = READ_STATE_COMPLETE;
314 read_user_callback_ = it->second;
315 } else if (result == ERR_IO_PENDING) {
316 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
317 MessageLoop::current()->PostTask(
318 FROM_HERE,
319 method_factory_.NewRunnableMethod(
320 &HttpPipelinedConnectionImpl::FireUserCallback,
321 it->second,
322 rv));
mmenke 2011/08/23 19:05:25 Is there any guarantee that a stream won't be dele
James Simonsen 2011/08/26 22:19:07 Another good catch! Fixed and added tests.
323 } else if (rv < OK) {
324 read_next_state_ = READ_STATE_UNUSABLE;
325 } else {
326 DCHECK_LE(OK, rv);
327 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
328 }
329 callback_map_.erase(it);
330 return rv;
331 }
332
333 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
334 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
335 if (read_user_callback_) {
336 MessageLoop::current()->PostTask(
337 FROM_HERE,
338 method_factory_.NewRunnableMethod(
339 &HttpPipelinedConnectionImpl::FireUserCallback,
340 read_user_callback_,
341 result));
342 read_user_callback_ = NULL;
343 }
344 return result;
345 }
346
347 int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) {
348 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
349 return result;
350 }
351
352 int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
353 DCHECK(!request_order_.empty());
354 request_order_.pop();
355 if (!usable_) {
356 read_next_state_ = READ_STATE_UNUSABLE;
357 return OK;
358 } else if (request_order_.empty() ||
359 !ContainsKey(callback_map_, request_order_.front())) {
360 read_next_state_ = READ_STATE_NONE;
361 return OK;
362 } else {
363 read_next_state_ = READ_STATE_NEXT_HEADERS;
364 MessageLoop::current()->PostTask(
365 FROM_HERE,
366 method_factory_.NewRunnableMethod(
367 &HttpPipelinedConnectionImpl::DoReadHeadersLoop,
368 ERR_IO_PENDING));
369 return ERR_IO_PENDING; // Wait for the task to fire.
370 }
371 }
372
373 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
374 while (!request_order_.empty()) {
375 int evicted_id = request_order_.front();
376 request_order_.pop();
377 CallbackMap::iterator cb_it = callback_map_.find(evicted_id);
378 if (cb_it == callback_map_.end()) {
379 continue;
380 }
381 if (stream_state_map_[evicted_id] != STREAM_CLOSED) {
382 cb_it->second->Run(ERR_PIPELINE_EVICTION);
383 }
384 callback_map_.erase(cb_it);
385 }
386 DCHECK(callback_map_.empty());
387 read_next_state_ = READ_STATE_NONE;
388 return result;
389 }
390
391 void HttpPipelinedConnectionImpl::Close(int pipeline_id,
392 bool not_reusable) {
393 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
394 switch (stream_state_map_[pipeline_id]) {
395 case STREAM_CLOSED:
396 // TODO(simonjam): Why is Close() sometimes called twice?
397 return;
398
399 case STREAM_CREATED:
400 stream_state_map_[pipeline_id] = STREAM_UNUSED;
401 return;
402
403 case STREAM_BOUND:
404 stream_state_map_[pipeline_id] = STREAM_CLOSED;
405 return;
406
407 case STREAM_SENT:
408 break;
409
410 default:
411 NOTREACHED();
412 break;
413 }
414
415 stream_state_map_[pipeline_id] = STREAM_CLOSED;
416 bool is_active_stream = !request_order_.empty() &&
417 pipeline_id == request_order_.front();
418 if (not_reusable || !is_active_stream) {
419 usable_ = false;
420 }
421 if (is_active_stream) {
422 read_next_state_ = READ_STATE_STREAM_CLOSED;
423 read_user_callback_ = NULL;
424 DoReadHeadersLoop(OK);
425 } else {
426 if (send_next_state_ == SEND_STATE_NONE) {
427 send_next_state_ = SEND_STATE_UNUSABLE;
428 DoSendRequestLoop(OK);
429 }
430 if (read_next_state_ == READ_STATE_NONE) {
431 read_next_state_ = READ_STATE_UNUSABLE;
432 DoReadHeadersLoop(OK);
433 }
434 }
435 }
436
437 int HttpPipelinedConnectionImpl::ReadResponseBody(
438 int pipeline_id,
439 IOBuffer* buf,
440 int buf_len,
441 CompletionCallback* callback) {
442 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
443 DCHECK(!request_order_.empty());
444 DCHECK(pipeline_id == request_order_.front());
mmenke 2011/08/23 20:35:12 On destruction, an HttpNetworkTransactionuses a Ht
James Simonsen 2011/08/26 22:19:07 Nice. I liked Will's comment and moved it to the s
445 DCHECK(ContainsKey(parser_map_, pipeline_id));
446 return parser_map_[pipeline_id]->ReadResponseBody(buf, buf_len, callback);
447 }
448
449 uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const {
450 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
451 DCHECK(ContainsKey(parser_map_, pipeline_id));
452 return parser_map_.find(pipeline_id)->second->GetUploadProgress();
453 }
454
455 HttpResponseInfo* HttpPipelinedConnectionImpl::GetResponseInfo(
456 int pipeline_id) {
457 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
458 DCHECK(ContainsKey(parser_map_, pipeline_id));
459 return parser_map_[pipeline_id]->GetResponseInfo();
460 }
461
462 bool HttpPipelinedConnectionImpl::IsResponseBodyComplete(
463 int pipeline_id) const {
464 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
465 DCHECK(ContainsKey(parser_map_, pipeline_id));
466 return parser_map_.find(pipeline_id)->second->IsResponseBodyComplete();
467 }
468
469 bool HttpPipelinedConnectionImpl::CanFindEndOfResponse(int pipeline_id) const {
470 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
471 DCHECK(ContainsKey(parser_map_, pipeline_id));
472 return parser_map_.find(pipeline_id)->second->CanFindEndOfResponse();
473 }
474
475 bool HttpPipelinedConnectionImpl::IsMoreDataBuffered(int pipeline_id) const {
476 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
477 return read_buf_->offset();
478 }
479
480 bool HttpPipelinedConnectionImpl::IsConnectionReused(int pipeline_id) const {
481 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
482 if (pipeline_id > 1) {
483 return true;
484 }
485 ClientSocketHandle::SocketReuseType reuse_type = connection_->reuse_type();
486 return connection_->is_reused() ||
487 reuse_type == ClientSocketHandle::UNUSED_IDLE;
488 }
489
490 void HttpPipelinedConnectionImpl::SetConnectionReused(int pipeline_id) {
491 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
492 connection_->set_is_reused(true);
493 }
494
495 void HttpPipelinedConnectionImpl::GetSSLInfo(int pipeline_id,
496 SSLInfo* ssl_info) {
497 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
498 DCHECK(ContainsKey(parser_map_, pipeline_id));
499 return parser_map_[pipeline_id]->GetSSLInfo(ssl_info);
500 }
501
502 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
503 int pipeline_id,
504 SSLCertRequestInfo* cert_request_info) {
505 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
506 DCHECK(ContainsKey(parser_map_, pipeline_id));
507 return parser_map_[pipeline_id]->GetSSLCertRequestInfo(cert_request_info);
508 }
509
510 void HttpPipelinedConnectionImpl::FireUserCallback(CompletionCallback* callback,
511 int result) {
512 DCHECK(callback);
513 callback->Run(result);
514 }
515
516 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698