Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(100)

Side by Side Diff: net/http/http_pipelined_connection_impl.cc

Issue 8515020: Refactor state machines in HttpPipelinedConnectionImpl. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Add comments Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/http/http_pipelined_connection_impl.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/http/http_pipelined_connection_impl.h" 5 #include "net/http/http_pipelined_connection_impl.h"
6 6
7 #include "base/message_loop.h" 7 #include "base/message_loop.h"
8 #include "base/stl_util.h" 8 #include "base/stl_util.h"
9 #include "net/base/io_buffer.h" 9 #include "net/base/io_buffer.h"
10 #include "net/http/http_pipelined_stream.h" 10 #include "net/http/http_pipelined_stream.h"
(...skipping 16 matching lines...) Expand all
27 used_proxy_info_(used_proxy_info), 27 used_proxy_info_(used_proxy_info),
28 net_log_(net_log), 28 net_log_(net_log),
29 was_npn_negotiated_(was_npn_negotiated), 29 was_npn_negotiated_(was_npn_negotiated),
30 read_buf_(new GrowableIOBuffer()), 30 read_buf_(new GrowableIOBuffer()),
31 next_pipeline_id_(1), 31 next_pipeline_id_(1),
32 active_(false), 32 active_(false),
33 usable_(true), 33 usable_(true),
34 completed_one_request_(false), 34 completed_one_request_(false),
35 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)), 35 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
36 send_next_state_(SEND_STATE_NONE), 36 send_next_state_(SEND_STATE_NONE),
37 send_still_on_call_stack_(false),
37 ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_( 38 ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_(
38 this, &HttpPipelinedConnectionImpl::OnSendIOCallback)), 39 this, &HttpPipelinedConnectionImpl::OnSendIOCallback)),
39 send_user_callback_(NULL),
40 read_next_state_(READ_STATE_NONE), 40 read_next_state_(READ_STATE_NONE),
41 active_read_id_(0),
42 read_still_on_call_stack_(false),
41 ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_( 43 ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_(
42 this, &HttpPipelinedConnectionImpl::OnReadIOCallback)), 44 this, &HttpPipelinedConnectionImpl::OnReadIOCallback)) {
43 read_user_callback_(NULL) {
44 CHECK(connection_.get()); 45 CHECK(connection_.get());
45 } 46 }
46 47
47 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() { 48 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
48 CHECK_EQ(depth(), 0); 49 CHECK_EQ(depth(), 0);
49 CHECK(stream_info_map_.empty()); 50 CHECK(stream_info_map_.empty());
50 CHECK(deferred_request_queue_.empty()); 51 CHECK(pending_send_request_queue_.empty());
51 CHECK(request_order_.empty()); 52 CHECK(request_order_.empty());
52 CHECK_EQ(send_next_state_, SEND_STATE_NONE); 53 CHECK_EQ(send_next_state_, SEND_STATE_NONE);
53 CHECK_EQ(read_next_state_, READ_STATE_NONE); 54 CHECK_EQ(read_next_state_, READ_STATE_NONE);
54 CHECK(!send_user_callback_); 55 CHECK(!active_send_request_.get());
55 CHECK(!read_user_callback_); 56 CHECK(!active_read_id_);
56 if (!usable_) { 57 if (!usable_) {
57 connection_->socket()->Disconnect(); 58 connection_->socket()->Disconnect();
58 } 59 }
59 connection_->Reset(); 60 connection_->Reset();
60 } 61 }
61 62
62 HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() { 63 HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() {
63 int pipeline_id = next_pipeline_id_++; 64 int pipeline_id = next_pipeline_id_++;
64 CHECK(pipeline_id); 65 CHECK(pipeline_id);
65 HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id); 66 HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id);
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
98 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 99 CHECK(ContainsKey(stream_info_map_, pipeline_id));
99 Close(pipeline_id, false); 100 Close(pipeline_id, false);
100 101
101 if (stream_info_map_[pipeline_id].state != STREAM_CREATED && 102 if (stream_info_map_[pipeline_id].state != STREAM_CREATED &&
102 stream_info_map_[pipeline_id].state != STREAM_UNUSED) { 103 stream_info_map_[pipeline_id].state != STREAM_UNUSED) {
103 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED); 104 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
104 CHECK(stream_info_map_[pipeline_id].parser.get()); 105 CHECK(stream_info_map_[pipeline_id].parser.get());
105 stream_info_map_[pipeline_id].parser.reset(); 106 stream_info_map_[pipeline_id].parser.reset();
106 } 107 }
107 CHECK(!stream_info_map_[pipeline_id].parser.get()); 108 CHECK(!stream_info_map_[pipeline_id].parser.get());
108 CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
109 stream_info_map_.erase(pipeline_id); 109 stream_info_map_.erase(pipeline_id);
110 110
111 delegate_->OnPipelineHasCapacity(this); 111 delegate_->OnPipelineHasCapacity(this);
112 } 112 }
113 113
114 int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id, 114 int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id,
115 const std::string& request_line, 115 const std::string& request_line,
116 const HttpRequestHeaders& headers, 116 const HttpRequestHeaders& headers,
117 UploadDataStream* request_body, 117 UploadDataStream* request_body,
118 HttpResponseInfo* response, 118 HttpResponseInfo* response,
119 OldCompletionCallback* callback) { 119 OldCompletionCallback* callback) {
120 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 120 CHECK(ContainsKey(stream_info_map_, pipeline_id));
121 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND); 121 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND);
122 if (!usable_) { 122 if (!usable_) {
123 return ERR_PIPELINE_EVICTION; 123 return ERR_PIPELINE_EVICTION;
124 } 124 }
125 125
126 DeferredSendRequest deferred_request; 126 PendingSendRequest* send_request = new PendingSendRequest;
127 deferred_request.pipeline_id = pipeline_id; 127 send_request->pipeline_id = pipeline_id;
128 deferred_request.request_line = request_line; 128 send_request->request_line = request_line;
129 deferred_request.headers = headers; 129 send_request->headers = headers;
130 deferred_request.request_body = request_body; 130 send_request->request_body = request_body;
131 deferred_request.response = response; 131 send_request->response = response;
132 deferred_request.callback = callback; 132 send_request->callback = callback;
133 deferred_request_queue_.push(deferred_request); 133 pending_send_request_queue_.push(send_request);
134 134
135 int rv; 135 int rv;
136 if (send_next_state_ == SEND_STATE_NONE) { 136 if (send_next_state_ == SEND_STATE_NONE) {
137 send_next_state_ = SEND_STATE_NEXT_REQUEST; 137 send_next_state_ = SEND_STATE_START_IMMEDIATELY;
138 rv = DoSendRequestLoop(OK); 138 rv = DoSendRequestLoop(OK);
139 } else { 139 } else {
140 rv = ERR_IO_PENDING; 140 rv = ERR_IO_PENDING;
141 } 141 }
142 ActivatePipeline(); 142 ActivatePipeline();
143 return rv; 143 return rv;
144 } 144 }
145 145
146 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) { 146 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
147 int rv = result; 147 int rv = result;
148 do { 148 do {
149 SendRequestState state = send_next_state_; 149 SendRequestState state = send_next_state_;
150 send_next_state_ = SEND_STATE_NONE; 150 send_next_state_ = SEND_STATE_NONE;
151 switch (state) { 151 switch (state) {
152 case SEND_STATE_NEXT_REQUEST: 152 case SEND_STATE_START_IMMEDIATELY:
153 rv = DoSendNextRequest(rv); 153 rv = DoStartRequestImmediately(rv);
154 break;
155 case SEND_STATE_START_NEXT_DEFERRED_REQUEST:
156 rv = DoStartNextDeferredRequest(rv);
157 break;
158 case SEND_STATE_SEND_ACTIVE_REQUEST:
159 rv = DoSendActiveRequest(rv);
154 break; 160 break;
155 case SEND_STATE_COMPLETE: 161 case SEND_STATE_COMPLETE:
156 rv = DoSendComplete(rv); 162 rv = DoSendComplete(rv);
157 break; 163 break;
158 case SEND_STATE_UNUSABLE: 164 case SEND_STATE_EVICT_PENDING_REQUESTS:
159 rv = DoEvictPendingSendRequests(rv); 165 rv = DoEvictPendingSendRequests(rv);
160 break; 166 break;
161 default: 167 default:
162 NOTREACHED() << "bad send state: " << state; 168 NOTREACHED() << "bad send state: " << state;
163 rv = ERR_FAILED; 169 rv = ERR_FAILED;
164 break; 170 break;
165 } 171 }
166 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE); 172 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
173 send_still_on_call_stack_ = false;
167 return rv; 174 return rv;
168 } 175 }
169 176
170 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) { 177 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
171 CHECK(send_user_callback_); 178 CHECK(active_send_request_.get());
172 DoSendRequestLoop(result); 179 DoSendRequestLoop(result);
173 } 180 }
174 181
175 int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) { 182 int HttpPipelinedConnectionImpl::DoStartRequestImmediately(int result) {
176 CHECK(!deferred_request_queue_.empty()); 183 CHECK(!active_send_request_.get());
177 const DeferredSendRequest& deferred_request = deferred_request_queue_.front(); 184 CHECK_EQ(static_cast<size_t>(1), pending_send_request_queue_.size());
178 CHECK(ContainsKey(stream_info_map_, deferred_request.pipeline_id)); 185 send_still_on_call_stack_ = true;
179 if (stream_info_map_[deferred_request.pipeline_id].state == STREAM_CLOSED) { 186 active_send_request_.reset(pending_send_request_queue_.front());
180 deferred_request_queue_.pop(); 187 pending_send_request_queue_.pop();
181 if (deferred_request_queue_.empty()) { 188 send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
182 send_next_state_ = SEND_STATE_NONE; 189 return OK;
183 } else { 190 }
184 send_next_state_ = SEND_STATE_NEXT_REQUEST; 191
192 int HttpPipelinedConnectionImpl::DoStartNextDeferredRequest(int result) {
193 CHECK(!send_still_on_call_stack_);
194 CHECK(!active_send_request_.get());
195
196 while (!pending_send_request_queue_.empty()) {
197 scoped_ptr<PendingSendRequest> next_request(
198 pending_send_request_queue_.front());
199 pending_send_request_queue_.pop();
200 CHECK(ContainsKey(stream_info_map_, next_request->pipeline_id));
201 if (stream_info_map_[next_request->pipeline_id].state != STREAM_CLOSED) {
202 active_send_request_.reset(next_request.release());
203 send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
204 return OK;
185 } 205 }
186 return OK;
187 } 206 }
188 CHECK(stream_info_map_[deferred_request.pipeline_id].parser.get()); 207
189 int rv = stream_info_map_[deferred_request.pipeline_id].parser->SendRequest( 208 send_next_state_ = SEND_STATE_NONE;
190 deferred_request.request_line, 209 return OK;
191 deferred_request.headers, 210 }
192 deferred_request.request_body, 211
193 deferred_request.response, 212 int HttpPipelinedConnectionImpl::DoSendActiveRequest(int result) {
194 &send_io_callback_); 213 CHECK(stream_info_map_[active_send_request_->pipeline_id].parser.get());
195 // |result| == ERR_IO_PENDING means this function was *not* called on the same 214 int rv = stream_info_map_[active_send_request_->pipeline_id].parser->
196 // stack as SendRequest(). That means we returned ERR_IO_PENDING to 215 SendRequest(active_send_request_->request_line,
197 // SendRequest() earlier and will need to invoke its callback. 216 active_send_request_->headers,
198 if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) { 217 active_send_request_->request_body,
199 send_user_callback_ = deferred_request.callback; 218 active_send_request_->response,
200 } 219 &send_io_callback_);
201 stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENDING; 220 stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENDING;
202 send_next_state_ = SEND_STATE_COMPLETE; 221 send_next_state_ = SEND_STATE_COMPLETE;
203 return rv; 222 return rv;
204 } 223 }
205 224
206 int HttpPipelinedConnectionImpl::DoSendComplete(int result) { 225 int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
207 CHECK(!deferred_request_queue_.empty()); 226 CHECK(active_send_request_.get());
208 const DeferredSendRequest& deferred_request = deferred_request_queue_.front(); 227 CHECK_EQ(STREAM_SENDING,
209 CHECK_EQ(stream_info_map_[deferred_request.pipeline_id].state, 228 stream_info_map_[active_send_request_->pipeline_id].state);
210 STREAM_SENDING); 229
211 request_order_.push(deferred_request.pipeline_id); 230 request_order_.push(active_send_request_->pipeline_id);
212 stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENT; 231 stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENT;
213 deferred_request_queue_.pop(); 232
214 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) { 233 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
215 result = ERR_PIPELINE_EVICTION; 234 result = ERR_PIPELINE_EVICTION;
216 } 235 }
217 if (result < OK) { 236 if (result < OK) {
218 send_next_state_ = SEND_STATE_UNUSABLE;
219 usable_ = false; 237 usable_ = false;
220 } 238 }
221 if (send_user_callback_) { 239
222 MessageLoop::current()->PostTask( 240 if (!send_still_on_call_stack_) {
223 FROM_HERE, 241 QueueUserCallback(active_send_request_->pipeline_id,
224 method_factory_.NewRunnableMethod( 242 active_send_request_->callback, result, FROM_HERE);
225 &HttpPipelinedConnectionImpl::FireUserCallback,
226 deferred_request.pipeline_id,
227 result));
228 stream_info_map_[deferred_request.pipeline_id].pending_user_callback =
229 send_user_callback_;
230 send_user_callback_ = NULL;
231 } 243 }
232 if (result < OK) { 244
233 return result; 245 active_send_request_.reset();
246
247 if (send_still_on_call_stack_) {
248 // It should be impossible for another request to appear on the queue while
249 // this send was on the call stack.
250 CHECK(pending_send_request_queue_.empty());
251 send_next_state_ = SEND_STATE_NONE;
252 } else if (!usable_) {
253 send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
254 } else {
255 send_next_state_ = SEND_STATE_START_NEXT_DEFERRED_REQUEST;
234 } 256 }
235 if (deferred_request_queue_.empty()) { 257
236 send_next_state_ = SEND_STATE_NONE; 258 return result;
237 return OK;
238 }
239 send_next_state_ = SEND_STATE_NEXT_REQUEST;
240 return OK;
241 } 259 }
242 260
243 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) { 261 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
262 while (!pending_send_request_queue_.empty()) {
263 scoped_ptr<PendingSendRequest> evicted_send(
264 pending_send_request_queue_.front());
265 pending_send_request_queue_.pop();
266 if (stream_info_map_[evicted_send->pipeline_id].state != STREAM_CLOSED) {
267 evicted_send->callback->Run(ERR_PIPELINE_EVICTION);
268 }
269 }
244 send_next_state_ = SEND_STATE_NONE; 270 send_next_state_ = SEND_STATE_NONE;
245 while (!deferred_request_queue_.empty()) {
246 const DeferredSendRequest& evicted_send = deferred_request_queue_.front();
247 if (stream_info_map_[evicted_send.pipeline_id].state != STREAM_CLOSED) {
248 evicted_send.callback->Run(ERR_PIPELINE_EVICTION);
249 }
250 deferred_request_queue_.pop();
251 }
252 return result; 271 return result;
253 } 272 }
254 273
255 int HttpPipelinedConnectionImpl::ReadResponseHeaders( 274 int HttpPipelinedConnectionImpl::ReadResponseHeaders(
256 int pipeline_id, 275 int pipeline_id,
257 OldCompletionCallback* callback) { 276 OldCompletionCallback* callback) {
258 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 277 CHECK(ContainsKey(stream_info_map_, pipeline_id));
259 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_SENT); 278 CHECK_EQ(STREAM_SENT, stream_info_map_[pipeline_id].state);
260 CHECK(!stream_info_map_[pipeline_id].read_headers_callback); 279 CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
280
261 if (!usable_) { 281 if (!usable_) {
262 return ERR_PIPELINE_EVICTION; 282 return ERR_PIPELINE_EVICTION;
263 } 283 }
284
264 stream_info_map_[pipeline_id].state = STREAM_READ_PENDING; 285 stream_info_map_[pipeline_id].state = STREAM_READ_PENDING;
265 stream_info_map_[pipeline_id].read_headers_callback = callback; 286 stream_info_map_[pipeline_id].read_headers_callback = callback;
287 if (read_next_state_ == READ_STATE_NONE &&
288 pipeline_id == request_order_.front()) {
289 read_next_state_ = READ_STATE_START_IMMEDIATELY;
290 return DoReadHeadersLoop(OK);
291 }
292 return ERR_IO_PENDING;
293 }
294
295 void HttpPipelinedConnectionImpl::StartNextDeferredRead() {
266 if (read_next_state_ == READ_STATE_NONE) { 296 if (read_next_state_ == READ_STATE_NONE) {
267 read_next_state_ = READ_STATE_NEXT_HEADERS; 297 read_next_state_ = READ_STATE_START_NEXT_DEFERRED_READ;
268 return DoReadHeadersLoop(OK); 298 DoReadHeadersLoop(OK);
269 } else {
270 return ERR_IO_PENDING;
271 } 299 }
272 } 300 }
273 301
274 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) { 302 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
275 int rv = result; 303 int rv = result;
276 do { 304 do {
277 ReadHeadersState state = read_next_state_; 305 ReadHeadersState state = read_next_state_;
278 read_next_state_ = READ_STATE_NONE; 306 read_next_state_ = READ_STATE_NONE;
279 switch (state) { 307 switch (state) {
280 case READ_STATE_NEXT_HEADERS: 308 case READ_STATE_START_IMMEDIATELY:
281 rv = DoReadNextHeaders(rv); 309 rv = DoStartReadImmediately(rv);
282 break; 310 break;
283 case READ_STATE_COMPLETE: 311 case READ_STATE_START_NEXT_DEFERRED_READ:
312 rv = DoStartNextDeferredRead(rv);
313 break;
314 case READ_STATE_READ_HEADERS:
315 rv = DoReadHeaders(rv);
316 break;
317 case READ_STATE_READ_HEADERS_COMPLETE:
284 rv = DoReadHeadersComplete(rv); 318 rv = DoReadHeadersComplete(rv);
285 break; 319 break;
286 case READ_STATE_WAITING_FOR_CLOSE: 320 case READ_STATE_WAITING_FOR_CLOSE:
287 rv = DoReadWaitingForClose(rv); 321 // This is a holding state. We return instead of continuing to run hte
322 // loop. The state will advance when the stream calls Close().
323 rv = DoReadWaitForClose(rv);
324 read_still_on_call_stack_ = false;
288 return rv; 325 return rv;
289 case READ_STATE_STREAM_CLOSED: 326 case READ_STATE_STREAM_CLOSED:
290 rv = DoReadStreamClosed(); 327 rv = DoReadStreamClosed();
291 break; 328 break;
292 case READ_STATE_UNUSABLE: 329 case READ_STATE_EVICT_PENDING_READS:
293 rv = DoEvictPendingReadHeaders(rv); 330 rv = DoEvictPendingReadHeaders(rv);
294 break; 331 break;
295 case READ_STATE_NONE: 332 case READ_STATE_NONE:
296 break; 333 break;
297 default: 334 default:
298 NOTREACHED() << "bad read state"; 335 NOTREACHED() << "bad read state";
299 rv = ERR_FAILED; 336 rv = ERR_FAILED;
300 break; 337 break;
301 } 338 }
302 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE); 339 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
340 read_still_on_call_stack_ = false;
303 return rv; 341 return rv;
304 } 342 }
305 343
306 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) { 344 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
307 CHECK(read_user_callback_);
308 DoReadHeadersLoop(result); 345 DoReadHeadersLoop(result);
309 } 346 }
310 347
311 int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) { 348 int HttpPipelinedConnectionImpl::DoStartReadImmediately(int result) {
mmenke 2011/11/14 16:25:12 You could put some CHECKs analogous to the corresp
James Simonsen 2011/11/14 21:09:49 Done.
312 CHECK(!request_order_.empty()); 349 read_still_on_call_stack_ = true;
313 int pipeline_id = request_order_.front(); 350 read_next_state_ = READ_STATE_READ_HEADERS;
314 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 351 active_read_id_ = request_order_.front();
315 if (stream_info_map_[pipeline_id].state == STREAM_CLOSED) { 352 request_order_.pop();
316 // Since nobody will read whatever data is on the pipeline associated with 353 return OK;
317 // this closed request, we must shut down the rest of the pipeline. 354 }
318 read_next_state_ = READ_STATE_UNUSABLE; 355
356 int HttpPipelinedConnectionImpl::DoStartNextDeferredRead(int result) {
357 if (request_order_.empty()) {
358 read_next_state_ = READ_STATE_NONE;
319 return OK; 359 return OK;
320 } 360 }
321 if (stream_info_map_[pipeline_id].read_headers_callback == NULL) {
322 return ERR_IO_PENDING;
323 }
324 CHECK(stream_info_map_[pipeline_id].parser.get());
325 361
326 if (result == ERR_IO_PENDING) { 362 int next_id = request_order_.front();
327 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_ACTIVE); 363 CHECK(ContainsKey(stream_info_map_, next_id));
328 } else { 364 switch (stream_info_map_[next_id].state) {
329 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_READ_PENDING); 365 case STREAM_READ_PENDING:
330 stream_info_map_[pipeline_id].state = STREAM_ACTIVE; 366 read_next_state_ = READ_STATE_READ_HEADERS;
367 active_read_id_ = next_id;
368 request_order_.pop();
369 break;
370
371 case STREAM_CLOSED:
372 // Since nobody will read whatever data is on the pipeline associated with
373 // this closed request, we must shut down the rest of the pipeline.
374 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
375 break;
376
377 case STREAM_SENT:
378 read_next_state_ = READ_STATE_NONE;
379 break;
380
381 default:
382 NOTREACHED() << "Unexpected read state: "
383 << stream_info_map_[next_id].state;
331 } 384 }
332 385
333 int rv = stream_info_map_[pipeline_id].parser->ReadResponseHeaders( 386 return OK;
387 }
388
389 int HttpPipelinedConnectionImpl::DoReadHeaders(int result) {
390 CHECK(active_read_id_);
391 CHECK(ContainsKey(stream_info_map_, active_read_id_));
392 stream_info_map_[active_read_id_].state = STREAM_ACTIVE;
393 int rv = stream_info_map_[active_read_id_].parser->ReadResponseHeaders(
334 &read_io_callback_); 394 &read_io_callback_);
335 if (rv == ERR_IO_PENDING) { 395 read_next_state_ = READ_STATE_READ_HEADERS_COMPLETE;
336 read_next_state_ = READ_STATE_COMPLETE;
337 read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
338 } else if (rv < OK) {
339 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
340 if (rv == ERR_SOCKET_NOT_CONNECTED && completed_one_request_)
341 rv = ERR_PIPELINE_EVICTION;
342 } else {
343 CHECK_LE(OK, rv);
344 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
345 }
346
347 // |result| == ERR_IO_PENDING means this function was *not* called on the same
348 // stack as ReadResponseHeaders(). That means we returned ERR_IO_PENDING to
349 // ReadResponseHeaders() earlier and now need to invoke its callback.
350 if (rv != ERR_IO_PENDING && result == ERR_IO_PENDING) {
351 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
352 read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
353 stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
354 MessageLoop::current()->PostTask(
355 FROM_HERE,
356 method_factory_.NewRunnableMethod(
357 &HttpPipelinedConnectionImpl::FireUserCallback,
358 pipeline_id,
359 rv));
360 }
361 return rv; 396 return rv;
362 } 397 }
363 398
364 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) { 399 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
400 CHECK(active_read_id_);
401 CHECK(ContainsKey(stream_info_map_, active_read_id_));
402
365 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; 403 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
366 if (read_user_callback_) { 404 if (result < OK) {
367 int pipeline_id = request_order_.front(); 405 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
368 MessageLoop::current()->PostTask( 406 result = ERR_PIPELINE_EVICTION;
369 FROM_HERE, 407 }
370 method_factory_.NewRunnableMethod( 408 usable_ = false;
371 &HttpPipelinedConnectionImpl::FireUserCallback,
372 pipeline_id,
373 result));
374 stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
375 read_user_callback_ = NULL;
376 } 409 }
410
411 if (!read_still_on_call_stack_) {
412 QueueUserCallback(active_read_id_,
413 stream_info_map_[active_read_id_].read_headers_callback,
414 result, FROM_HERE);
415 }
416
377 return result; 417 return result;
378 } 418 }
379 419
380 int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) { 420 int HttpPipelinedConnectionImpl::DoReadWaitForClose(int result) {
381 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; 421 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
382 return result; 422 return result;
383 } 423 }
384 424
385 int HttpPipelinedConnectionImpl::DoReadStreamClosed() { 425 int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
386 CHECK(!request_order_.empty()); 426 CHECK(active_read_id_);
387 int pipeline_id = request_order_.front(); 427 CHECK(ContainsKey(stream_info_map_, active_read_id_));
388 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 428 CHECK_EQ(stream_info_map_[active_read_id_].state, STREAM_CLOSED);
389 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED); 429 active_read_id_ = 0;
390 CHECK(stream_info_map_[pipeline_id].read_headers_callback);
391 stream_info_map_[pipeline_id].read_headers_callback = NULL;
392 request_order_.pop();
393 if (!usable_) { 430 if (!usable_) {
394 read_next_state_ = READ_STATE_UNUSABLE; 431 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
395 return OK;
396 } else {
397 completed_one_request_ = true;
398 if (!request_order_.empty()) {
399 int next_pipeline_id = request_order_.front();
400 CHECK(ContainsKey(stream_info_map_, next_pipeline_id));
401 if (stream_info_map_[next_pipeline_id].read_headers_callback) {
402 stream_info_map_[next_pipeline_id].state = STREAM_ACTIVE;
403 read_next_state_ = READ_STATE_NEXT_HEADERS;
404 MessageLoop::current()->PostTask(
405 FROM_HERE,
406 method_factory_.NewRunnableMethod(
407 &HttpPipelinedConnectionImpl::DoReadHeadersLoop,
408 ERR_IO_PENDING));
409 return ERR_IO_PENDING; // Wait for the task to fire.
410 }
411 }
412 read_next_state_ = READ_STATE_NONE;
413 return OK; 432 return OK;
414 } 433 }
434 completed_one_request_ = true;
435 MessageLoop::current()->PostTask(
436 FROM_HERE,
437 method_factory_.NewRunnableMethod(
438 &HttpPipelinedConnectionImpl::StartNextDeferredRead));
439 read_next_state_ = READ_STATE_NONE;
440 return OK;
415 } 441 }
416 442
417 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) { 443 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
418 while (!request_order_.empty()) { 444 while (!request_order_.empty()) {
419 int evicted_id = request_order_.front(); 445 int evicted_id = request_order_.front();
420 request_order_.pop(); 446 request_order_.pop();
421 if (!ContainsKey(stream_info_map_, evicted_id) || 447 if (!ContainsKey(stream_info_map_, evicted_id)) {
422 (stream_info_map_[evicted_id].read_headers_callback == NULL)) {
423 continue; 448 continue;
424 } 449 }
425 if (stream_info_map_[evicted_id].state != STREAM_CLOSED) { 450 if (stream_info_map_[evicted_id].state == STREAM_READ_PENDING) {
426 stream_info_map_[evicted_id].pending_user_callback = 451 stream_info_map_[evicted_id].state = STREAM_READ_EVICTED;
427 stream_info_map_[evicted_id].read_headers_callback; 452 QueueUserCallback(evicted_id,
428 MessageLoop::current()->PostTask( 453 stream_info_map_[evicted_id].read_headers_callback,
429 FROM_HERE, 454 ERR_PIPELINE_EVICTION,
430 method_factory_.NewRunnableMethod( 455 FROM_HERE);
431 &HttpPipelinedConnectionImpl::FireUserCallback,
432 evicted_id,
433 ERR_PIPELINE_EVICTION));
434 } 456 }
435 stream_info_map_[evicted_id].read_headers_callback = NULL;
436 } 457 }
437 read_next_state_ = READ_STATE_NONE; 458 read_next_state_ = READ_STATE_NONE;
438 return result; 459 return result;
439 } 460 }
440 461
441 void HttpPipelinedConnectionImpl::Close(int pipeline_id, 462 void HttpPipelinedConnectionImpl::Close(int pipeline_id,
442 bool not_reusable) { 463 bool not_reusable) {
443 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 464 CHECK(ContainsKey(stream_info_map_, pipeline_id));
444 switch (stream_info_map_[pipeline_id].state) { 465 switch (stream_info_map_[pipeline_id].state) {
445 case STREAM_CREATED: 466 case STREAM_CREATED:
446 stream_info_map_[pipeline_id].state = STREAM_UNUSED; 467 stream_info_map_[pipeline_id].state = STREAM_UNUSED;
447 break; 468 break;
448 469
449 case STREAM_BOUND: 470 case STREAM_BOUND:
450 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 471 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
451 break; 472 break;
452 473
453 case STREAM_SENDING: 474 case STREAM_SENDING:
454 usable_ = false; 475 usable_ = false;
455 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 476 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
456 send_user_callback_ = NULL; 477 active_send_request_.reset();
457 send_next_state_ = SEND_STATE_UNUSABLE; 478 send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
458 DoSendRequestLoop(OK); 479 DoSendRequestLoop(OK);
459 break; 480 break;
460 481
461 case STREAM_SENT: 482 case STREAM_SENT:
462 case STREAM_READ_PENDING: 483 case STREAM_READ_PENDING:
463 usable_ = false; 484 usable_ = false;
464 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 485 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
465 stream_info_map_[pipeline_id].read_headers_callback = NULL; 486 if (!request_order_.empty() &&
466 if (read_next_state_ == READ_STATE_NONE) { 487 pipeline_id == request_order_.front() &&
467 read_next_state_ = READ_STATE_UNUSABLE; 488 read_next_state_ == READ_STATE_NONE) {
489 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
468 DoReadHeadersLoop(OK); 490 DoReadHeadersLoop(OK);
469 } 491 }
470 break; 492 break;
471 493
472 case STREAM_ACTIVE: 494 case STREAM_ACTIVE:
473 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 495 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
474 if (not_reusable) { 496 if (not_reusable) {
475 usable_ = false; 497 usable_ = false;
476 } 498 }
477 read_next_state_ = READ_STATE_STREAM_CLOSED; 499 read_next_state_ = READ_STATE_STREAM_CLOSED;
478 read_user_callback_ = NULL;
479 DoReadHeadersLoop(OK); 500 DoReadHeadersLoop(OK);
480 break; 501 break;
481 502
503 case STREAM_READ_EVICTED:
504 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
505 break;
506
482 case STREAM_CLOSED: 507 case STREAM_CLOSED:
483 case STREAM_UNUSED: 508 case STREAM_UNUSED:
484 // TODO(simonjam): Why is Close() sometimes called twice? 509 // TODO(simonjam): Why is Close() sometimes called twice?
485 break; 510 break;
486 511
487 default: 512 default:
488 NOTREACHED(); 513 NOTREACHED();
489 break; 514 break;
490 } 515 }
491 } 516 }
492 517
493 int HttpPipelinedConnectionImpl::ReadResponseBody( 518 int HttpPipelinedConnectionImpl::ReadResponseBody(
494 int pipeline_id, 519 int pipeline_id,
495 IOBuffer* buf, 520 IOBuffer* buf,
496 int buf_len, 521 int buf_len,
497 OldCompletionCallback* callback) { 522 OldCompletionCallback* callback) {
498 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 523 CHECK(ContainsKey(stream_info_map_, pipeline_id));
499 CHECK(!request_order_.empty()); 524 CHECK_EQ(active_read_id_, pipeline_id);
500 CHECK_EQ(pipeline_id, request_order_.front());
501 CHECK(stream_info_map_[pipeline_id].parser.get()); 525 CHECK(stream_info_map_[pipeline_id].parser.get());
502 return stream_info_map_[pipeline_id].parser->ReadResponseBody( 526 return stream_info_map_[pipeline_id].parser->ReadResponseBody(
503 buf, buf_len, callback); 527 buf, buf_len, callback);
504 } 528 }
505 529
506 uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const { 530 uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const {
507 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 531 CHECK(ContainsKey(stream_info_map_, pipeline_id));
508 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); 532 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
509 return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress(); 533 return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress();
510 } 534 }
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
560 584
561 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo( 585 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
562 int pipeline_id, 586 int pipeline_id,
563 SSLCertRequestInfo* cert_request_info) { 587 SSLCertRequestInfo* cert_request_info) {
564 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 588 CHECK(ContainsKey(stream_info_map_, pipeline_id));
565 CHECK(stream_info_map_[pipeline_id].parser.get()); 589 CHECK(stream_info_map_[pipeline_id].parser.get());
566 return stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo( 590 return stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo(
567 cert_request_info); 591 cert_request_info);
568 } 592 }
569 593
594 void HttpPipelinedConnectionImpl::QueueUserCallback(
595 int pipeline_id,
596 OldCompletionCallback* callback,
597 int rv,
598 const tracked_objects::Location& from_here) {
599 CHECK(!stream_info_map_[pipeline_id].pending_user_callback);
600 stream_info_map_[pipeline_id].pending_user_callback = callback;
601 MessageLoop::current()->PostTask(
602 from_here,
603 method_factory_.NewRunnableMethod(
604 &HttpPipelinedConnectionImpl::FireUserCallback,
605 pipeline_id,
606 rv));
607 }
608
570 void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id, 609 void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id,
571 int result) { 610 int result) {
572 if (ContainsKey(stream_info_map_, pipeline_id)) { 611 if (ContainsKey(stream_info_map_, pipeline_id)) {
573 stream_info_map_[pipeline_id].pending_user_callback->Run(result); 612 CHECK(stream_info_map_[pipeline_id].pending_user_callback);
613 OldCompletionCallback* callback =
614 stream_info_map_[pipeline_id].pending_user_callback;
615 stream_info_map_[pipeline_id].pending_user_callback = NULL;
616 callback->Run(result);
574 } 617 }
575 } 618 }
576 619
577 int HttpPipelinedConnectionImpl::depth() const { 620 int HttpPipelinedConnectionImpl::depth() const {
578 return stream_info_map_.size(); 621 return stream_info_map_.size();
579 } 622 }
580 623
581 bool HttpPipelinedConnectionImpl::usable() const { 624 bool HttpPipelinedConnectionImpl::usable() const {
582 return usable_; 625 return usable_;
583 } 626 }
(...skipping 11 matching lines...) Expand all
595 } 638 }
596 639
597 const NetLog::Source& HttpPipelinedConnectionImpl::source() const { 640 const NetLog::Source& HttpPipelinedConnectionImpl::source() const {
598 return net_log_.source(); 641 return net_log_.source();
599 } 642 }
600 643
601 bool HttpPipelinedConnectionImpl::was_npn_negotiated() const { 644 bool HttpPipelinedConnectionImpl::was_npn_negotiated() const {
602 return was_npn_negotiated_; 645 return was_npn_negotiated_;
603 } 646 }
604 647
605 HttpPipelinedConnectionImpl::DeferredSendRequest::DeferredSendRequest() { 648 HttpPipelinedConnectionImpl::PendingSendRequest::PendingSendRequest() {
606 } 649 }
607 650
608 HttpPipelinedConnectionImpl::DeferredSendRequest::~DeferredSendRequest() { 651 HttpPipelinedConnectionImpl::PendingSendRequest::~PendingSendRequest() {
609 } 652 }
610 653
611 HttpPipelinedConnectionImpl::StreamInfo::StreamInfo() 654 HttpPipelinedConnectionImpl::StreamInfo::StreamInfo()
612 : read_headers_callback(NULL), 655 : read_headers_callback(NULL),
656 pending_user_callback(NULL),
613 state(STREAM_CREATED) { 657 state(STREAM_CREATED) {
614 } 658 }
615 659
616 HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() { 660 HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() {
617 } 661 }
618 662
619 } // namespace net 663 } // namespace net
OLDNEW
« no previous file with comments | « net/http/http_pipelined_connection_impl.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698