Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(133)

Side by Side Diff: net/http/http_pipelined_connection.cc

Issue 7289006: Basic HTTP pipelining support (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Added about:flags entry Created 9 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/http/http_pipelined_connection.h"
6
7 #include "base/message_loop.h"
8 #include "base/stl_util-inl.h"
9 #include "net/base/io_buffer.h"
10 #include "net/http/http_pipelined_host.h"
11 #include "net/http/http_pipelined_stream.h"
12 #include "net/http/http_request_info.h"
13 #include "net/http/http_stream_parser.h"
14 #include "net/socket/client_socket_handle.h"
15
16 namespace net {
17
18 HttpPipelinedConnection::HttpPipelinedConnection(
19 ClientSocketHandle* connection,
20 HttpPipelinedHost* host,
21 const SSLConfig& used_ssl_config,
22 const ProxyInfo& used_proxy_info,
23 const BoundNetLog& net_log,
24 bool was_npn_negotiated)
25 : host_(host),
26 connection_(connection),
27 used_ssl_config_(used_ssl_config),
28 used_proxy_info_(used_proxy_info),
29 net_log_(net_log),
30 was_npn_negotiated_(was_npn_negotiated),
31 read_buf_(new GrowableIOBuffer()),
32 next_pipeline_id_(1),
33 active_(false),
34 usable_(true),
35 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
36 send_next_state_(SEND_STATE_NONE),
37 ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_(
38 this, &HttpPipelinedConnection::OnSendIOCallback)),
39 send_user_callback_(NULL),
40 read_next_state_(READ_STATE_NONE),
41 ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_(
42 this, &HttpPipelinedConnection::OnReadIOCallback)),
43 read_user_callback_(NULL) {
44 DCHECK(connection_.get());
45 }
46
47 HttpPipelinedConnection::~HttpPipelinedConnection() {
48 DCHECK(depth() == 0);
49 DCHECK(parser_map_.empty());
50 DCHECK(callback_map_.empty());
51 DCHECK(deferred_request_queue_.empty());
52 DCHECK(request_order_.empty());
53 DCHECK_EQ(send_next_state_, SEND_STATE_NONE);
54 DCHECK_EQ(read_next_state_, READ_STATE_NONE);
55 DCHECK(!send_user_callback_);
56 DCHECK(!read_user_callback_);
57 if (!usable_) {
58 connection_->socket()->Disconnect();
59 }
60 connection_->Reset();
61 }
62
63 int HttpPipelinedConnection::AddStream() {
64 int pipeline_id = next_pipeline_id_++;
65 DCHECK(pipeline_id);
66 stream_state_map_.insert(std::make_pair(pipeline_id, STREAM_CREATED));
67 return pipeline_id;
68 }
69
70 void HttpPipelinedConnection::InitializeParser(int pipeline_id,
71 const HttpRequestInfo* request,
72 const BoundNetLog& net_log) {
73 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
74 DCHECK(!ContainsKey(parser_map_, pipeline_id));
75 stream_state_map_[pipeline_id] = STREAM_BOUND;
76 HttpStreamParser* parser = new HttpStreamParser(connection_.get(),
77 request,
78 read_buf_.get(),
79 net_log);
80 parser_map_.insert(std::make_pair(pipeline_id, parser));
81 if (!active_) {
82 active_ = true;
83 MessageLoop::current()->PostTask(
84 FROM_HERE,
85 method_factory_.NewRunnableMethod(
86 &HttpPipelinedConnection::FillPipeline));
87 }
88 }
89
90 void HttpPipelinedConnection::FillPipeline() {
91 host_->OnPipelineHasCapacity(this);
92 }
93
94 void HttpPipelinedConnection::RemoveStream(int pipeline_id) {
95 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
96
97 if (stream_state_map_[pipeline_id] != STREAM_CREATED) {
98 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_CLOSED);
99 DCHECK(ContainsKey(parser_map_, pipeline_id));
100 DCHECK(!ContainsKey(callback_map_, pipeline_id));
101
102 stream_state_map_.erase(pipeline_id);
103 ParserMap::iterator it = parser_map_.find(pipeline_id);
104 delete it->second;
105 parser_map_.erase(it);
106 }
107
108 MessageLoop::current()->PostTask(
109 FROM_HERE,
110 method_factory_.NewRunnableMethod(
111 &HttpPipelinedConnection::FillPipeline));
112 }
113
114 int HttpPipelinedConnection::SendRequest(int pipeline_id,
115 const std::string& request_line,
116 const HttpRequestHeaders& headers,
117 UploadDataStream* request_body,
118 HttpResponseInfo* response,
119 CompletionCallback* callback) {
120 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
121 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_BOUND);
122 if (!usable_) {
123 return ERR_PIPELINE_EVICTION;
124 }
125
126 DeferredSendRequest deferred_request;
127 deferred_request.pipeline_id = pipeline_id;
128 deferred_request.request_line = request_line;
129 deferred_request.headers = headers;
130 deferred_request.request_body = request_body;
131 deferred_request.response = response;
132 deferred_request.callback = callback;
133 deferred_request_queue_.push(deferred_request);
134
135 if (send_next_state_ == SEND_STATE_NONE) {
136 send_next_state_ = SEND_STATE_NEXT_REQUEST;
137 return DoSendRequestLoop(OK);
138 } else {
139 return ERR_IO_PENDING;
140 }
141 }
142
143 int HttpPipelinedConnection::DoSendRequestLoop(int result) {
144 int rv = result;
145 do {
146 SendRequestState state = send_next_state_;
147 send_next_state_ = SEND_STATE_NONE;
148 switch (state) {
149 case SEND_STATE_NEXT_REQUEST:
150 rv = DoSendNextRequest(rv);
151 break;
152 case SEND_STATE_COMPLETE:
153 rv = DoSendComplete(rv);
154 break;
155 case SEND_STATE_UNUSABLE:
156 rv = DoEvictPendingSendRequests(rv);
157 break;
158 default:
159 NOTREACHED() << "bad send state";
160 rv = ERR_FAILED;
161 break;
162 }
163 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
164 return rv;
165 }
166
167 void HttpPipelinedConnection::OnSendIOCallback(int result) {
168 DCHECK(send_user_callback_);
169 DoSendRequestLoop(result);
170 }
171
172 int HttpPipelinedConnection::DoSendNextRequest(int result) {
173 DCHECK(!deferred_request_queue_.empty());
174 const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
175 DCHECK(ContainsKey(stream_state_map_, deferred_request.pipeline_id));
176 if (stream_state_map_[deferred_request.pipeline_id] == STREAM_CLOSED) {
177 deferred_request_queue_.pop();
178 if (deferred_request_queue_.empty()) {
179 send_next_state_ = SEND_STATE_NONE;
180 } else {
181 send_next_state_ = SEND_STATE_NEXT_REQUEST;
182 }
183 return OK;
184 }
185 DCHECK(ContainsKey(parser_map_, deferred_request.pipeline_id));
186 int rv = parser_map_[deferred_request.pipeline_id]->SendRequest(
187 deferred_request.request_line,
188 deferred_request.headers,
189 deferred_request.request_body,
190 deferred_request.response,
191 &send_io_callback_);
192 if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) {
193 send_user_callback_ = deferred_request.callback;
194 }
195 request_order_.push(deferred_request.pipeline_id);
196 stream_state_map_[deferred_request.pipeline_id] = STREAM_SENT;
197 deferred_request_queue_.pop();
198 send_next_state_ = SEND_STATE_COMPLETE;
199 return rv;
200 }
201
202 int HttpPipelinedConnection::DoSendComplete(int result) {
203 if (send_user_callback_) {
204 CompletionCallback* callback = send_user_callback_;
205 send_user_callback_ = NULL;
206 callback->Run(result);
207 }
208 if (result != OK) {
209 send_next_state_ = SEND_STATE_UNUSABLE;
210 usable_ = false;
211 return result;
212 }
213 if (deferred_request_queue_.empty()) {
214 send_next_state_ = SEND_STATE_NONE;
215 } else {
216 send_next_state_ = SEND_STATE_NEXT_REQUEST;
217 MessageLoop::current()->PostTask(
218 FROM_HERE,
219 method_factory_.NewRunnableMethod(
220 &HttpPipelinedConnection::DoSendRequestLoop,
221 ERR_IO_PENDING));
222 }
223 return OK;
224 }
225
226 int HttpPipelinedConnection::DoEvictPendingSendRequests(int result) {
227 send_next_state_ = SEND_STATE_NONE;
228 while (!deferred_request_queue_.empty()) {
229 const DeferredSendRequest& evicted_send = deferred_request_queue_.front();
230 if (stream_state_map_[evicted_send.pipeline_id] == STREAM_CLOSED) {
mmenke 2011/08/03 21:09:39 This case will result in an infinite loop without
James Simonsen 2011/08/05 01:39:00 Done. Added a unit test too.
231 continue;
232 }
233 evicted_send.callback->Run(ERR_PIPELINE_EVICTION);
234 deferred_request_queue_.pop();
235 }
236 return result;
237 }
238
239 int HttpPipelinedConnection::ReadResponseHeaders(int pipeline_id,
240 CompletionCallback* callback) {
241 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
242 DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_SENT);
243 DCHECK(!ContainsKey(callback_map_, pipeline_id));
244 if (!usable_) {
245 return ERR_PIPELINE_EVICTION;
246 }
247 callback_map_[pipeline_id] = callback;
248 if (read_next_state_ == READ_STATE_NONE) {
249 read_next_state_ = READ_STATE_NEXT_HEADERS;
250 return DoReadHeadersLoop(OK);
251 } else {
252 return ERR_IO_PENDING;
253 }
254 }
255
256 int HttpPipelinedConnection::DoReadHeadersLoop(int result) {
257 int rv = result;
258 do {
259 ReadHeadersState state = read_next_state_;
260 read_next_state_ = READ_STATE_NONE;
261 switch (state) {
262 case READ_STATE_NEXT_HEADERS:
263 rv = DoReadNextHeaders(rv);
264 break;
265 case READ_STATE_COMPLETE:
266 rv = DoReadHeadersComplete(rv);
267 break;
268 case READ_STATE_STREAM_CLOSED:
269 rv = DoStreamClosed();
270 break;
271 case READ_STATE_UNUSABLE:
272 rv = DoEvictPendingReadHeaders(rv);
273 break;
274 default:
275 NOTREACHED() << "bad read state";
276 rv = ERR_FAILED;
277 break;
278 }
279 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
280 return rv;
281 }
282
283 void HttpPipelinedConnection::OnReadIOCallback(int result) {
284 DCHECK(read_user_callback_ != NULL);
285 DoReadHeadersLoop(result);
286 }
287
288 int HttpPipelinedConnection::DoReadNextHeaders(int result) {
289 DCHECK(!request_order_.empty());
290 int pipeline_id = request_order_.front();
291 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
292 if (stream_state_map_[pipeline_id] == STREAM_CLOSED) {
293 // Since nobody will read whatever data is on the pipeline associated with
294 // this request, we must shut down the rest of the pipeline.
295 read_next_state_ = READ_STATE_UNUSABLE;
296 return OK;
297 }
298 CallbackMap::iterator it = callback_map_.find(pipeline_id);
299 if (it == callback_map_.end()) {
300 return ERR_IO_PENDING;
301 }
302 DCHECK(ContainsKey(parser_map_, pipeline_id));
303 int rv = parser_map_[pipeline_id]->ReadResponseHeaders(
304 &read_io_callback_);
305 if (rv == ERR_IO_PENDING) {
306 read_next_state_ = READ_STATE_COMPLETE;
307 read_user_callback_ = it->second;
308 } else if (result == ERR_IO_PENDING) {
309 read_next_state_ = READ_STATE_STREAM_CLOSED;
310 MessageLoop::current()->PostTask(
311 FROM_HERE,
312 method_factory_.NewRunnableMethod(
313 &HttpPipelinedConnection::FireUserCallback,
314 it->second,
315 rv));
316 rv = ERR_IO_PENDING;
317 }
318 callback_map_.erase(it);
319 return rv;
320 }
321
322 int HttpPipelinedConnection::DoReadHeadersComplete(int result) {
323 read_next_state_ = READ_STATE_STREAM_CLOSED;
324 if (read_user_callback_) {
325 MessageLoop::current()->PostTask(
326 FROM_HERE,
327 method_factory_.NewRunnableMethod(
328 &HttpPipelinedConnection::FireUserCallback,
329 read_user_callback_,
330 result));
331 read_user_callback_ = NULL;
332 }
333 return ERR_IO_PENDING;
334 }
335
336 void HttpPipelinedConnection::FireUserCallback(CompletionCallback* callback,
337 int result) {
338 DCHECK(callback);
339 callback->Run(result);
340 }
341
342 int HttpPipelinedConnection::DoStreamClosed() {
343 DCHECK(!request_order_.empty());
344 request_order_.pop();
345 if (request_order_.empty()) {
346 read_next_state_ = READ_STATE_NONE;
347 return OK;
348 } else {
349 read_next_state_ = READ_STATE_NEXT_HEADERS;
350 MessageLoop::current()->PostTask(
351 FROM_HERE,
352 method_factory_.NewRunnableMethod(
353 &HttpPipelinedConnection::DoReadHeadersLoop,
354 ERR_IO_PENDING));
355 return ERR_IO_PENDING; // Wait for the task to fire.
356 }
357 }
358
359 int HttpPipelinedConnection::DoEvictPendingReadHeaders(int result) {
360 while (!request_order_.empty()) {
361 int evicted_id = request_order_.front();
362 request_order_.pop();
363 CallbackMap::iterator cb_it = callback_map_.find(evicted_id);
364 if (cb_it == callback_map_.end()) {
365 continue;
366 }
367 callback_map_.erase(cb_it);
368 if (stream_state_map_[evicted_id] == STREAM_CLOSED) {
369 continue;
370 }
371 cb_it->second->Run(ERR_PIPELINE_EVICTION);
372 }
373 DCHECK(callback_map_.empty());
374 read_next_state_ = READ_STATE_NONE;
375 return result;
376 }
377
378 void HttpPipelinedConnection::Close(int pipeline_id,
379 bool not_reusable) {
380 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
381 if (stream_state_map_[pipeline_id] == STREAM_CLOSED) {
382 // TODO(simonjam): Why is Close() sometimes called twice?
383 return;
384 }
385 stream_state_map_[pipeline_id] = STREAM_CLOSED;
386 // TODO(simonjam): If it hasn't sent yet, then we can just ignore it instead
387 // of shutting the pipeline down.
388 bool is_active_stream = !request_order_.empty() &&
389 pipeline_id == request_order_.front();
390 if (not_reusable || !is_active_stream) {
391 usable_ = false;
392 }
393 if (is_active_stream) {
394 read_next_state_ = READ_STATE_STREAM_CLOSED;
395 read_user_callback_ = NULL;
396 DoReadHeadersLoop(OK);
397 } else {
398 if (send_next_state_ == SEND_STATE_NONE) {
399 send_next_state_ = SEND_STATE_UNUSABLE;
400 DoSendRequestLoop(OK);
401 }
402 if (read_next_state_ == READ_STATE_NONE) {
403 read_next_state_ = READ_STATE_UNUSABLE;
404 DoReadHeadersLoop(OK);
405 }
406 }
407 }
408
409 int HttpPipelinedConnection::ReadResponseBody(int pipeline_id,
410 IOBuffer* buf, int buf_len,
411 CompletionCallback* callback) {
412 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
413 DCHECK(!request_order_.empty());
414 DCHECK(pipeline_id == request_order_.front());
415 DCHECK(ContainsKey(parser_map_, pipeline_id));
416 return parser_map_[pipeline_id]->ReadResponseBody(buf, buf_len, callback);
417 }
418
419 uint64 HttpPipelinedConnection::GetUploadProgress(int pipeline_id) const {
420 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
421 DCHECK(ContainsKey(parser_map_, pipeline_id));
422 return parser_map_.find(pipeline_id)->second->GetUploadProgress();
423 }
424
425 HttpResponseInfo* HttpPipelinedConnection::GetResponseInfo(int pipeline_id) {
426 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
427 DCHECK(ContainsKey(parser_map_, pipeline_id));
428 return parser_map_[pipeline_id]->GetResponseInfo();
429 }
430
431 bool HttpPipelinedConnection::IsResponseBodyComplete(int pipeline_id) const {
432 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
433 DCHECK(ContainsKey(parser_map_, pipeline_id));
434 return parser_map_.find(pipeline_id)->second->IsResponseBodyComplete();
435 }
436
437 bool HttpPipelinedConnection::CanFindEndOfResponse(int pipeline_id) const {
438 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
439 DCHECK(ContainsKey(parser_map_, pipeline_id));
440 return parser_map_.find(pipeline_id)->second->CanFindEndOfResponse();
441 }
442
443 bool HttpPipelinedConnection::IsMoreDataBuffered(int pipeline_id) const {
444 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
445 return read_buf_->offset();
446 }
447
448 bool HttpPipelinedConnection::IsConnectionReused(int pipeline_id) const {
449 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
450 if (pipeline_id > 1) {
451 return true;
452 }
453 ClientSocketHandle::SocketReuseType reuse_type = connection_->reuse_type();
454 return connection_->is_reused() ||
455 reuse_type == ClientSocketHandle::UNUSED_IDLE;
456 }
457
458 void HttpPipelinedConnection::SetConnectionReused(int pipeline_id) {
459 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
460 connection_->set_is_reused(true);
461 }
462
463 void HttpPipelinedConnection::GetSSLInfo(int pipeline_id,
464 SSLInfo* ssl_info) {
465 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
466 DCHECK(ContainsKey(parser_map_, pipeline_id));
467 return parser_map_[pipeline_id]->GetSSLInfo(ssl_info);
468 }
469
470 void HttpPipelinedConnection::GetSSLCertRequestInfo(
471 int pipeline_id,
472 SSLCertRequestInfo* cert_request_info) {
473 DCHECK(ContainsKey(stream_state_map_, pipeline_id));
474 DCHECK(ContainsKey(parser_map_, pipeline_id));
475 return parser_map_[pipeline_id]->GetSSLCertRequestInfo(cert_request_info);
476 }
477
478 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698