| Index: net/http/http_pipelined_connection_impl.cc
|
| diff --git a/net/http/http_pipelined_connection_impl.cc b/net/http/http_pipelined_connection_impl.cc
|
| index aa3d1788d50432829b8843587eefe52805d42ef7..82c788f903edbaf88a8861f94bb81e3ea8dcd06d 100644
|
| --- a/net/http/http_pipelined_connection_impl.cc
|
| +++ b/net/http/http_pipelined_connection_impl.cc
|
| @@ -34,25 +34,26 @@ HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl(
|
| completed_one_request_(false),
|
| ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
|
| send_next_state_(SEND_STATE_NONE),
|
| + send_still_on_call_stack_(false),
|
| ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_(
|
| this, &HttpPipelinedConnectionImpl::OnSendIOCallback)),
|
| - send_user_callback_(NULL),
|
| read_next_state_(READ_STATE_NONE),
|
| + active_read_id_(0),
|
| + read_still_on_call_stack_(false),
|
| ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_(
|
| - this, &HttpPipelinedConnectionImpl::OnReadIOCallback)),
|
| - read_user_callback_(NULL) {
|
| + this, &HttpPipelinedConnectionImpl::OnReadIOCallback)) {
|
| CHECK(connection_.get());
|
| }
|
|
|
| HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
|
| CHECK_EQ(depth(), 0);
|
| CHECK(stream_info_map_.empty());
|
| - CHECK(deferred_request_queue_.empty());
|
| + CHECK(pending_send_request_queue_.empty());
|
| CHECK(request_order_.empty());
|
| CHECK_EQ(send_next_state_, SEND_STATE_NONE);
|
| CHECK_EQ(read_next_state_, READ_STATE_NONE);
|
| - CHECK(!send_user_callback_);
|
| - CHECK(!read_user_callback_);
|
| + CHECK(!active_send_request_.get());
|
| + CHECK(!active_read_id_);
|
| if (!usable_) {
|
| connection_->socket()->Disconnect();
|
| }
|
| @@ -105,7 +106,6 @@ void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) {
|
| stream_info_map_[pipeline_id].parser.reset();
|
| }
|
| CHECK(!stream_info_map_[pipeline_id].parser.get());
|
| - CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
|
| stream_info_map_.erase(pipeline_id);
|
|
|
| delegate_->OnPipelineHasCapacity(this);
|
| @@ -123,18 +123,18 @@ int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id,
|
| return ERR_PIPELINE_EVICTION;
|
| }
|
|
|
| - DeferredSendRequest deferred_request;
|
| - deferred_request.pipeline_id = pipeline_id;
|
| - deferred_request.request_line = request_line;
|
| - deferred_request.headers = headers;
|
| - deferred_request.request_body = request_body;
|
| - deferred_request.response = response;
|
| - deferred_request.callback = callback;
|
| - deferred_request_queue_.push(deferred_request);
|
| + PendingSendRequest* send_request = new PendingSendRequest;
|
| + send_request->pipeline_id = pipeline_id;
|
| + send_request->request_line = request_line;
|
| + send_request->headers = headers;
|
| + send_request->request_body = request_body;
|
| + send_request->response = response;
|
| + send_request->callback = callback;
|
| + pending_send_request_queue_.push(send_request);
|
|
|
| int rv;
|
| if (send_next_state_ == SEND_STATE_NONE) {
|
| - send_next_state_ = SEND_STATE_NEXT_REQUEST;
|
| + send_next_state_ = SEND_STATE_START_IMMEDIATELY;
|
| rv = DoSendRequestLoop(OK);
|
| } else {
|
| rv = ERR_IO_PENDING;
|
| @@ -149,13 +149,19 @@ int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
|
| SendRequestState state = send_next_state_;
|
| send_next_state_ = SEND_STATE_NONE;
|
| switch (state) {
|
| - case SEND_STATE_NEXT_REQUEST:
|
| - rv = DoSendNextRequest(rv);
|
| + case SEND_STATE_START_IMMEDIATELY:
|
| + rv = DoStartRequestImmediately(rv);
|
| + break;
|
| + case SEND_STATE_START_NEXT_DEFERRED_REQUEST:
|
| + rv = DoStartNextDeferredRequest(rv);
|
| + break;
|
| + case SEND_STATE_SEND_ACTIVE_REQUEST:
|
| + rv = DoSendActiveRequest(rv);
|
| break;
|
| case SEND_STATE_COMPLETE:
|
| rv = DoSendComplete(rv);
|
| break;
|
| - case SEND_STATE_UNUSABLE:
|
| + case SEND_STATE_EVICT_PENDING_REQUESTS:
|
| rv = DoEvictPendingSendRequests(rv);
|
| break;
|
| default:
|
| @@ -164,91 +170,107 @@ int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
|
| break;
|
| }
|
| } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
|
| + send_still_on_call_stack_ = false;
|
| return rv;
|
| }
|
|
|
| void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
|
| - CHECK(send_user_callback_);
|
| + CHECK(active_send_request_.get());
|
| DoSendRequestLoop(result);
|
| }
|
|
|
| -int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) {
|
| - CHECK(!deferred_request_queue_.empty());
|
| - const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
|
| - CHECK(ContainsKey(stream_info_map_, deferred_request.pipeline_id));
|
| - if (stream_info_map_[deferred_request.pipeline_id].state == STREAM_CLOSED) {
|
| - deferred_request_queue_.pop();
|
| - if (deferred_request_queue_.empty()) {
|
| - send_next_state_ = SEND_STATE_NONE;
|
| - } else {
|
| - send_next_state_ = SEND_STATE_NEXT_REQUEST;
|
| +int HttpPipelinedConnectionImpl::DoStartRequestImmediately(int result) {
|
| + CHECK(!active_send_request_.get());
|
| + CHECK_EQ(static_cast<size_t>(1), pending_send_request_queue_.size());
|
| + // If SendRequest() completes synchronously, then we need to return the value
|
| + // directly to the caller. |send_still_on_call_stack_| will track this.
|
| + // Otherwise, asynchronous completions will notify the caller via callback.
|
| + send_still_on_call_stack_ = true;
|
| + active_send_request_.reset(pending_send_request_queue_.front());
|
| + pending_send_request_queue_.pop();
|
| + send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
|
| + return OK;
|
| +}
|
| +
|
| +int HttpPipelinedConnectionImpl::DoStartNextDeferredRequest(int result) {
|
| + CHECK(!send_still_on_call_stack_);
|
| + CHECK(!active_send_request_.get());
|
| +
|
| + while (!pending_send_request_queue_.empty()) {
|
| + scoped_ptr<PendingSendRequest> next_request(
|
| + pending_send_request_queue_.front());
|
| + pending_send_request_queue_.pop();
|
| + CHECK(ContainsKey(stream_info_map_, next_request->pipeline_id));
|
| + if (stream_info_map_[next_request->pipeline_id].state != STREAM_CLOSED) {
|
| + active_send_request_.reset(next_request.release());
|
| + send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
|
| + return OK;
|
| }
|
| - return OK;
|
| - }
|
| - CHECK(stream_info_map_[deferred_request.pipeline_id].parser.get());
|
| - int rv = stream_info_map_[deferred_request.pipeline_id].parser->SendRequest(
|
| - deferred_request.request_line,
|
| - deferred_request.headers,
|
| - deferred_request.request_body,
|
| - deferred_request.response,
|
| - &send_io_callback_);
|
| - // |result| == ERR_IO_PENDING means this function was *not* called on the same
|
| - // stack as SendRequest(). That means we returned ERR_IO_PENDING to
|
| - // SendRequest() earlier and will need to invoke its callback.
|
| - if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) {
|
| - send_user_callback_ = deferred_request.callback;
|
| }
|
| - stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENDING;
|
| +
|
| + send_next_state_ = SEND_STATE_NONE;
|
| + return OK;
|
| +}
|
| +
|
| +int HttpPipelinedConnectionImpl::DoSendActiveRequest(int result) {
|
| + CHECK(stream_info_map_[active_send_request_->pipeline_id].parser.get());
|
| + int rv = stream_info_map_[active_send_request_->pipeline_id].parser->
|
| + SendRequest(active_send_request_->request_line,
|
| + active_send_request_->headers,
|
| + active_send_request_->request_body,
|
| + active_send_request_->response,
|
| + &send_io_callback_);
|
| + stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENDING;
|
| send_next_state_ = SEND_STATE_COMPLETE;
|
| return rv;
|
| }
|
|
|
| int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
|
| - CHECK(!deferred_request_queue_.empty());
|
| - const DeferredSendRequest& deferred_request = deferred_request_queue_.front();
|
| - CHECK_EQ(stream_info_map_[deferred_request.pipeline_id].state,
|
| - STREAM_SENDING);
|
| - request_order_.push(deferred_request.pipeline_id);
|
| - stream_info_map_[deferred_request.pipeline_id].state = STREAM_SENT;
|
| - deferred_request_queue_.pop();
|
| + CHECK(active_send_request_.get());
|
| + CHECK_EQ(STREAM_SENDING,
|
| + stream_info_map_[active_send_request_->pipeline_id].state);
|
| +
|
| + request_order_.push(active_send_request_->pipeline_id);
|
| + stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENT;
|
| +
|
| if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
|
| result = ERR_PIPELINE_EVICTION;
|
| }
|
| if (result < OK) {
|
| - send_next_state_ = SEND_STATE_UNUSABLE;
|
| usable_ = false;
|
| }
|
| - if (send_user_callback_) {
|
| - MessageLoop::current()->PostTask(
|
| - FROM_HERE,
|
| - method_factory_.NewRunnableMethod(
|
| - &HttpPipelinedConnectionImpl::FireUserCallback,
|
| - deferred_request.pipeline_id,
|
| - result));
|
| - stream_info_map_[deferred_request.pipeline_id].pending_user_callback =
|
| - send_user_callback_;
|
| - send_user_callback_ = NULL;
|
| - }
|
| - if (result < OK) {
|
| - return result;
|
| +
|
| + if (!send_still_on_call_stack_) {
|
| + QueueUserCallback(active_send_request_->pipeline_id,
|
| + active_send_request_->callback, result, FROM_HERE);
|
| }
|
| - if (deferred_request_queue_.empty()) {
|
| +
|
| + active_send_request_.reset();
|
| +
|
| + if (send_still_on_call_stack_) {
|
| + // It should be impossible for another request to appear on the queue while
|
| + // this send was on the call stack.
|
| + CHECK(pending_send_request_queue_.empty());
|
| send_next_state_ = SEND_STATE_NONE;
|
| - return OK;
|
| + } else if (!usable_) {
|
| + send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
|
| + } else {
|
| + send_next_state_ = SEND_STATE_START_NEXT_DEFERRED_REQUEST;
|
| }
|
| - send_next_state_ = SEND_STATE_NEXT_REQUEST;
|
| - return OK;
|
| +
|
| + return result;
|
| }
|
|
|
| int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
|
| - send_next_state_ = SEND_STATE_NONE;
|
| - while (!deferred_request_queue_.empty()) {
|
| - const DeferredSendRequest& evicted_send = deferred_request_queue_.front();
|
| - if (stream_info_map_[evicted_send.pipeline_id].state != STREAM_CLOSED) {
|
| - evicted_send.callback->Run(ERR_PIPELINE_EVICTION);
|
| + while (!pending_send_request_queue_.empty()) {
|
| + scoped_ptr<PendingSendRequest> evicted_send(
|
| + pending_send_request_queue_.front());
|
| + pending_send_request_queue_.pop();
|
| + if (stream_info_map_[evicted_send->pipeline_id].state != STREAM_CLOSED) {
|
| + evicted_send->callback->Run(ERR_PIPELINE_EVICTION);
|
| }
|
| - deferred_request_queue_.pop();
|
| }
|
| + send_next_state_ = SEND_STATE_NONE;
|
| return result;
|
| }
|
|
|
| @@ -256,18 +278,27 @@ int HttpPipelinedConnectionImpl::ReadResponseHeaders(
|
| int pipeline_id,
|
| OldCompletionCallback* callback) {
|
| CHECK(ContainsKey(stream_info_map_, pipeline_id));
|
| - CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_SENT);
|
| + CHECK_EQ(STREAM_SENT, stream_info_map_[pipeline_id].state);
|
| CHECK(!stream_info_map_[pipeline_id].read_headers_callback);
|
| +
|
| if (!usable_) {
|
| return ERR_PIPELINE_EVICTION;
|
| }
|
| +
|
| stream_info_map_[pipeline_id].state = STREAM_READ_PENDING;
|
| stream_info_map_[pipeline_id].read_headers_callback = callback;
|
| - if (read_next_state_ == READ_STATE_NONE) {
|
| - read_next_state_ = READ_STATE_NEXT_HEADERS;
|
| + if (read_next_state_ == READ_STATE_NONE &&
|
| + pipeline_id == request_order_.front()) {
|
| + read_next_state_ = READ_STATE_START_IMMEDIATELY;
|
| return DoReadHeadersLoop(OK);
|
| - } else {
|
| - return ERR_IO_PENDING;
|
| + }
|
| + return ERR_IO_PENDING;
|
| +}
|
| +
|
| +void HttpPipelinedConnectionImpl::StartNextDeferredRead() {
|
| + if (read_next_state_ == READ_STATE_NONE) {
|
| + read_next_state_ = READ_STATE_START_NEXT_DEFERRED_READ;
|
| + DoReadHeadersLoop(OK);
|
| }
|
| }
|
|
|
| @@ -277,19 +308,28 @@ int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
|
| ReadHeadersState state = read_next_state_;
|
| read_next_state_ = READ_STATE_NONE;
|
| switch (state) {
|
| - case READ_STATE_NEXT_HEADERS:
|
| - rv = DoReadNextHeaders(rv);
|
| + case READ_STATE_START_IMMEDIATELY:
|
| + rv = DoStartReadImmediately(rv);
|
| + break;
|
| + case READ_STATE_START_NEXT_DEFERRED_READ:
|
| + rv = DoStartNextDeferredRead(rv);
|
| break;
|
| - case READ_STATE_COMPLETE:
|
| + case READ_STATE_READ_HEADERS:
|
| + rv = DoReadHeaders(rv);
|
| + break;
|
| + case READ_STATE_READ_HEADERS_COMPLETE:
|
| rv = DoReadHeadersComplete(rv);
|
| break;
|
| case READ_STATE_WAITING_FOR_CLOSE:
|
| - rv = DoReadWaitingForClose(rv);
|
| + // This is a holding state. We return instead of continuing to run hte
|
| + // loop. The state will advance when the stream calls Close().
|
| + rv = DoReadWaitForClose(rv);
|
| + read_still_on_call_stack_ = false;
|
| return rv;
|
| case READ_STATE_STREAM_CLOSED:
|
| rv = DoReadStreamClosed();
|
| break;
|
| - case READ_STATE_UNUSABLE:
|
| + case READ_STATE_EVICT_PENDING_READS:
|
| rv = DoEvictPendingReadHeaders(rv);
|
| break;
|
| case READ_STATE_NONE:
|
| @@ -300,139 +340,135 @@ int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
|
| break;
|
| }
|
| } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
|
| + read_still_on_call_stack_ = false;
|
| return rv;
|
| }
|
|
|
| void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
|
| - CHECK(read_user_callback_);
|
| DoReadHeadersLoop(result);
|
| }
|
|
|
| -int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) {
|
| +int HttpPipelinedConnectionImpl::DoStartReadImmediately(int result) {
|
| + CHECK(!active_read_id_);
|
| + CHECK(!read_still_on_call_stack_);
|
| CHECK(!request_order_.empty());
|
| - int pipeline_id = request_order_.front();
|
| - CHECK(ContainsKey(stream_info_map_, pipeline_id));
|
| - if (stream_info_map_[pipeline_id].state == STREAM_CLOSED) {
|
| - // Since nobody will read whatever data is on the pipeline associated with
|
| - // this closed request, we must shut down the rest of the pipeline.
|
| - read_next_state_ = READ_STATE_UNUSABLE;
|
| + // If ReadResponseHeaders() completes synchronously, then we need to return
|
| + // the value directly to the caller. |read_still_on_call_stack_| will track
|
| + // this. Otherwise, asynchronous completions will notify the caller via
|
| + // callback.
|
| + read_still_on_call_stack_ = true;
|
| + read_next_state_ = READ_STATE_READ_HEADERS;
|
| + active_read_id_ = request_order_.front();
|
| + request_order_.pop();
|
| + return OK;
|
| +}
|
| +
|
| +int HttpPipelinedConnectionImpl::DoStartNextDeferredRead(int result) {
|
| + CHECK(!active_read_id_);
|
| + CHECK(!read_still_on_call_stack_);
|
| +
|
| + if (request_order_.empty()) {
|
| + read_next_state_ = READ_STATE_NONE;
|
| return OK;
|
| }
|
| - if (stream_info_map_[pipeline_id].read_headers_callback == NULL) {
|
| - return ERR_IO_PENDING;
|
| - }
|
| - CHECK(stream_info_map_[pipeline_id].parser.get());
|
|
|
| - if (result == ERR_IO_PENDING) {
|
| - CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_ACTIVE);
|
| - } else {
|
| - CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_READ_PENDING);
|
| - stream_info_map_[pipeline_id].state = STREAM_ACTIVE;
|
| - }
|
| + int next_id = request_order_.front();
|
| + CHECK(ContainsKey(stream_info_map_, next_id));
|
| + switch (stream_info_map_[next_id].state) {
|
| + case STREAM_READ_PENDING:
|
| + read_next_state_ = READ_STATE_READ_HEADERS;
|
| + active_read_id_ = next_id;
|
| + request_order_.pop();
|
| + break;
|
|
|
| - int rv = stream_info_map_[pipeline_id].parser->ReadResponseHeaders(
|
| - &read_io_callback_);
|
| - if (rv == ERR_IO_PENDING) {
|
| - read_next_state_ = READ_STATE_COMPLETE;
|
| - read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
|
| - } else if (rv < OK) {
|
| - read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
|
| - if (rv == ERR_SOCKET_NOT_CONNECTED && completed_one_request_)
|
| - rv = ERR_PIPELINE_EVICTION;
|
| - } else {
|
| - CHECK_LE(OK, rv);
|
| - read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
|
| - }
|
| + case STREAM_CLOSED:
|
| + // Since nobody will read whatever data is on the pipeline associated with
|
| + // this closed request, we must shut down the rest of the pipeline.
|
| + read_next_state_ = READ_STATE_EVICT_PENDING_READS;
|
| + break;
|
|
|
| - // |result| == ERR_IO_PENDING means this function was *not* called on the same
|
| - // stack as ReadResponseHeaders(). That means we returned ERR_IO_PENDING to
|
| - // ReadResponseHeaders() earlier and now need to invoke its callback.
|
| - if (rv != ERR_IO_PENDING && result == ERR_IO_PENDING) {
|
| - read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
|
| - read_user_callback_ = stream_info_map_[pipeline_id].read_headers_callback;
|
| - stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
|
| - MessageLoop::current()->PostTask(
|
| - FROM_HERE,
|
| - method_factory_.NewRunnableMethod(
|
| - &HttpPipelinedConnectionImpl::FireUserCallback,
|
| - pipeline_id,
|
| - rv));
|
| + case STREAM_SENT:
|
| + read_next_state_ = READ_STATE_NONE;
|
| + break;
|
| +
|
| + default:
|
| + NOTREACHED() << "Unexpected read state: "
|
| + << stream_info_map_[next_id].state;
|
| }
|
| +
|
| + return OK;
|
| +}
|
| +
|
| +int HttpPipelinedConnectionImpl::DoReadHeaders(int result) {
|
| + CHECK(active_read_id_);
|
| + CHECK(ContainsKey(stream_info_map_, active_read_id_));
|
| + CHECK_EQ(STREAM_READ_PENDING, stream_info_map_[active_read_id_].state);
|
| + stream_info_map_[active_read_id_].state = STREAM_ACTIVE;
|
| + int rv = stream_info_map_[active_read_id_].parser->ReadResponseHeaders(
|
| + &read_io_callback_);
|
| + read_next_state_ = READ_STATE_READ_HEADERS_COMPLETE;
|
| return rv;
|
| }
|
|
|
| int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
|
| + CHECK(active_read_id_);
|
| + CHECK(ContainsKey(stream_info_map_, active_read_id_));
|
| + CHECK_EQ(STREAM_ACTIVE, stream_info_map_[active_read_id_].state);
|
| +
|
| read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
|
| - if (read_user_callback_) {
|
| - int pipeline_id = request_order_.front();
|
| - MessageLoop::current()->PostTask(
|
| - FROM_HERE,
|
| - method_factory_.NewRunnableMethod(
|
| - &HttpPipelinedConnectionImpl::FireUserCallback,
|
| - pipeline_id,
|
| - result));
|
| - stream_info_map_[pipeline_id].pending_user_callback = read_user_callback_;
|
| - read_user_callback_ = NULL;
|
| + if (result < OK) {
|
| + if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
|
| + result = ERR_PIPELINE_EVICTION;
|
| + }
|
| + usable_ = false;
|
| + }
|
| +
|
| + if (!read_still_on_call_stack_) {
|
| + QueueUserCallback(active_read_id_,
|
| + stream_info_map_[active_read_id_].read_headers_callback,
|
| + result, FROM_HERE);
|
| }
|
| +
|
| return result;
|
| }
|
|
|
| -int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) {
|
| +int HttpPipelinedConnectionImpl::DoReadWaitForClose(int result) {
|
| read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
|
| return result;
|
| }
|
|
|
| int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
|
| - CHECK(!request_order_.empty());
|
| - int pipeline_id = request_order_.front();
|
| - CHECK(ContainsKey(stream_info_map_, pipeline_id));
|
| - CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
|
| - CHECK(stream_info_map_[pipeline_id].read_headers_callback);
|
| - stream_info_map_[pipeline_id].read_headers_callback = NULL;
|
| - request_order_.pop();
|
| + CHECK(active_read_id_);
|
| + CHECK(ContainsKey(stream_info_map_, active_read_id_));
|
| + CHECK_EQ(stream_info_map_[active_read_id_].state, STREAM_CLOSED);
|
| + active_read_id_ = 0;
|
| if (!usable_) {
|
| - read_next_state_ = READ_STATE_UNUSABLE;
|
| - return OK;
|
| - } else {
|
| - completed_one_request_ = true;
|
| - if (!request_order_.empty()) {
|
| - int next_pipeline_id = request_order_.front();
|
| - CHECK(ContainsKey(stream_info_map_, next_pipeline_id));
|
| - if (stream_info_map_[next_pipeline_id].read_headers_callback) {
|
| - stream_info_map_[next_pipeline_id].state = STREAM_ACTIVE;
|
| - read_next_state_ = READ_STATE_NEXT_HEADERS;
|
| - MessageLoop::current()->PostTask(
|
| - FROM_HERE,
|
| - method_factory_.NewRunnableMethod(
|
| - &HttpPipelinedConnectionImpl::DoReadHeadersLoop,
|
| - ERR_IO_PENDING));
|
| - return ERR_IO_PENDING; // Wait for the task to fire.
|
| - }
|
| - }
|
| - read_next_state_ = READ_STATE_NONE;
|
| + read_next_state_ = READ_STATE_EVICT_PENDING_READS;
|
| return OK;
|
| }
|
| + completed_one_request_ = true;
|
| + MessageLoop::current()->PostTask(
|
| + FROM_HERE,
|
| + method_factory_.NewRunnableMethod(
|
| + &HttpPipelinedConnectionImpl::StartNextDeferredRead));
|
| + read_next_state_ = READ_STATE_NONE;
|
| + return OK;
|
| }
|
|
|
| int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
|
| while (!request_order_.empty()) {
|
| int evicted_id = request_order_.front();
|
| request_order_.pop();
|
| - if (!ContainsKey(stream_info_map_, evicted_id) ||
|
| - (stream_info_map_[evicted_id].read_headers_callback == NULL)) {
|
| + if (!ContainsKey(stream_info_map_, evicted_id)) {
|
| continue;
|
| }
|
| - if (stream_info_map_[evicted_id].state != STREAM_CLOSED) {
|
| - stream_info_map_[evicted_id].pending_user_callback =
|
| - stream_info_map_[evicted_id].read_headers_callback;
|
| - MessageLoop::current()->PostTask(
|
| - FROM_HERE,
|
| - method_factory_.NewRunnableMethod(
|
| - &HttpPipelinedConnectionImpl::FireUserCallback,
|
| - evicted_id,
|
| - ERR_PIPELINE_EVICTION));
|
| + if (stream_info_map_[evicted_id].state == STREAM_READ_PENDING) {
|
| + stream_info_map_[evicted_id].state = STREAM_READ_EVICTED;
|
| + QueueUserCallback(evicted_id,
|
| + stream_info_map_[evicted_id].read_headers_callback,
|
| + ERR_PIPELINE_EVICTION,
|
| + FROM_HERE);
|
| }
|
| - stream_info_map_[evicted_id].read_headers_callback = NULL;
|
| }
|
| read_next_state_ = READ_STATE_NONE;
|
| return result;
|
| @@ -453,8 +489,8 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id,
|
| case STREAM_SENDING:
|
| usable_ = false;
|
| stream_info_map_[pipeline_id].state = STREAM_CLOSED;
|
| - send_user_callback_ = NULL;
|
| - send_next_state_ = SEND_STATE_UNUSABLE;
|
| + active_send_request_.reset();
|
| + send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
|
| DoSendRequestLoop(OK);
|
| break;
|
|
|
| @@ -462,9 +498,10 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id,
|
| case STREAM_READ_PENDING:
|
| usable_ = false;
|
| stream_info_map_[pipeline_id].state = STREAM_CLOSED;
|
| - stream_info_map_[pipeline_id].read_headers_callback = NULL;
|
| - if (read_next_state_ == READ_STATE_NONE) {
|
| - read_next_state_ = READ_STATE_UNUSABLE;
|
| + if (!request_order_.empty() &&
|
| + pipeline_id == request_order_.front() &&
|
| + read_next_state_ == READ_STATE_NONE) {
|
| + read_next_state_ = READ_STATE_EVICT_PENDING_READS;
|
| DoReadHeadersLoop(OK);
|
| }
|
| break;
|
| @@ -475,10 +512,13 @@ void HttpPipelinedConnectionImpl::Close(int pipeline_id,
|
| usable_ = false;
|
| }
|
| read_next_state_ = READ_STATE_STREAM_CLOSED;
|
| - read_user_callback_ = NULL;
|
| DoReadHeadersLoop(OK);
|
| break;
|
|
|
| + case STREAM_READ_EVICTED:
|
| + stream_info_map_[pipeline_id].state = STREAM_CLOSED;
|
| + break;
|
| +
|
| case STREAM_CLOSED:
|
| case STREAM_UNUSED:
|
| // TODO(simonjam): Why is Close() sometimes called twice?
|
| @@ -496,8 +536,7 @@ int HttpPipelinedConnectionImpl::ReadResponseBody(
|
| int buf_len,
|
| OldCompletionCallback* callback) {
|
| CHECK(ContainsKey(stream_info_map_, pipeline_id));
|
| - CHECK(!request_order_.empty());
|
| - CHECK_EQ(pipeline_id, request_order_.front());
|
| + CHECK_EQ(active_read_id_, pipeline_id);
|
| CHECK(stream_info_map_[pipeline_id].parser.get());
|
| return stream_info_map_[pipeline_id].parser->ReadResponseBody(
|
| buf, buf_len, callback);
|
| @@ -567,10 +606,29 @@ void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
|
| cert_request_info);
|
| }
|
|
|
| +void HttpPipelinedConnectionImpl::QueueUserCallback(
|
| + int pipeline_id,
|
| + OldCompletionCallback* callback,
|
| + int rv,
|
| + const tracked_objects::Location& from_here) {
|
| + CHECK(!stream_info_map_[pipeline_id].pending_user_callback);
|
| + stream_info_map_[pipeline_id].pending_user_callback = callback;
|
| + MessageLoop::current()->PostTask(
|
| + from_here,
|
| + method_factory_.NewRunnableMethod(
|
| + &HttpPipelinedConnectionImpl::FireUserCallback,
|
| + pipeline_id,
|
| + rv));
|
| +}
|
| +
|
| void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id,
|
| int result) {
|
| if (ContainsKey(stream_info_map_, pipeline_id)) {
|
| - stream_info_map_[pipeline_id].pending_user_callback->Run(result);
|
| + CHECK(stream_info_map_[pipeline_id].pending_user_callback);
|
| + OldCompletionCallback* callback =
|
| + stream_info_map_[pipeline_id].pending_user_callback;
|
| + stream_info_map_[pipeline_id].pending_user_callback = NULL;
|
| + callback->Run(result);
|
| }
|
| }
|
|
|
| @@ -602,14 +660,15 @@ bool HttpPipelinedConnectionImpl::was_npn_negotiated() const {
|
| return was_npn_negotiated_;
|
| }
|
|
|
| -HttpPipelinedConnectionImpl::DeferredSendRequest::DeferredSendRequest() {
|
| +HttpPipelinedConnectionImpl::PendingSendRequest::PendingSendRequest() {
|
| }
|
|
|
| -HttpPipelinedConnectionImpl::DeferredSendRequest::~DeferredSendRequest() {
|
| +HttpPipelinedConnectionImpl::PendingSendRequest::~PendingSendRequest() {
|
| }
|
|
|
| HttpPipelinedConnectionImpl::StreamInfo::StreamInfo()
|
| : read_headers_callback(NULL),
|
| + pending_user_callback(NULL),
|
| state(STREAM_CREATED) {
|
| }
|
|
|
|
|