Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1409)

Side by Side Diff: net/http/http_pipelined_connection_impl.cc

Issue 8515020: Refactor state machines in HttpPipelinedConnectionImpl. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Add more checks and comments Created 9 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/http/http_pipelined_connection_impl.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/http/http_pipelined_connection_impl.h" 5 #include "net/http/http_pipelined_connection_impl.h"
6 6
7 #include "base/message_loop.h" 7 #include "base/message_loop.h"
8 #include "base/stl_util.h" 8 #include "base/stl_util.h"
9 #include "net/base/io_buffer.h" 9 #include "net/base/io_buffer.h"
10 #include "net/http/http_pipelined_stream.h" 10 #include "net/http/http_pipelined_stream.h"
(...skipping 16 matching lines...) Expand all
27 used_proxy_info_(used_proxy_info), 27 used_proxy_info_(used_proxy_info),
28 net_log_(net_log), 28 net_log_(net_log),
29 was_npn_negotiated_(was_npn_negotiated), 29 was_npn_negotiated_(was_npn_negotiated),
30 read_buf_(new GrowableIOBuffer()), 30 read_buf_(new GrowableIOBuffer()),
31 next_pipeline_id_(1), 31 next_pipeline_id_(1),
32 active_(false), 32 active_(false),
33 usable_(true), 33 usable_(true),
34 completed_one_request_(false), 34 completed_one_request_(false),
35 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)), 35 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
36 send_next_state_(SEND_STATE_NONE), 36 send_next_state_(SEND_STATE_NONE),
37 send_still_on_call_stack_(false),
37 ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_( 38 ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_(
38 this, &HttpPipelinedConnectionImpl::OnSendIOCallback)), 39 this, &HttpPipelinedConnectionImpl::OnSendIOCallback)),
39 send_user_callback_(NULL),
40 read_next_state_(READ_STATE_NONE), 40 read_next_state_(READ_STATE_NONE),
41 active_read_id_(0),
42 read_still_on_call_stack_(false),
41 ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_( 43 ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_(
42 this, &HttpPipelinedConnectionImpl::OnReadIOCallback)), 44 this, &HttpPipelinedConnectionImpl::OnReadIOCallback)) {
43 read_user_callback_(NULL) {
44 CHECK(connection_.get()); 45 CHECK(connection_.get());
45 } 46 }
46 47
47 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() { 48 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
48 CHECK_EQ(depth(), 0); 49 CHECK_EQ(depth(), 0);
49 CHECK(stream_info_map_.empty()); 50 CHECK(stream_info_map_.empty());
50 CHECK(deferred_request_queue_.empty()); 51 CHECK(pending_send_request_queue_.empty());
51 CHECK(request_order_.empty()); 52 CHECK(request_order_.empty());
52 CHECK_EQ(send_next_state_, SEND_STATE_NONE); 53 CHECK_EQ(send_next_state_, SEND_STATE_NONE);
53 CHECK_EQ(read_next_state_, READ_STATE_NONE); 54 CHECK_EQ(read_next_state_, READ_STATE_NONE);
54 CHECK(!send_user_callback_); 55 CHECK(!active_send_request_.get());
55 CHECK(!read_user_callback_); 56 CHECK(!active_read_id_);
56 if (!usable_) { 57 if (!usable_) {
57 connection_->socket()->Disconnect(); 58 connection_->socket()->Disconnect();
58 } 59 }
59 connection_->Reset(); 60 connection_->Reset();
60 } 61 }
61 62
62 HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() { 63 HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() {
63 int pipeline_id = next_pipeline_id_++; 64 int pipeline_id = next_pipeline_id_++;
64 CHECK(pipeline_id); 65 CHECK(pipeline_id);
65 HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id); 66 HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id);
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
98 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 99 CHECK(ContainsKey(stream_info_map_, pipeline_id));
99 Close(pipeline_id, false); 100 Close(pipeline_id, false);
100 101
101 if (stream_info_map_[pipeline_id].state != STREAM_CREATED && 102 if (stream_info_map_[pipeline_id].state != STREAM_CREATED &&
102 stream_info_map_[pipeline_id].state != STREAM_UNUSED) { 103 stream_info_map_[pipeline_id].state != STREAM_UNUSED) {
103 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED); 104 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
104 CHECK(stream_info_map_[pipeline_id].parser.get()); 105 CHECK(stream_info_map_[pipeline_id].parser.get());
105 stream_info_map_[pipeline_id].parser.reset(); 106 stream_info_map_[pipeline_id].parser.reset();
106 } 107 }
107 CHECK(!stream_info_map_[pipeline_id].parser.get()); 108 CHECK(!stream_info_map_[pipeline_id].parser.get());
108 CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
109 stream_info_map_.erase(pipeline_id); 109 stream_info_map_.erase(pipeline_id);
110 110
111 delegate_->OnPipelineHasCapacity(this); 111 delegate_->OnPipelineHasCapacity(this);
112 } 112 }
113 113
114 int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id, 114 int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id,
115 const std::string& request_line, 115 const std::string& request_line,
116 const HttpRequestHeaders& headers, 116 const HttpRequestHeaders& headers,
117 UploadDataStream* request_body, 117 UploadDataStream* request_body,
118 HttpResponseInfo* response, 118 HttpResponseInfo* response,
119 OldCompletionCallback* callback) { 119 OldCompletionCallback* callback) {
120 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 120 CHECK(ContainsKey(stream_info_map_, pipeline_id));
121 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND); 121 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND);
122 if (!usable_) { 122 if (!usable_) {
123 return ERR_PIPELINE_EVICTION; 123 return ERR_PIPELINE_EVICTION;
124 } 124 }
125 125
126 DeferredSendRequest deferred_request; 126 PendingSendRequest* send_request = new PendingSendRequest;
127 deferred_request.pipeline_id = pipeline_id; 127 send_request->pipeline_id = pipeline_id;
128 deferred_request.request_line = request_line; 128 send_request->request_line = request_line;
129 deferred_request.headers = headers; 129 send_request->headers = headers;
130 deferred_request.request_body = request_body; 130 send_request->request_body = request_body;
131 deferred_request.response = response; 131 send_request->response = response;
132 deferred_request.callback = callback; 132 send_request->callback = callback;
133 deferred_request_queue_.push(deferred_request); 133 pending_send_request_queue_.push(send_request);
134 134
135 int rv; 135 int rv;
136 if (send_next_state_ == SEND_STATE_NONE) { 136 if (send_next_state_ == SEND_STATE_NONE) {
137 send_next_state_ = SEND_STATE_NEXT_REQUEST; 137 send_next_state_ = SEND_STATE_START_IMMEDIATELY;
138 rv = DoSendRequestLoop(OK); 138 rv = DoSendRequestLoop(OK);
139 } else { 139 } else {
140 rv = ERR_IO_PENDING; 140 rv = ERR_IO_PENDING;
141 } 141 }
142 ActivatePipeline(); 142 ActivatePipeline();
143 return rv; 143 return rv;
144 } 144 }
145 145
146 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) { 146 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
147 int rv = result; 147 int rv = result;
148 do { 148 do {
149 SendRequestState state = send_next_state_; 149 SendRequestState state = send_next_state_;
150 send_next_state_ = SEND_STATE_NONE; 150 send_next_state_ = SEND_STATE_NONE;
151 switch (state) { 151 switch (state) {
152 case SEND_STATE_NEXT_REQUEST: 152 case SEND_STATE_START_IMMEDIATELY:
153 rv = DoSendNextRequest(rv); 153 rv = DoStartRequestImmediately(rv);
154 break;
155 case SEND_STATE_START_NEXT_DEFERRED_REQUEST:
156 rv = DoStartNextDeferredRequest(rv);
157 break;
158 case SEND_STATE_SEND_ACTIVE_REQUEST:
159 rv = DoSendActiveRequest(rv);
154 break; 160 break;
155 case SEND_STATE_COMPLETE: 161 case SEND_STATE_COMPLETE:
156 rv = DoSendComplete(rv); 162 rv = DoSendComplete(rv);
157 break; 163 break;
158 case SEND_STATE_UNUSABLE: 164 case SEND_STATE_EVICT_PENDING_REQUESTS:
159 rv = DoEvictPendingSendRequests(rv); 165 rv = DoEvictPendingSendRequests(rv);
160 break; 166 break;
161 default: 167 default:
162 NOTREACHED() << "bad send state: " << state; 168 NOTREACHED() << "bad send state: " << state;
163 rv = ERR_FAILED; 169 rv = ERR_FAILED;
164 break; 170 break;
165 } 171 }
166 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE); 172 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
173 send_still_on_call_stack_ = false;
167 return rv; 174 return rv;
168 } 175 }
169 176
170 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) { 177 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
171 CHECK(send_user_callback_); 178 CHECK(active_send_request_.get());
172 DoSendRequestLoop(result); 179 DoSendRequestLoop(result);
173 } 180 }
174 181
175 int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) { 182 int HttpPipelinedConnectionImpl::DoStartRequestImmediately(int result) {
176 CHECK(!deferred_request_queue_.empty()); 183 CHECK(!active_send_request_.get());
177 const DeferredSendRequest& deferred_request = deferred_request_queue_.front(); 184 CHECK_EQ(static_cast<size_t>(1), pending_send_request_queue_.size());
178 CHECK(ContainsKey(stream_info_map_, deferred_request.pipeline_id)); 185 // If SendRequest() completes synchronously, then we need to return the value
179 if (stream_info_map_[deferred_request.pipeline_id].state == STREAM_CLOSED) { 186 // directly to the caller. |send_still_on_call_stack_| will track this.
180 deferred_request_queue_.pop(); 187 // Otherwise, asynchronous completions will notify the caller via callback.
181 if (deferred_request_queue_.empty()) { 188 send_still_on_call_stack_ = true;
182 send_next_state_ = SEND_STATE_NONE; 189 active_send_request_.reset(pending_send_request_queue_.front());
183 } else { 190 pending_send_request_queue_.pop();
184 send_next_state_ = SEND_STATE_NEXT_REQUEST; 191 send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
192 return OK;
193 }
194
195 int HttpPipelinedConnectionImpl::DoStartNextDeferredRequest(int result) {
196 CHECK(!send_still_on_call_stack_);
197 CHECK(!active_send_request_.get());
198
199 while (!pending_send_request_queue_.empty()) {
200 scoped_ptr<PendingSendRequest> next_request(
201 pending_send_request_queue_.front());
202 pending_send_request_queue_.pop();
203 CHECK(ContainsKey(stream_info_map_, next_request->pipeline_id));
204 if (stream_info_map_[next_request->pipeline_id].state != STREAM_CLOSED) {
205 active_send_request_.reset(next_request.release());
206 send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
207 return OK;
185 } 208 }
186 return OK;
187 } 209 }
188 CHECK(stream_info_map_[deferred_request.pipeline_id].parser.get()); 210
189 int rv = stream_info_map_[deferred_request.pipeline_id].parser->SendRequest( 211 send_next_state_ = SEND_STATE_NONE;
190 deferred_request.request_line, 212 return OK;
191 deferred_request.headers, 213 }
192 deferred_request.request_body, 214
193 deferred_request.response, 215 int HttpPipelinedConnectionImpl::DoSendActiveRequest(int result) {
194 &send_io_callback_); 216 CHECK(stream_info_map_[active_send_request_->pipeline_id].parser.get());
195 // |result| == ERR_IO_PENDING means this function was *not* called on the same 217 int rv = stream_info_map_[active_send_request_->pipeline_id].parser->
196 // stack as SendRequest(). That means we returned ERR_IO_PENDING to 218 SendRequest(active_send_request_->request_line,
197 // SendRequest() earlier and will need to invoke its callback. 219 active_send_request_->headers,
198 if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) { 220 active_send_request_->request_body,
199 send_user_callback_ = deferred_request.callback; 221 active_send_request_->response,
200 } 222 &send_io_callback_);
201 stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENDING; 223 stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENDING;
202 send_next_state_ = SEND_STATE_COMPLETE; 224 send_next_state_ = SEND_STATE_COMPLETE;
203 return rv; 225 return rv;
204 } 226 }
205 227
206 int HttpPipelinedConnectionImpl::DoSendComplete(int result) { 228 int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
207 CHECK(!deferred_request_queue_.empty()); 229 CHECK(active_send_request_.get());
208 const DeferredSendRequest& deferred_request = deferred_request_queue_.front(); 230 CHECK_EQ(STREAM_SENDING,
209 CHECK_EQ(stream_info_map_[deferred_request.pipeline_id].state, 231 stream_info_map_[active_send_request_->pipeline_id].state);
210 STREAM_SENDING); 232
211 request_order_.push(deferred_request.pipeline_id); 233 request_order_.push(active_send_request_->pipeline_id);
212 stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENT; 234 stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENT;
213 deferred_request_queue_.pop(); 235
214 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) { 236 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
215 result = ERR_PIPELINE_EVICTION; 237 result = ERR_PIPELINE_EVICTION;
216 } 238 }
217 if (result < OK) { 239 if (result < OK) {
218 send_next_state_ = SEND_STATE_UNUSABLE;
219 usable_ = false; 240 usable_ = false;
220 } 241 }
221 if (send_user_callback_) { 242
222 MessageLoop::current()->PostTask( 243 if (!send_still_on_call_stack_) {
223 FROM_HERE, 244 QueueUserCallback(active_send_request_->pipeline_id,
224 method_factory_.NewRunnableMethod( 245 active_send_request_->callback, result, FROM_HERE);
225 &HttpPipelinedConnectionImpl::FireUserCallback,
226 deferred_request.pipeline_id,
227 result));
228 stream_info_map_[deferred_request.pipeline_id].pending_user_callback =
229 send_user_callback_;
230 send_user_callback_ = NULL;
231 } 246 }
232 if (result < OK) { 247
233 return result; 248 active_send_request_.reset();
249
250 if (send_still_on_call_stack_) {
251 // It should be impossible for another request to appear on the queue while
252 // this send was on the call stack.
253 CHECK(pending_send_request_queue_.empty());
254 send_next_state_ = SEND_STATE_NONE;
255 } else if (!usable_) {
256 send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
257 } else {
258 send_next_state_ = SEND_STATE_START_NEXT_DEFERRED_REQUEST;
234 } 259 }
235 if (deferred_request_queue_.empty()) { 260
236 send_next_state_ = SEND_STATE_NONE; 261 return result;
237 return OK;
238 }
239 send_next_state_ = SEND_STATE_NEXT_REQUEST;
240 return OK;
241 } 262 }
242 263
243 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) { 264 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
265 while (!pending_send_request_queue_.empty()) {
266 scoped_ptr<PendingSendRequest> evicted_send(
267 pending_send_request_queue_.front());
268 pending_send_request_queue_.pop();
269 if (stream_info_map_[evicted_send->pipeline_id].state != STREAM_CLOSED) {
270 evicted_send->callback->Run(ERR_PIPELINE_EVICTION);
271 }
272 }
244 send_next_state_ = SEND_STATE_NONE; 273 send_next_state_ = SEND_STATE_NONE;
245 while (!deferred_request_queue_.empty()) {
246 const DeferredSendRequest& evicted_send = deferred_request_queue_.front();
247 if (stream_info_map_[evicted_send.pipeline_id].state != STREAM_CLOSED) {
248 evicted_send.callback->Run(ERR_PIPELINE_EVICTION);
249 }
250 deferred_request_queue_.pop();
251 }
252 return result; 274 return result;
253 } 275 }
254 276
255 int HttpPipelinedConnectionImpl::ReadResponseHeaders( 277 int HttpPipelinedConnectionImpl::ReadResponseHeaders(
256 int pipeline_id, 278 int pipeline_id,
257 OldCompletionCallback* callback) { 279 OldCompletionCallback* callback) {
258 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 280 CHECK(ContainsKey(stream_info_map_, pipeline_id));
259 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_SENT); 281 CHECK_EQ(STREAM_SENT, stream_info_map_[pipeline_id].state);
260 CHECK(!stream_info_map_[pipeline_id].read_headers_callback); 282 CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
283
261 if (!usable_) { 284 if (!usable_) {
262 return ERR_PIPELINE_EVICTION; 285 return ERR_PIPELINE_EVICTION;
263 } 286 }
287
264 stream_info_map_[pipeline_id].state = STREAM_READ_PENDING; 288 stream_info_map_[pipeline_id].state = STREAM_READ_PENDING;
265 stream_info_map_[pipeline_id].read_headers_callback = callback; 289 stream_info_map_[pipeline_id].read_headers_callback = callback;
290 if (read_next_state_ == READ_STATE_NONE &&
291 pipeline_id == request_order_.front()) {
292 read_next_state_ = READ_STATE_START_IMMEDIATELY;
293 return DoReadHeadersLoop(OK);
294 }
295 return ERR_IO_PENDING;
296 }
297
298 void HttpPipelinedConnectionImpl::StartNextDeferredRead() {
266 if (read_next_state_ == READ_STATE_NONE) { 299 if (read_next_state_ == READ_STATE_NONE) {
267 read_next_state_ = READ_STATE_NEXT_HEADERS; 300 read_next_state_ = READ_STATE_START_NEXT_DEFERRED_READ;
268 return DoReadHeadersLoop(OK); 301 DoReadHeadersLoop(OK);
269 } else {
270 return ERR_IO_PENDING;
271 } 302 }
272 } 303 }
273 304
274 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) { 305 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
275 int rv = result; 306 int rv = result;
276 do { 307 do {
277 ReadHeadersState state = read_next_state_; 308 ReadHeadersState state = read_next_state_;
278 read_next_state_ = READ_STATE_NONE; 309 read_next_state_ = READ_STATE_NONE;
279 switch (state) { 310 switch (state) {
280 case READ_STATE_NEXT_HEADERS: 311 case READ_STATE_START_IMMEDIATELY:
281 rv = DoReadNextHeaders(rv); 312 rv = DoStartReadImmediately(rv);
282 break; 313 break;
283 case READ_STATE_COMPLETE: 314 case READ_STATE_START_NEXT_DEFERRED_READ:
315 rv = DoStartNextDeferredRead(rv);
316 break;
317 case READ_STATE_READ_HEADERS:
318 rv = DoReadHeaders(rv);
319 break;
320 case READ_STATE_READ_HEADERS_COMPLETE:
284 rv = DoReadHeadersComplete(rv); 321 rv = DoReadHeadersComplete(rv);
285 break; 322 break;
286 case READ_STATE_WAITING_FOR_CLOSE: 323 case READ_STATE_WAITING_FOR_CLOSE:
287 rv = DoReadWaitingForClose(rv); 324 // This is a holding state. We return instead of continuing to run hte
325 // loop. The state will advance when the stream calls Close().
326 rv = DoReadWaitForClose(rv);
327 read_still_on_call_stack_ = false;
288 return rv; 328 return rv;
289 case READ_STATE_STREAM_CLOSED: 329 case READ_STATE_STREAM_CLOSED:
290 rv = DoReadStreamClosed(); 330 rv = DoReadStreamClosed();
291 break; 331 break;
292 case READ_STATE_UNUSABLE: 332 case READ_STATE_EVICT_PENDING_READS:
293 rv = DoEvictPendingReadHeaders(rv); 333 rv = DoEvictPendingReadHeaders(rv);
294 break; 334 break;
295 case READ_STATE_NONE: 335 case READ_STATE_NONE:
296 break; 336 break;
297 default: 337 default:
298 NOTREACHED() << "bad read state"; 338 NOTREACHED() << "bad read state";
299 rv = ERR_FAILED; 339 rv = ERR_FAILED;
300 break; 340 break;
301 } 341 }
302 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE); 342 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
343 read_still_on_call_stack_ = false;
303 return rv; 344 return rv;
304 } 345 }
305 346
306 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) { 347 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
307 CHECK(read_user_callback_);
308 DoReadHeadersLoop(result); 348 DoReadHeadersLoop(result);
309 } 349 }
310 350
311 int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) { 351 int HttpPipelinedConnectionImpl::DoStartReadImmediately(int result) {
352 CHECK(!active_read_id_);
353 CHECK(!read_still_on_call_stack_);
312 CHECK(!request_order_.empty()); 354 CHECK(!request_order_.empty());
313 int pipeline_id = request_order_.front(); 355 // If ReadResponseHeaders() completes synchronously, then we need to return
314 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 356 // the value directly to the caller. |read_still_on_call_stack_| will track
315 if (stream_info_map_[pipeline_id].state == STREAM_CLOSED) { 357 // this. Otherwise, asynchronous completions will notify the caller via
316 // Since nobody will read whatever data is on the pipeline associated with 358 // callback.
317 // this closed request, we must shut down the rest of the pipeline. 359 read_still_on_call_stack_ = true;
318 read_next_state_ = READ_STATE_UNUSABLE; 360 read_next_state_ = READ_STATE_READ_HEADERS;
361 active_read_id_ = request_order_.front();
362 request_order_.pop();
363 return OK;
364 }
365
366 int HttpPipelinedConnectionImpl::DoStartNextDeferredRead(int result) {
367 CHECK(!active_read_id_);
368 CHECK(!read_still_on_call_stack_);
369
370 if (request_order_.empty()) {
371 read_next_state_ = READ_STATE_NONE;
319 return OK; 372 return OK;
320 } 373 }
321 if (stream_info_map_[pipeline_id].read_headers_callback == NULL) {
322 return ERR_IO_PENDING;
323 }
324 CHECK(stream_info_map_[pipeline_id].parser.get());
325 374
326 if (result == ERR_IO_PENDING) { 375 int next_id = request_order_.front();
327 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_ACTIVE); 376 CHECK(ContainsKey(stream_info_map_, next_id));
328 } else { 377 switch (stream_info_map_[next_id].state) {
329 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_READ_PENDING); 378 case STREAM_READ_PENDING:
330 stream_info_map_[pipeline_id].state = STREAM_ACTIVE; 379 read_next_state_ = READ_STATE_READ_HEADERS;
380 active_read_id_ = next_id;
381 request_order_.pop();
382 break;
383
384 case STREAM_CLOSED:
385 // Since nobody will read whatever data is on the pipeline associated with
386 // this closed request, we must shut down the rest of the pipeline.
387 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
388 break;
389
390 case STREAM_SENT:
391 read_next_state_ = READ_STATE_NONE;
392 break;
393
394 default:
395 NOTREACHED() << "Unexpected read state: "
396 << stream_info_map_[next_id].state;
331 } 397 }
332 398
333 int rv = stream_info_map_[pipeline_id].parser->ReadResponseHeaders( 399 return OK;
400 }
401
402 int HttpPipelinedConnectionImpl::DoReadHeaders(int result) {
403 CHECK(active_read_id_);
404 CHECK(ContainsKey(stream_info_map_, active_read_id_));
405 CHECK_EQ(STREAM_READ_PENDING, stream_info_map_[active_read_id_].state);
406 stream_info_map_[active_read_id_].state = STREAM_ACTIVE;
407 int rv = stream_info_map_[active_read_id_].parser->ReadResponseHeaders(
334 &read_io_callback_); 408 &read_io_callback_);
335 if (rv == ERR_IO_PENDING) { 409 read_next_state_ = READ_STATE_READ_HEADERS_COMPLETE;
336 read_next_state_ = READ_STATE_COMPLETE;
337 read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
338 } else if (rv < OK) {
339 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
340 if (rv == ERR_SOCKET_NOT_CONNECTED && completed_one_request_)
341 rv = ERR_PIPELINE_EVICTION;
342 } else {
343 CHECK_LE(OK, rv);
344 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
345 }
346
347 // |result| == ERR_IO_PENDING means this function was *not* called on the same
348 // stack as ReadResponseHeaders(). That means we returned ERR_IO_PENDING to
349 // ReadResponseHeaders() earlier and now need to invoke its callback.
350 if (rv != ERR_IO_PENDING && result == ERR_IO_PENDING) {
351 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
352 read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
353 stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
354 MessageLoop::current()->PostTask(
355 FROM_HERE,
356 method_factory_.NewRunnableMethod(
357 &HttpPipelinedConnectionImpl::FireUserCallback,
358 pipeline_id,
359 rv));
360 }
361 return rv; 410 return rv;
362 } 411 }
363 412
364 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) { 413 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
414 CHECK(active_read_id_);
415 CHECK(ContainsKey(stream_info_map_, active_read_id_));
416 CHECK_EQ(STREAM_ACTIVE, stream_info_map_[active_read_id_].state);
417
365 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; 418 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
366 if (read_user_callback_) { 419 if (result < OK) {
367 int pipeline_id = request_order_.front(); 420 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
368 MessageLoop::current()->PostTask( 421 result = ERR_PIPELINE_EVICTION;
369 FROM_HERE, 422 }
370 method_factory_.NewRunnableMethod( 423 usable_ = false;
371 &HttpPipelinedConnectionImpl::FireUserCallback,
372 pipeline_id,
373 result));
374 stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
375 read_user_callback_ = NULL;
376 } 424 }
425
426 if (!read_still_on_call_stack_) {
427 QueueUserCallback(active_read_id_,
428 stream_info_map_[active_read_id_].read_headers_callback,
429 result, FROM_HERE);
430 }
431
377 return result; 432 return result;
378 } 433 }
379 434
380 int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) { 435 int HttpPipelinedConnectionImpl::DoReadWaitForClose(int result) {
381 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; 436 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
382 return result; 437 return result;
383 } 438 }
384 439
385 int HttpPipelinedConnectionImpl::DoReadStreamClosed() { 440 int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
386 CHECK(!request_order_.empty()); 441 CHECK(active_read_id_);
387 int pipeline_id = request_order_.front(); 442 CHECK(ContainsKey(stream_info_map_, active_read_id_));
388 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 443 CHECK_EQ(stream_info_map_[active_read_id_].state, STREAM_CLOSED);
389 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED); 444 active_read_id_ = 0;
390 CHECK(stream_info_map_[pipeline_id].read_headers_callback);
391 stream_info_map_[pipeline_id].read_headers_callback = NULL;
392 request_order_.pop();
393 if (!usable_) { 445 if (!usable_) {
394 read_next_state_ = READ_STATE_UNUSABLE; 446 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
395 return OK;
396 } else {
397 completed_one_request_ = true;
398 if (!request_order_.empty()) {
399 int next_pipeline_id = request_order_.front();
400 CHECK(ContainsKey(stream_info_map_, next_pipeline_id));
401 if (stream_info_map_[next_pipeline_id].read_headers_callback) {
402 stream_info_map_[next_pipeline_id].state = STREAM_ACTIVE;
403 read_next_state_ = READ_STATE_NEXT_HEADERS;
404 MessageLoop::current()->PostTask(
405 FROM_HERE,
406 method_factory_.NewRunnableMethod(
407 &HttpPipelinedConnectionImpl::DoReadHeadersLoop,
408 ERR_IO_PENDING));
409 return ERR_IO_PENDING; // Wait for the task to fire.
410 }
411 }
412 read_next_state_ = READ_STATE_NONE;
413 return OK; 447 return OK;
414 } 448 }
449 completed_one_request_ = true;
450 MessageLoop::current()->PostTask(
451 FROM_HERE,
452 method_factory_.NewRunnableMethod(
453 &HttpPipelinedConnectionImpl::StartNextDeferredRead));
454 read_next_state_ = READ_STATE_NONE;
455 return OK;
415 } 456 }
416 457
417 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) { 458 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
418 while (!request_order_.empty()) { 459 while (!request_order_.empty()) {
419 int evicted_id = request_order_.front(); 460 int evicted_id = request_order_.front();
420 request_order_.pop(); 461 request_order_.pop();
421 if (!ContainsKey(stream_info_map_, evicted_id) || 462 if (!ContainsKey(stream_info_map_, evicted_id)) {
422 (stream_info_map_[evicted_id].read_headers_callback == NULL)) {
423 continue; 463 continue;
424 } 464 }
425 if (stream_info_map_[evicted_id].state != STREAM_CLOSED) { 465 if (stream_info_map_[evicted_id].state == STREAM_READ_PENDING) {
426 stream_info_map_[evicted_id].pending_user_callback = 466 stream_info_map_[evicted_id].state = STREAM_READ_EVICTED;
427 stream_info_map_[evicted_id].read_headers_callback; 467 QueueUserCallback(evicted_id,
428 MessageLoop::current()->PostTask( 468 stream_info_map_[evicted_id].read_headers_callback,
429 FROM_HERE, 469 ERR_PIPELINE_EVICTION,
430 method_factory_.NewRunnableMethod( 470 FROM_HERE);
431 &HttpPipelinedConnectionImpl::FireUserCallback,
432 evicted_id,
433 ERR_PIPELINE_EVICTION));
434 } 471 }
435 stream_info_map_[evicted_id].read_headers_callback = NULL;
436 } 472 }
437 read_next_state_ = READ_STATE_NONE; 473 read_next_state_ = READ_STATE_NONE;
438 return result; 474 return result;
439 } 475 }
440 476
441 void HttpPipelinedConnectionImpl::Close(int pipeline_id, 477 void HttpPipelinedConnectionImpl::Close(int pipeline_id,
442 bool not_reusable) { 478 bool not_reusable) {
443 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 479 CHECK(ContainsKey(stream_info_map_, pipeline_id));
444 switch (stream_info_map_[pipeline_id].state) { 480 switch (stream_info_map_[pipeline_id].state) {
445 case STREAM_CREATED: 481 case STREAM_CREATED:
446 stream_info_map_[pipeline_id].state = STREAM_UNUSED; 482 stream_info_map_[pipeline_id].state = STREAM_UNUSED;
447 break; 483 break;
448 484
449 case STREAM_BOUND: 485 case STREAM_BOUND:
450 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 486 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
451 break; 487 break;
452 488
453 case STREAM_SENDING: 489 case STREAM_SENDING:
454 usable_ = false; 490 usable_ = false;
455 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 491 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
456 send_user_callback_ = NULL; 492 active_send_request_.reset();
457 send_next_state_ = SEND_STATE_UNUSABLE; 493 send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
458 DoSendRequestLoop(OK); 494 DoSendRequestLoop(OK);
459 break; 495 break;
460 496
461 case STREAM_SENT: 497 case STREAM_SENT:
462 case STREAM_READ_PENDING: 498 case STREAM_READ_PENDING:
463 usable_ = false; 499 usable_ = false;
464 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 500 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
465 stream_info_map_[pipeline_id].read_headers_callback = NULL; 501 if (!request_order_.empty() &&
466 if (read_next_state_ == READ_STATE_NONE) { 502 pipeline_id == request_order_.front() &&
467 read_next_state_ = READ_STATE_UNUSABLE; 503 read_next_state_ == READ_STATE_NONE) {
504 read_next_state_ = READ_STATE_EVICT_PENDING_READS;
468 DoReadHeadersLoop(OK); 505 DoReadHeadersLoop(OK);
469 } 506 }
470 break; 507 break;
471 508
472 case STREAM_ACTIVE: 509 case STREAM_ACTIVE:
473 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 510 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
474 if (not_reusable) { 511 if (not_reusable) {
475 usable_ = false; 512 usable_ = false;
476 } 513 }
477 read_next_state_ = READ_STATE_STREAM_CLOSED; 514 read_next_state_ = READ_STATE_STREAM_CLOSED;
478 read_user_callback_ = NULL;
479 DoReadHeadersLoop(OK); 515 DoReadHeadersLoop(OK);
480 break; 516 break;
481 517
518 case STREAM_READ_EVICTED:
519 stream_info_map_[pipeline_id].state = STREAM_CLOSED;
520 break;
521
482 case STREAM_CLOSED: 522 case STREAM_CLOSED:
483 case STREAM_UNUSED: 523 case STREAM_UNUSED:
484 // TODO(simonjam): Why is Close() sometimes called twice? 524 // TODO(simonjam): Why is Close() sometimes called twice?
485 break; 525 break;
486 526
487 default: 527 default:
488 NOTREACHED(); 528 NOTREACHED();
489 break; 529 break;
490 } 530 }
491 } 531 }
492 532
493 int HttpPipelinedConnectionImpl::ReadResponseBody( 533 int HttpPipelinedConnectionImpl::ReadResponseBody(
494 int pipeline_id, 534 int pipeline_id,
495 IOBuffer* buf, 535 IOBuffer* buf,
496 int buf_len, 536 int buf_len,
497 OldCompletionCallback* callback) { 537 OldCompletionCallback* callback) {
498 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 538 CHECK(ContainsKey(stream_info_map_, pipeline_id));
499 CHECK(!request_order_.empty()); 539 CHECK_EQ(active_read_id_, pipeline_id);
500 CHECK_EQ(pipeline_id, request_order_.front());
501 CHECK(stream_info_map_[pipeline_id].parser.get()); 540 CHECK(stream_info_map_[pipeline_id].parser.get());
502 return stream_info_map_[pipeline_id].parser->ReadResponseBody( 541 return stream_info_map_[pipeline_id].parser->ReadResponseBody(
503 buf, buf_len, callback); 542 buf, buf_len, callback);
504 } 543 }
505 544
506 uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const { 545 uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const {
507 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 546 CHECK(ContainsKey(stream_info_map_, pipeline_id));
508 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); 547 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
509 return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress(); 548 return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress();
510 } 549 }
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
560 599
561 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo( 600 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
562 int pipeline_id, 601 int pipeline_id,
563 SSLCertRequestInfo* cert_request_info) { 602 SSLCertRequestInfo* cert_request_info) {
564 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 603 CHECK(ContainsKey(stream_info_map_, pipeline_id));
565 CHECK(stream_info_map_[pipeline_id].parser.get()); 604 CHECK(stream_info_map_[pipeline_id].parser.get());
566 return stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo( 605 return stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo(
567 cert_request_info); 606 cert_request_info);
568 } 607 }
569 608
609 void HttpPipelinedConnectionImpl::QueueUserCallback(
610 int pipeline_id,
611 OldCompletionCallback* callback,
612 int rv,
613 const tracked_objects::Location& from_here) {
614 CHECK(!stream_info_map_[pipeline_id].pending_user_callback);
615 stream_info_map_[pipeline_id].pending_user_callback = callback;
616 MessageLoop::current()->PostTask(
617 from_here,
618 method_factory_.NewRunnableMethod(
619 &HttpPipelinedConnectionImpl::FireUserCallback,
620 pipeline_id,
621 rv));
622 }
623
570 void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id, 624 void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id,
571 int result) { 625 int result) {
572 if (ContainsKey(stream_info_map_, pipeline_id)) { 626 if (ContainsKey(stream_info_map_, pipeline_id)) {
573 stream_info_map_[pipeline_id].pending_user_callback->Run(result); 627 CHECK(stream_info_map_[pipeline_id].pending_user_callback);
628 OldCompletionCallback* callback =
629 stream_info_map_[pipeline_id].pending_user_callback;
630 stream_info_map_[pipeline_id].pending_user_callback = NULL;
631 callback->Run(result);
574 } 632 }
575 } 633 }
576 634
577 int HttpPipelinedConnectionImpl::depth() const { 635 int HttpPipelinedConnectionImpl::depth() const {
578 return stream_info_map_.size(); 636 return stream_info_map_.size();
579 } 637 }
580 638
581 bool HttpPipelinedConnectionImpl::usable() const { 639 bool HttpPipelinedConnectionImpl::usable() const {
582 return usable_; 640 return usable_;
583 } 641 }
(...skipping 11 matching lines...) Expand all
595 } 653 }
596 654
597 const NetLog::Source& HttpPipelinedConnectionImpl::source() const { 655 const NetLog::Source& HttpPipelinedConnectionImpl::source() const {
598 return net_log_.source(); 656 return net_log_.source();
599 } 657 }
600 658
601 bool HttpPipelinedConnectionImpl::was_npn_negotiated() const { 659 bool HttpPipelinedConnectionImpl::was_npn_negotiated() const {
602 return was_npn_negotiated_; 660 return was_npn_negotiated_;
603 } 661 }
604 662
605 HttpPipelinedConnectionImpl::DeferredSendRequest::DeferredSendRequest() { 663 HttpPipelinedConnectionImpl::PendingSendRequest::PendingSendRequest() {
606 } 664 }
607 665
608 HttpPipelinedConnectionImpl::DeferredSendRequest::~DeferredSendRequest() { 666 HttpPipelinedConnectionImpl::PendingSendRequest::~PendingSendRequest() {
609 } 667 }
610 668
611 HttpPipelinedConnectionImpl::StreamInfo::StreamInfo() 669 HttpPipelinedConnectionImpl::StreamInfo::StreamInfo()
612 : read_headers_callback(NULL), 670 : read_headers_callback(NULL),
671 pending_user_callback(NULL),
613 state(STREAM_CREATED) { 672 state(STREAM_CREATED) {
614 } 673 }
615 674
616 HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() { 675 HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() {
617 } 676 }
618 677
619 } // namespace net 678 } // namespace net
OLDNEW
« no previous file with comments | « net/http/http_pipelined_connection_impl.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698