Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(114)

Side by Side Diff: net/http/http_pipelined_connection_impl.cc

Issue 8515020: Refactor state machines in HttpPipelinedConnectionImpl. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/http/http_pipelined_connection_impl.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/http/http_pipelined_connection_impl.h" 5 #include "net/http/http_pipelined_connection_impl.h"
6 6
7 #include "base/message_loop.h" 7 #include "base/message_loop.h"
8 #include "base/stl_util.h" 8 #include "base/stl_util.h"
9 #include "net/base/io_buffer.h" 9 #include "net/base/io_buffer.h"
10 #include "net/http/http_pipelined_stream.h" 10 #include "net/http/http_pipelined_stream.h"
(...skipping 16 matching lines...) Expand all
27 used_proxy_info_(used_proxy_info), 27 used_proxy_info_(used_proxy_info),
28 net_log_(net_log), 28 net_log_(net_log),
29 was_npn_negotiated_(was_npn_negotiated), 29 was_npn_negotiated_(was_npn_negotiated),
30 read_buf_(new GrowableIOBuffer()), 30 read_buf_(new GrowableIOBuffer()),
31 next_pipeline_id_(1), 31 next_pipeline_id_(1),
32 active_(false), 32 active_(false),
33 usable_(true), 33 usable_(true),
34 completed_one_request_(false), 34 completed_one_request_(false),
35 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)), 35 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
36 send_next_state_(SEND_STATE_NONE), 36 send_next_state_(SEND_STATE_NONE),
37 send_still_on_call_stack_(false),
37 ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_( 38 ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_(
38 this, &HttpPipelinedConnectionImpl::OnSendIOCallback)), 39 this, &HttpPipelinedConnectionImpl::OnSendIOCallback)),
39 send_user_callback_(NULL),
40 read_next_state_(READ_STATE_NONE), 40 read_next_state_(READ_STATE_NONE),
41 active_read_id_(0),
42 read_still_on_call_stack_(false),
41 ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_( 43 ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_(
42 this, &HttpPipelinedConnectionImpl::OnReadIOCallback)), 44 this, &HttpPipelinedConnectionImpl::OnReadIOCallback)) {
43 read_user_callback_(NULL) {
44 CHECK(connection_.get()); 45 CHECK(connection_.get());
45 } 46 }
46 47
47 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() { 48 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
48 CHECK_EQ(depth(), 0); 49 CHECK_EQ(depth(), 0);
49 CHECK(stream_info_map_.empty()); 50 CHECK(stream_info_map_.empty());
50 CHECK(deferred_request_queue_.empty()); 51 CHECK(pending_send_request_queue_.empty());
51 CHECK(request_order_.empty()); 52 CHECK(request_order_.empty());
52 CHECK_EQ(send_next_state_, SEND_STATE_NONE); 53 CHECK_EQ(send_next_state_, SEND_STATE_NONE);
53 CHECK_EQ(read_next_state_, READ_STATE_NONE); 54 CHECK_EQ(read_next_state_, READ_STATE_NONE);
54 CHECK(!send_user_callback_); 55 CHECK(!active_send_request_.get());
55 CHECK(!read_user_callback_); 56 CHECK(!active_read_id_);
56 if (!usable_) { 57 if (!usable_) {
57 connection_->socket()->Disconnect(); 58 connection_->socket()->Disconnect();
58 } 59 }
59 connection_->Reset(); 60 connection_->Reset();
60 } 61 }
61 62
62 HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() { 63 HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() {
63 int pipeline_id = next_pipeline_id_++; 64 int pipeline_id = next_pipeline_id_++;
64 CHECK(pipeline_id); 65 CHECK(pipeline_id);
65 HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id); 66 HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id);
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
98 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 99 CHECK(ContainsKey(stream_info_map_, pipeline_id));
99 Close(pipeline_id, false); 100 Close(pipeline_id, false);
100 101
101 if (stream_info_map_[pipeline_id].state != STREAM_CREATED && 102 if (stream_info_map_[pipeline_id].state != STREAM_CREATED &&
102 stream_info_map_[pipeline_id].state != STREAM_UNUSED) { 103 stream_info_map_[pipeline_id].state != STREAM_UNUSED) {
103 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED); 104 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
104 CHECK(stream_info_map_[pipeline_id].parser.get()); 105 CHECK(stream_info_map_[pipeline_id].parser.get());
105 stream_info_map_[pipeline_id].parser.reset(); 106 stream_info_map_[pipeline_id].parser.reset();
106 } 107 }
107 CHECK(!stream_info_map_[pipeline_id].parser.get()); 108 CHECK(!stream_info_map_[pipeline_id].parser.get());
108 CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
109 stream_info_map_.erase(pipeline_id); 109 stream_info_map_.erase(pipeline_id);
110 110
111 delegate_->OnPipelineHasCapacity(this); 111 delegate_->OnPipelineHasCapacity(this);
112 } 112 }
113 113
114 int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id, 114 int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id,
115 const std::string& request_line, 115 const std::string& request_line,
116 const HttpRequestHeaders& headers, 116 const HttpRequestHeaders& headers,
117 UploadDataStream* request_body, 117 UploadDataStream* request_body,
118 HttpResponseInfo* response, 118 HttpResponseInfo* response,
119 OldCompletionCallback* callback) { 119 OldCompletionCallback* callback) {
120 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 120 CHECK(ContainsKey(stream_info_map_, pipeline_id));
121 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND); 121 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND);
122 if (!usable_) { 122 if (!usable_) {
123 return ERR_PIPELINE_EVICTION; 123 return ERR_PIPELINE_EVICTION;
124 } 124 }
125 125
126 DeferredSendRequest deferred_request; 126 PendingSendRequest* send_request = new PendingSendRequest;
127 deferred_request.pipeline_id = pipeline_id; 127 send_request->pipeline_id = pipeline_id;
128 deferred_request.request_line = request_line; 128 send_request->request_line = request_line;
129 deferred_request.headers = headers; 129 send_request->headers = headers;
130 deferred_request.request_body = request_body; 130 send_request->request_body = request_body;
131 deferred_request.response = response; 131 send_request->response = response;
132 deferred_request.callback = callback; 132 send_request->callback = callback;
133 deferred_request_queue_.push(deferred_request); 133 pending_send_request_queue_.push(send_request);
134 134
135 int rv; 135 int rv;
136 if (send_next_state_ == SEND_STATE_NONE) { 136 if (send_next_state_ == SEND_STATE_NONE) {
137 send_next_state_ = SEND_STATE_NEXT_REQUEST; 137 send_next_state_ = SEND_STATE_START_IMMEDIATELY;
138 rv = DoSendRequestLoop(OK); 138 rv = DoSendRequestLoop(OK);
mmenke 2011/11/11 16:46:35 It might be a little simpler just to only set send
James Simonsen 2011/11/11 19:26:08 It would definitely merge the two cases, which wou
mmenke 2011/11/14 16:25:12 I think that setting the callback after the call t
139 } else { 139 } else {
140 rv = ERR_IO_PENDING; 140 rv = ERR_IO_PENDING;
141 } 141 }
142 ActivatePipeline(); 142 ActivatePipeline();
143 return rv; 143 return rv;
144 } 144 }
145 145
146 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) { 146 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
147 int rv = result; 147 int rv = result;
148 do { 148 do {
149 SendRequestState state = send_next_state_; 149 SendRequestState state = send_next_state_;
150 send_next_state_ = SEND_STATE_NONE; 150 send_next_state_ = SEND_STATE_NONE;
151 switch (state) { 151 switch (state) {
152 case SEND_STATE_NEXT_REQUEST: 152 case SEND_STATE_START_IMMEDIATELY:
153 rv = DoSendNextRequest(rv); 153 rv = DoStartRequestImmediately(rv);
154 break;
155 case SEND_STATE_START_NEXT_DEFERRED_REQUEST:
156 rv = DoStartNextDeferredRequest(rv);
157 break;
158 case SEND_STATE_SEND_ACTIVE_REQUEST:
159 rv = DoSendActiveRequest(rv);
154 break; 160 break;
155 case SEND_STATE_COMPLETE: 161 case SEND_STATE_COMPLETE:
156 rv = DoSendComplete(rv); 162 rv = DoSendComplete(rv);
157 break; 163 break;
158 case SEND_STATE_UNUSABLE: 164 case SEND_STATE_EVICT_PENDING_REQUESTS:
159 rv = DoEvictPendingSendRequests(rv); 165 rv = DoEvictPendingSendRequests(rv);
160 break; 166 break;
161 default: 167 default:
162 NOTREACHED() << "bad send state: " << state; 168 NOTREACHED() << "bad send state: " << state;
163 rv = ERR_FAILED; 169 rv = ERR_FAILED;
164 break; 170 break;
165 } 171 }
166 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE); 172 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
173 send_still_on_call_stack_ = false;
167 return rv; 174 return rv;
168 } 175 }
169 176
170 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) { 177 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
171 CHECK(send_user_callback_); 178 CHECK(active_send_request_.get());
172 DoSendRequestLoop(result); 179 DoSendRequestLoop(result);
173 } 180 }
174 181
175 int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) { 182 int HttpPipelinedConnectionImpl::DoStartRequestImmediately(int result) {
176 CHECK(!deferred_request_queue_.empty()); 183 CHECK(!active_send_request_.get());
177 const DeferredSendRequest& deferred_request = deferred_request_queue_.front(); 184 CHECK_EQ(static_cast<size_t>(1), pending_send_request_queue_.size());
178 CHECK(ContainsKey(stream_info_map_, deferred_request.pipeline_id)); 185 send_still_on_call_stack_ = true;
179 if (stream_info_map_[deferred_request.pipeline_id].state == STREAM_CLOSED) { 186 active_send_request_.reset(pending_send_request_queue_.front());
180 deferred_request_queue_.pop(); 187 pending_send_request_queue_.pop();
181 if (deferred_request_queue_.empty()) { 188 send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
182 send_next_state_ = SEND_STATE_NONE; 189 return OK;
183 } else { 190 }
184 send_next_state_ = SEND_STATE_NEXT_REQUEST; 191
192 int HttpPipelinedConnectionImpl::DoStartNextDeferredRequest(int result) {
193 CHECK(!send_still_on_call_stack_);
194 CHECK(!active_send_request_.get());
195
196 while (!pending_send_request_queue_.empty()) {
197 scoped_ptr<PendingSendRequest> next_request(
198 pending_send_request_queue_.front());
199 pending_send_request_queue_.pop();
200 CHECK(ContainsKey(stream_info_map_, next_request->pipeline_id));
201 if (stream_info_map_[next_request->pipeline_id].state != STREAM_CLOSED) {
202 active_send_request_.reset(next_request.release());
203 send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
204 return OK;
185 } 205 }
186 return OK;
187 } 206 }
188 CHECK(stream_info_map_[deferred_request.pipeline_id].parser.get()); 207
189 int rv = stream_info_map_[deferred_request.pipeline_id].parser->SendRequest( 208 send_next_state_ = SEND_STATE_NONE;
190 deferred_request.request_line, 209 return OK;
191 deferred_request.headers, 210 }
192 deferred_request.request_body, 211
193 deferred_request.response, 212 int HttpPipelinedConnectionImpl::DoSendActiveRequest(int result) {
194 &send_io_callback_); 213 CHECK(stream_info_map_[active_send_request_->pipeline_id].parser.get());
195 // |result| == ERR_IO_PENDING means this function was *not* called on the same 214 int rv = stream_info_map_[active_send_request_->pipeline_id].parser->
196 // stack as SendRequest(). That means we returned ERR_IO_PENDING to 215 SendRequest(active_send_request_->request_line,
197 // SendRequest() earlier and will need to invoke its callback. 216 active_send_request_->headers,
198 if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) { 217 active_send_request_->request_body,
199 send_user_callback_ = deferred_request.callback; 218 active_send_request_->response,
200 } 219 &send_io_callback_);
201 stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENDING; 220 stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENDING;
202 send_next_state_ = SEND_STATE_COMPLETE; 221 send_next_state_ = SEND_STATE_COMPLETE;
203 return rv; 222 return rv;
204 } 223 }
205 224
206 int HttpPipelinedConnectionImpl::DoSendComplete(int result) { 225 int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
207 CHECK(!deferred_request_queue_.empty()); 226 CHECK(active_send_request_.get());
208 const DeferredSendRequest& deferred_request = deferred_request_queue_.front(); 227 CHECK_EQ(STREAM_SENDING,
209 CHECK_EQ(stream_info_map_[deferred_request.pipeline_id].state, 228 stream_info_map_[active_send_request_->pipeline_id].state);
210 STREAM_SENDING); 229
211 request_order_.push(deferred_request.pipeline_id); 230 request_order_.push(active_send_request_->pipeline_id);
212 stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENT; 231 stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENT;
213 deferred_request_queue_.pop(); 232
214 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) { 233 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
215 result = ERR_PIPELINE_EVICTION; 234 result = ERR_PIPELINE_EVICTION;
216 } 235 }
217 if (result < OK) { 236 if (result < OK) {
218 send_next_state_ = SEND_STATE_UNUSABLE;
219 usable_ = false; 237 usable_ = false;
220 } 238 }
221 if (send_user_callback_) { 239
222 MessageLoop::current()->PostTask( 240 if (!send_still_on_call_stack_) {
223 FROM_HERE, 241 QueueUserCallback(active_send_request_->pipeline_id,
224 method_factory_.NewRunnableMethod( 242 active_send_request_->callback, result, FROM_HERE);
225 &HttpPipelinedConnectionImpl::FireUserCallback,
226 deferred_request.pipeline_id,
227 result));
228 stream_info_map_[deferred_request.pipeline_id].pending_user_callback =
229 send_user_callback_;
230 send_user_callback_ = NULL;
231 } 243 }
232 if (result < OK) { 244
233 return result; 245 active_send_request_.reset();
246
247 if (send_still_on_call_stack_) {
mmenke 2011/11/11 16:46:35 nit: Maybe add a comment here that in this case,
James Simonsen 2011/11/11 19:26:08 Done.
248 send_next_state_ = SEND_STATE_NONE;
249 } else if (!usable_) {
250 send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
251 } else {
252 send_next_state_ = SEND_STATE_START_NEXT_DEFERRED_REQUEST;
234 } 253 }
235 if (deferred_request_queue_.empty()) { 254
236 send_next_state_ = SEND_STATE_NONE; 255 return result;
237 return OK;
238 }
239 send_next_state_ = SEND_STATE_NEXT_REQUEST;
240 return OK;
241 } 256 }
242 257
243 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) { 258 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
259 while (!pending_send_request_queue_.empty()) {
260 scoped_ptr<PendingSendRequest> evicted_send(
261 pending_send_request_queue_.front());
262 pending_send_request_queue_.pop();
263 if (stream_info_map_[evicted_send->pipeline_id].state != STREAM_CLOSED) {
264 evicted_send->callback->Run(ERR_PIPELINE_EVICTION);
265 }
266 }
244 send_next_state_ = SEND_STATE_NONE; 267 send_next_state_ = SEND_STATE_NONE;
245 while (!deferred_request_queue_.empty()) {
246 const DeferredSendRequest& evicted_send = deferred_request_queue_.front();
247 if (stream_info_map_[evicted_send.pipeline_id].state != STREAM_CLOSED) {
248 evicted_send.callback->Run(ERR_PIPELINE_EVICTION);
249 }
250 deferred_request_queue_.pop();
251 }
252 return result; 268 return result;
253 } 269 }
254 270
255 int HttpPipelinedConnectionImpl::ReadResponseHeaders( 271 int HttpPipelinedConnectionImpl::ReadResponseHeaders(
256 int pipeline_id, 272 int pipeline_id,
257 OldCompletionCallback* callback) { 273 OldCompletionCallback* callback) {
258 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 274 CHECK(ContainsKey(stream_info_map_, pipeline_id));
259 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_SENT); 275 CHECK_EQ(STREAM_SENT, stream_info_map_[pipeline_id].state);
260 CHECK(!stream_info_map_[pipeline_id].read_headers_callback); 276 CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
277
261 if (!usable_) { 278 if (!usable_) {
262 return ERR_PIPELINE_EVICTION; 279 return ERR_PIPELINE_EVICTION;
263 } 280 }
281
264 stream_info_map_[pipeline_id].state = STREAM_READ_PENDING; 282 stream_info_map_[pipeline_id].state = STREAM_READ_PENDING;
265 stream_info_map_[pipeline_id].read_headers_callback = callback; 283 stream_info_map_[pipeline_id].read_headers_callback = callback;
284 if (read_next_state_ == READ_STATE_NONE &&
285 pipeline_id == request_order_.front()) {
286 read_next_state_ = READ_STATE_START_IMMEDIATELY;
287 return DoReadHeadersLoop(OK);
288 }
289 return ERR_IO_PENDING;
290 }
291
292 void HttpPipelinedConnectionImpl::StartNextDeferredRead() {
266 if (read_next_state_ == READ_STATE_NONE) { 293 if (read_next_state_ == READ_STATE_NONE) {
267 read_next_state_ = READ_STATE_NEXT_HEADERS; 294 read_next_state_ = READ_STATE_START_NEXT_DEFERRED_READ;
268 return DoReadHeadersLoop(OK); 295 DoReadHeadersLoop(OK);
269 } else {
270 return ERR_IO_PENDING;
271 } 296 }
272 } 297 }
273 298
274 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) { 299 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
275 int rv = result; 300 int rv = result;
276 do { 301 do {
277 ReadHeadersState state = read_next_state_; 302 ReadHeadersState state = read_next_state_;
278 read_next_state_ = READ_STATE_NONE; 303 read_next_state_ = READ_STATE_NONE;
279 switch (state) { 304 switch (state) {
280 case READ_STATE_NEXT_HEADERS: 305 case READ_STATE_START_IMMEDIATELY:
281 rv = DoReadNextHeaders(rv); 306 rv = DoStartReadImmediately(rv);
282 break; 307 break;
283 case READ_STATE_COMPLETE: 308 case READ_STATE_START_NEXT_DEFERRED_READ:
309 rv = DoStartNextDeferredRead(rv);
310 break;
311 case READ_STATE_READ_HEADERS:
312 rv = DoReadHeaders(rv);
313 break;
314 case READ_STATE_READ_HEADERS_COMPLETE:
284 rv = DoReadHeadersComplete(rv); 315 rv = DoReadHeadersComplete(rv);
285 break; 316 break;
286 case READ_STATE_WAITING_FOR_CLOSE: 317 case READ_STATE_WAITING_FOR_CLOSE:
287 rv = DoReadWaitingForClose(rv); 318 rv = DoReadWaitForClose(rv);
319 read_still_on_call_stack_ = false;
288 return rv; 320 return rv;
mmenke 2011/11/11 16:46:35 nit: Think it might be nice to have a comment dra
James Simonsen 2011/11/11 19:26:08 Done.
289 case READ_STATE_STREAM_CLOSED: 321 case READ_STATE_STREAM_CLOSED:
290 rv = DoReadStreamClosed(); 322 rv = DoReadStreamClosed();
291 break; 323 break;
292 case READ_STATE_UNUSABLE: 324 case READ_STATE_EVICT_PENDING_READS:
293 rv = DoEvictPendingReadHeaders(rv); 325 rv = DoEvictPendingReadHeaders(rv);
294 break; 326 break;
295 case READ_STATE_NONE: 327 case READ_STATE_NONE:
296 break; 328 break;
297 default: 329 default:
298 NOTREACHED() << "bad read state"; 330 NOTREACHED() << "bad read state";
299 rv = ERR_FAILED; 331 rv = ERR_FAILED;
300 break; 332 break;
301 } 333 }
302 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE); 334 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
335 read_still_on_call_stack_ = false;
303 return rv; 336 return rv;
304 } 337 }
305 338
306 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) { 339 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
307 CHECK(read_user_callback_);
308 DoReadHeadersLoop(result); 340 DoReadHeadersLoop(result);
309 } 341 }
310 342
311 int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) { 343 int HttpPipelinedConnectionImpl::DoStartReadImmediately(int result) {
312 CHECK(!request_order_.empty()); 344 read_still_on_call_stack_ = true;
313 int pipeline_id = request_order_.front(); 345 read_next_state_ = READ_STATE_READ_HEADERS;
314 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 346 active_read_id_ = request_order_.front();
315 if (stream_info_map_[pipeline_id].state == STREAM_CLOSED) { 347 request_order_.pop();
316 // Since nobody will read whatever data is on the pipeline associated with 348 return OK;
317 // this closed request, we must shut down the rest of the pipeline. 349 }
318 read_next_state_ = READ_STATE_UNUSABLE; 350
351 int HttpPipelinedConnectionImpl::DoStartNextDeferredRead(int result) {
352 if (request_order_.empty()) {
353 read_next_state_ = READ_STATE_NONE;
319 return OK; 354 return OK;
320 } 355 }
321 if (stream_info_map_[pipeline_id].read_headers_callback == NULL) {
322 return ERR_IO_PENDING;
323 }
324 CHECK(stream_info_map_[pipeline_id].parser.get());
325 356
326 if (result == ERR_IO_PENDING) { 357 int next_id = request_order_.front();
327 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_ACTIVE); 358 CHECK(ContainsKey(stream_info_map_, next_id));
328 } else { 359 switch (stream_info_map_[next_id].state) {
329 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_READ_PENDING); 360 case STREAM_READ_PENDING:
330 stream_info_map_[pipeline_id].state = STREAM_ACTIVE; 361 read_next_state_ = READ_STATE_READ_HEADERS;
362 active_read_id_ = next_id;
363 request_order_.pop();
364 break;
365
366 case STREAM_CLOSED:
367 // Since nobody will read whatever data is on the pipeline associated with
368 // this closed request, we must shut down the rest of the pipeline.
369 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
370 break;
371
372 case STREAM_SENT:
373 read_next_state_ = READ_STATE_NONE;
374 break;
375
376 default:
377 NOTREACHED() << "Unexpected read state: "
378 << stream_info_map_[next_id].state;
331 } 379 }
332 380
333 int rv = stream_info_map_[pipeline_id].parser->ReadResponseHeaders( 381 return OK;
382 }
383
384 int HttpPipelinedConnectionImpl::DoReadHeaders(int result) {
385 CHECK(active_read_id_);
386 CHECK(ContainsKey(stream_info_map_, active_read_id_));
387 stream_info_map_[active_read_id_].state = STREAM_ACTIVE;
388 int rv = stream_info_map_[active_read_id_].parser->ReadResponseHeaders(
334 &read_io_callback_); 389 &read_io_callback_);
335 if (rv == ERR_IO_PENDING) { 390 read_next_state_ = READ_STATE_READ_HEADERS_COMPLETE;
336 read_next_state_ = READ_STATE_COMPLETE;
337 read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
338 } else if (rv < OK) {
339 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
340 if (rv == ERR_SOCKET_NOT_CONNECTED && completed_one_request_)
341 rv = ERR_PIPELINE_EVICTION;
342 } else {
343 CHECK_LE(OK, rv);
344 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
345 }
346
347 // |result| == ERR_IO_PENDING means this function was *not* called on the same
348 // stack as ReadResponseHeaders(). That means we returned ERR_IO_PENDING to
349 // ReadResponseHeaders() earlier and now need to invoke its callback.
350 if (rv != ERR_IO_PENDING && result == ERR_IO_PENDING) {
351 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
352 read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
353 stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
354 MessageLoop::current()->PostTask(
355 FROM_HERE,
356 method_factory_.NewRunnableMethod(
357 &HttpPipelinedConnectionImpl::FireUserCallback,
358 pipeline_id,
359 rv));
360 }
361 return rv; 391 return rv;
362 } 392 }
363 393
364 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) { 394 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
395 CHECK(active_read_id_);
396 CHECK(ContainsKey(stream_info_map_, active_read_id_));
397
365 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; 398 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
366 if (read_user_callback_) { 399 if (result < OK) {
367 int pipeline_id = request_order_.front(); 400 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
368 MessageLoop::current()->PostTask( 401 result = ERR_PIPELINE_EVICTION;
369 FROM_HERE, 402 }
370 method_factory_.NewRunnableMethod( 403 usable_ = false;
371 &HttpPipelinedConnectionImpl::FireUserCallback,
372 pipeline_id,
373 result));
374 stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
375 read_user_callback_ = NULL;
376 } 404 }
405
406 if (!read_still_on_call_stack_) {
407 QueueUserCallback(active_read_id_,
408 stream_info_map_[active_read_id_].read_headers_callback,
409 result, FROM_HERE);
410 }
411
377 return result; 412 return result;
378 } 413 }
379 414
380 int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) { 415 int HttpPipelinedConnectionImpl::DoReadWaitForClose(int result) {
381 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; 416 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
382 return result; 417 return result;
383 } 418 }
384 419
385 int HttpPipelinedConnectionImpl::DoReadStreamClosed() { 420 int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
386 CHECK(!request_order_.empty()); 421 CHECK(active_read_id_);
387 int pipeline_id = request_order_.front(); 422 CHECK(ContainsKey(stream_info_map_, active_read_id_));
388 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 423 CHECK_EQ(stream_info_map_[active_read_id_].state, STREAM_CLOSED);
389 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED); 424 active_read_id_ = 0;
390 CHECK(stream_info_map_[pipeline_id].read_headers_callback);
391 stream_info_map_[pipeline_id].read_headers_callback = NULL;
392 request_order_.pop();
393 if (!usable_) { 425 if (!usable_) {
394 read_next_state_ = READ_STATE_UNUSABLE; 426 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
395 return OK;
396 } else {
397 completed_one_request_ = true;
398 if (!request_order_.empty()) {
399 int next_pipeline_id = request_order_.front();
400 CHECK(ContainsKey(stream_info_map_, next_pipeline_id));
401 if (stream_info_map_[next_pipeline_id].read_headers_callback) {
402 stream_info_map_[next_pipeline_id].state = STREAM_ACTIVE;
403 read_next_state_ = READ_STATE_NEXT_HEADERS;
404 MessageLoop::current()->PostTask(
405 FROM_HERE,
406 method_factory_.NewRunnableMethod(
407 &HttpPipelinedConnectionImpl::DoReadHeadersLoop,
408 ERR_IO_PENDING));
409 return ERR_IO_PENDING; // Wait for the task to fire.
410 }
411 }
412 read_next_state_ = READ_STATE_NONE;
413 return OK; 427 return OK;
414 } 428 }
429 completed_one_request_ = true;
430 MessageLoop::current()->PostTask(
431 FROM_HERE,
432 method_factory_.NewRunnableMethod(
433 &HttpPipelinedConnectionImpl::StartNextDeferredRead));
434 read_next_state_ = READ_STATE_NONE;
435 return OK;
415 } 436 }
416 437
417 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) { 438 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
418 while (!request_order_.empty()) { 439 while (!request_order_.empty()) {
419 int evicted_id = request_order_.front(); 440 int evicted_id = request_order_.front();
420 request_order_.pop(); 441 request_order_.pop();
421 if (!ContainsKey(stream_info_map_, evicted_id) || 442 if (!ContainsKey(stream_info_map_, evicted_id)) {
422 (stream_info_map_[evicted_id].read_headers_callback == NULL)) {
423 continue; 443 continue;
424 } 444 }
425 if (stream_info_map_[evicted_id].state != STREAM_CLOSED) { 445 if (stream_info_map_[evicted_id].state == STREAM_READ_PENDING) {
426 stream_info_map_[evicted_id].pending_user_callback = 446 stream_info_map_[evicted_id].state = STREAM_READ_EVICTED;
427 stream_info_map_[evicted_id].read_headers_callback; 447 QueueUserCallback(evicted_id,
428 MessageLoop::current()->PostTask( 448 stream_info_map_[evicted_id].read_headers_callback,
429 FROM_HERE, 449 ERR_PIPELINE_EVICTION,
430 method_factory_.NewRunnableMethod( 450 FROM_HERE);
431 &HttpPipelinedConnectionImpl::FireUserCallback,
432 evicted_id,
433 ERR_PIPELINE_EVICTION));
434 } 451 }
435 stream_info_map_[evicted_id].read_headers_callback = NULL;
436 } 452 }
437 read_next_state_ = READ_STATE_NONE; 453 read_next_state_ = READ_STATE_NONE;
438 return result; 454 return result;
439 } 455 }
440 456
441 void HttpPipelinedConnectionImpl::Close(int pipeline_id, 457 void HttpPipelinedConnectionImpl::Close(int pipeline_id,
442 bool not_reusable) { 458 bool not_reusable) {
443 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 459 CHECK(ContainsKey(stream_info_map_, pipeline_id));
444 switch (stream_info_map_[pipeline_id].state) { 460 switch (stream_info_map_[pipeline_id].state) {
445 case STREAM_CREATED: 461 case STREAM_CREATED:
446 stream_info_map_[pipeline_id].state = STREAM_UNUSED; 462 stream_info_map_[pipeline_id].state = STREAM_UNUSED;
447 break; 463 break;
448 464
449 case STREAM_BOUND: 465 case STREAM_BOUND:
450 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 466 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
451 break; 467 break;
452 468
453 case STREAM_SENDING: 469 case STREAM_SENDING:
454 usable_ = false; 470 usable_ = false;
455 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 471 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
456 send_user_callback_ = NULL; 472 active_send_request_.reset();
457 send_next_state_ = SEND_STATE_UNUSABLE; 473 send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
458 DoSendRequestLoop(OK); 474 DoSendRequestLoop(OK);
459 break; 475 break;
460 476
461 case STREAM_SENT: 477 case STREAM_SENT:
462 case STREAM_READ_PENDING: 478 case STREAM_READ_PENDING:
463 usable_ = false; 479 usable_ = false;
464 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 480 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
465 stream_info_map_[pipeline_id].read_headers_callback = NULL; 481 if (!request_order_.empty() &&
466 if (read_next_state_ == READ_STATE_NONE) { 482 pipeline_id == request_order_.front() &&
467 read_next_state_ = READ_STATE_UNUSABLE; 483 read_next_state_ == READ_STATE_NONE) {
484 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
468 DoReadHeadersLoop(OK); 485 DoReadHeadersLoop(OK);
469 } 486 }
470 break; 487 break;
471 488
472 case STREAM_ACTIVE: 489 case STREAM_ACTIVE:
473 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 490 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
474 if (not_reusable) { 491 if (not_reusable) {
475 usable_ = false; 492 usable_ = false;
476 } 493 }
477 read_next_state_ = READ_STATE_STREAM_CLOSED; 494 read_next_state_ = READ_STATE_STREAM_CLOSED;
478 read_user_callback_ = NULL;
479 DoReadHeadersLoop(OK); 495 DoReadHeadersLoop(OK);
480 break; 496 break;
481 497
498 case STREAM_READ_EVICTED:
499 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
500 break;
501
482 case STREAM_CLOSED: 502 case STREAM_CLOSED:
483 case STREAM_UNUSED: 503 case STREAM_UNUSED:
484 // TODO(simonjam): Why is Close() sometimes called twice? 504 // TODO(simonjam): Why is Close() sometimes called twice?
485 break; 505 break;
486 506
487 default: 507 default:
488 NOTREACHED(); 508 NOTREACHED();
489 break; 509 break;
490 } 510 }
491 } 511 }
492 512
493 int HttpPipelinedConnectionImpl::ReadResponseBody( 513 int HttpPipelinedConnectionImpl::ReadResponseBody(
494 int pipeline_id, 514 int pipeline_id,
495 IOBuffer* buf, 515 IOBuffer* buf,
496 int buf_len, 516 int buf_len,
497 OldCompletionCallback* callback) { 517 OldCompletionCallback* callback) {
498 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 518 CHECK(ContainsKey(stream_info_map_, pipeline_id));
499 CHECK(!request_order_.empty()); 519 CHECK_EQ(active_read_id_, pipeline_id);
500 CHECK_EQ(pipeline_id, request_order_.front());
501 CHECK(stream_info_map_[pipeline_id].parser.get()); 520 CHECK(stream_info_map_[pipeline_id].parser.get());
502 return stream_info_map_[pipeline_id].parser->ReadResponseBody( 521 return stream_info_map_[pipeline_id].parser->ReadResponseBody(
503 buf, buf_len, callback); 522 buf, buf_len, callback);
504 } 523 }
505 524
506 uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const { 525 uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const {
507 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 526 CHECK(ContainsKey(stream_info_map_, pipeline_id));
508 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); 527 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
509 return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress(); 528 return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress();
510 } 529 }
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
560 579
561 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo( 580 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
562 int pipeline_id, 581 int pipeline_id,
563 SSLCertRequestInfo* cert_request_info) { 582 SSLCertRequestInfo* cert_request_info) {
564 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 583 CHECK(ContainsKey(stream_info_map_, pipeline_id));
565 CHECK(stream_info_map_[pipeline_id].parser.get()); 584 CHECK(stream_info_map_[pipeline_id].parser.get());
566 return stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo( 585 return stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo(
567 cert_request_info); 586 cert_request_info);
568 } 587 }
569 588
589 void HttpPipelinedConnectionImpl::QueueUserCallback(
590 int pipeline_id,
591 OldCompletionCallback* callback,
592 int rv,
593 const tracked_objects::Location& from_here) {
594 CHECK(!stream_info_map_[pipeline_id].pending_user_callback);
595 stream_info_map_[pipeline_id].pending_user_callback = callback;
596 MessageLoop::current()->PostTask(
597 from_here,
598 method_factory_.NewRunnableMethod(
599 &HttpPipelinedConnectionImpl::FireUserCallback,
600 pipeline_id,
601 rv));
602 }
603
570 void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id, 604 void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id,
571 int result) { 605 int result) {
572 if (ContainsKey(stream_info_map_, pipeline_id)) { 606 if (ContainsKey(stream_info_map_, pipeline_id)) {
573 stream_info_map_[pipeline_id].pending_user_callback->Run(result); 607 CHECK(stream_info_map_[pipeline_id].pending_user_callback);
608 OldCompletionCallback* callback =
609 stream_info_map_[pipeline_id].pending_user_callback;
610 stream_info_map_[pipeline_id].pending_user_callback = NULL;
611 callback->Run(result);
574 } 612 }
575 } 613 }
576 614
577 int HttpPipelinedConnectionImpl::depth() const { 615 int HttpPipelinedConnectionImpl::depth() const {
578 return stream_info_map_.size(); 616 return stream_info_map_.size();
579 } 617 }
580 618
581 bool HttpPipelinedConnectionImpl::usable() const { 619 bool HttpPipelinedConnectionImpl::usable() const {
582 return usable_; 620 return usable_;
583 } 621 }
(...skipping 11 matching lines...) Expand all
595 } 633 }
596 634
597 const NetLog::Source& HttpPipelinedConnectionImpl::source() const { 635 const NetLog::Source& HttpPipelinedConnectionImpl::source() const {
598 return net_log_.source(); 636 return net_log_.source();
599 } 637 }
600 638
601 bool HttpPipelinedConnectionImpl::was_npn_negotiated() const { 639 bool HttpPipelinedConnectionImpl::was_npn_negotiated() const {
602 return was_npn_negotiated_; 640 return was_npn_negotiated_;
603 } 641 }
604 642
605 HttpPipelinedConnectionImpl::DeferredSendRequest::DeferredSendRequest() { 643 HttpPipelinedConnectionImpl::PendingSendRequest::PendingSendRequest() {
606 } 644 }
607 645
608 HttpPipelinedConnectionImpl::DeferredSendRequest::~DeferredSendRequest() { 646 HttpPipelinedConnectionImpl::PendingSendRequest::~PendingSendRequest() {
609 } 647 }
610 648
611 HttpPipelinedConnectionImpl::StreamInfo::StreamInfo() 649 HttpPipelinedConnectionImpl::StreamInfo::StreamInfo()
612 : read_headers_callback(NULL), 650 : read_headers_callback(NULL),
651 pending_user_callback(NULL),
613 state(STREAM_CREATED) { 652 state(STREAM_CREATED) {
614 } 653 }
615 654
616 HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() { 655 HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() {
617 } 656 }
618 657
619 } // namespace net 658 } // namespace net
OLDNEW
« no previous file with comments | « net/http/http_pipelined_connection_impl.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698