Chromium Code Reviews| Index: net/http/http_pipelined_connection_impl.cc |
| diff --git a/net/http/http_pipelined_connection_impl.cc b/net/http/http_pipelined_connection_impl.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..14c7a754525246c328a5b0e52a6c157d31e2191e |
| --- /dev/null |
| +++ b/net/http/http_pipelined_connection_impl.cc |
| @@ -0,0 +1,516 @@ |
| +// Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "net/http/http_pipelined_connection_impl.h" |
| + |
| +#include "base/message_loop.h" |
| +#include "base/stl_util.h" |
| +#include "net/base/io_buffer.h" |
| +#include "net/http/http_pipelined_host.h" |
|
mmenke
2011/08/23 19:05:25
nit: No longer needed.
James Simonsen
2011/08/26 22:19:07
Done.
|
| +#include "net/http/http_pipelined_stream.h" |
| +#include "net/http/http_request_info.h" |
| +#include "net/http/http_stream_parser.h" |
| +#include "net/socket/client_socket_handle.h" |
| + |
| +namespace net { |
| + |
| +HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl( |
| + ClientSocketHandle* connection, |
| + HttpPipelinedConnection::Owner* owner, |
| + const SSLConfig& used_ssl_config, |
| + const ProxyInfo& used_proxy_info, |
| + const BoundNetLog& net_log, |
| + bool was_npn_negotiated) |
| + : owner_(owner), |
| + connection_(connection), |
| + used_ssl_config_(used_ssl_config), |
| + used_proxy_info_(used_proxy_info), |
| + net_log_(net_log), |
| + was_npn_negotiated_(was_npn_negotiated), |
| + read_buf_(new GrowableIOBuffer()), |
| + next_pipeline_id_(1), |
| + active_(false), |
| + usable_(true), |
| + ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)), |
| + send_next_state_(SEND_STATE_NONE), |
| + ALLOW_THIS_IN_INITIALIZER_LIST(send_io_callback_( |
| + this, &HttpPipelinedConnectionImpl::OnSendIOCallback)), |
| + send_user_callback_(NULL), |
| + read_next_state_(READ_STATE_NONE), |
| + ALLOW_THIS_IN_INITIALIZER_LIST(read_io_callback_( |
| + this, &HttpPipelinedConnectionImpl::OnReadIOCallback)), |
| + read_user_callback_(NULL) { |
| + DCHECK(connection_.get()); |
| +} |
| + |
| +HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() { |
| + DCHECK(depth() == 0); |
| + DCHECK(parser_map_.empty()); |
| + DCHECK(callback_map_.empty()); |
| + DCHECK(deferred_request_queue_.empty()); |
| + DCHECK(request_order_.empty()); |
| + DCHECK_EQ(send_next_state_, SEND_STATE_NONE); |
| + DCHECK_EQ(read_next_state_, READ_STATE_NONE); |
| + DCHECK(!send_user_callback_); |
| + DCHECK(!read_user_callback_); |
| + if (!usable_) { |
| + connection_->socket()->Disconnect(); |
| + } |
| + connection_->Reset(); |
| +} |
| + |
| +HttpStream* HttpPipelinedConnectionImpl::CreateNewStream() { |
| + int pipeline_id = next_pipeline_id_++; |
| + DCHECK(pipeline_id); |
| + HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id); |
| + stream_state_map_.insert(std::make_pair(pipeline_id, STREAM_CREATED)); |
| + return stream; |
| +} |
| + |
| +void HttpPipelinedConnectionImpl::InitializeParser( |
| + int pipeline_id, |
| + const HttpRequestInfo* request, |
| + const BoundNetLog& net_log) { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + DCHECK(!ContainsKey(parser_map_, pipeline_id)); |
| + stream_state_map_[pipeline_id] = STREAM_BOUND; |
| + HttpStreamParser* parser = new HttpStreamParser(connection_.get(), |
| + request, |
| + read_buf_.get(), |
| + net_log); |
| + parser_map_.insert(std::make_pair(pipeline_id, parser)); |
| + if (!active_) { |
| + active_ = true; |
| + MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + method_factory_.NewRunnableMethod( |
| + &HttpPipelinedConnectionImpl::FillPipeline)); |
| + } |
| +} |
| + |
| +void HttpPipelinedConnectionImpl::FillPipeline() { |
| + owner_->OnPipelineHasCapacity(this); |
| +} |
| + |
| +void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + |
| + if (stream_state_map_[pipeline_id] != STREAM_CREATED && |
| + stream_state_map_[pipeline_id] != STREAM_UNUSED) { |
| + DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_CLOSED); |
|
mmenke
2011/08/23 19:05:25
A BOUND_STREAM can currently end up here on ERR_PI
James Simonsen
2011/08/26 22:19:07
Good catch!
Hmm. I'd actually prefer that Close()
|
| + DCHECK(ContainsKey(parser_map_, pipeline_id)); |
| + DCHECK(!ContainsKey(callback_map_, pipeline_id)); |
| + |
| + ParserMap::iterator it = parser_map_.find(pipeline_id); |
| + delete it->second; |
| + parser_map_.erase(it); |
| + } |
| + stream_state_map_.erase(pipeline_id); |
| + |
| + MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + method_factory_.NewRunnableMethod( |
| + &HttpPipelinedConnectionImpl::FillPipeline)); |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::SendRequest(int pipeline_id, |
| + const std::string& request_line, |
| + const HttpRequestHeaders& headers, |
| + UploadDataStream* request_body, |
| + HttpResponseInfo* response, |
| + CompletionCallback* callback) { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_BOUND); |
| + if (!usable_) { |
| + return ERR_PIPELINE_EVICTION; |
| + } |
| + |
| + DeferredSendRequest deferred_request; |
| + deferred_request.pipeline_id = pipeline_id; |
| + deferred_request.request_line = request_line; |
| + deferred_request.headers = headers; |
| + deferred_request.request_body = request_body; |
| + deferred_request.response = response; |
| + deferred_request.callback = callback; |
| + deferred_request_queue_.push(deferred_request); |
| + |
| + if (send_next_state_ == SEND_STATE_NONE) { |
| + send_next_state_ = SEND_STATE_NEXT_REQUEST; |
| + return DoSendRequestLoop(OK); |
| + } else { |
| + return ERR_IO_PENDING; |
| + } |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) { |
| + int rv = result; |
| + do { |
| + SendRequestState state = send_next_state_; |
| + send_next_state_ = SEND_STATE_NONE; |
| + switch (state) { |
| + case SEND_STATE_NEXT_REQUEST: |
| + rv = DoSendNextRequest(rv); |
| + break; |
| + case SEND_STATE_COMPLETE: |
| + rv = DoSendComplete(rv); |
| + break; |
| + case SEND_STATE_UNUSABLE: |
| + rv = DoEvictPendingSendRequests(rv); |
| + break; |
| + default: |
| + NOTREACHED() << "bad send state: " << state; |
| + rv = ERR_FAILED; |
| + break; |
| + } |
| + } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE); |
| + return rv; |
| +} |
| + |
| +void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) { |
| + DCHECK(send_user_callback_); |
| + DoSendRequestLoop(result); |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::DoSendNextRequest(int result) { |
| + DCHECK(!deferred_request_queue_.empty()); |
| + const DeferredSendRequest& deferred_request = deferred_request_queue_.front(); |
| + DCHECK(ContainsKey(stream_state_map_, deferred_request.pipeline_id)); |
| + if (stream_state_map_[deferred_request.pipeline_id] == STREAM_CLOSED) { |
| + deferred_request_queue_.pop(); |
| + if (deferred_request_queue_.empty()) { |
| + send_next_state_ = SEND_STATE_NONE; |
| + } else { |
| + send_next_state_ = SEND_STATE_NEXT_REQUEST; |
| + } |
| + return OK; |
| + } |
| + DCHECK(ContainsKey(parser_map_, deferred_request.pipeline_id)); |
| + int rv = parser_map_[deferred_request.pipeline_id]->SendRequest( |
| + deferred_request.request_line, |
| + deferred_request.headers, |
| + deferred_request.request_body, |
| + deferred_request.response, |
| + &send_io_callback_); |
| + if (result == ERR_IO_PENDING || rv == ERR_IO_PENDING) { |
|
mmenke
2011/08/23 19:05:25
nit: Add a comment that |result| == ERR_IO_PENDIN
James Simonsen
2011/08/26 22:19:07
Done.
|
| + send_user_callback_ = deferred_request.callback; |
| + } |
| + request_order_.push(deferred_request.pipeline_id); |
| + stream_state_map_[deferred_request.pipeline_id] = STREAM_SENT; |
| + deferred_request_queue_.pop(); |
| + send_next_state_ = SEND_STATE_COMPLETE; |
| + return rv; |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::DoSendComplete(int result) { |
| + if (send_user_callback_) { |
| + CompletionCallback* callback = send_user_callback_; |
| + send_user_callback_ = NULL; |
| + callback->Run(result); |
| + } |
| + if (result < OK) { |
|
mmenke
2011/08/23 19:15:25
On ERR_SOCKET_NOT_CONNECTED, we might want to swit
James Simonsen
2011/08/26 22:19:07
Yeah, good idea. Done. And tested.
|
| + send_next_state_ = SEND_STATE_UNUSABLE; |
| + usable_ = false; |
| + return result; |
| + } |
| + if (deferred_request_queue_.empty()) { |
| + send_next_state_ = SEND_STATE_NONE; |
| + return OK; |
| + } else { |
| + send_next_state_ = SEND_STATE_NEXT_REQUEST; |
| + MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + method_factory_.NewRunnableMethod( |
| + &HttpPipelinedConnectionImpl::DoSendRequestLoop, |
| + ERR_IO_PENDING)); |
| + return ERR_IO_PENDING; // Wait for the task to fire. |
| + } |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) { |
| + send_next_state_ = SEND_STATE_NONE; |
| + while (!deferred_request_queue_.empty()) { |
| + const DeferredSendRequest& evicted_send = deferred_request_queue_.front(); |
| + if (stream_state_map_[evicted_send.pipeline_id] != STREAM_CLOSED) { |
| + evicted_send.callback->Run(ERR_PIPELINE_EVICTION); |
| + } |
| + deferred_request_queue_.pop(); |
| + } |
| + return result; |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::ReadResponseHeaders( |
| + int pipeline_id, |
| + CompletionCallback* callback) { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + DCHECK_EQ(stream_state_map_[pipeline_id], STREAM_SENT); |
| + DCHECK(!ContainsKey(callback_map_, pipeline_id)); |
| + if (!usable_) { |
| + return ERR_PIPELINE_EVICTION; |
| + } |
| + callback_map_[pipeline_id] = callback; |
| + if (read_next_state_ == READ_STATE_NONE) { |
| + read_next_state_ = READ_STATE_NEXT_HEADERS; |
| + return DoReadHeadersLoop(OK); |
| + } else { |
| + return ERR_IO_PENDING; |
| + } |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) { |
| + int rv = result; |
| + do { |
| + ReadHeadersState state = read_next_state_; |
| + read_next_state_ = READ_STATE_NONE; |
| + switch (state) { |
| + case READ_STATE_NEXT_HEADERS: |
| + rv = DoReadNextHeaders(rv); |
| + break; |
| + case READ_STATE_COMPLETE: |
| + rv = DoReadHeadersComplete(rv); |
| + break; |
| + case READ_STATE_WAITING_FOR_CLOSE: |
| + rv = DoReadWaitingForClose(rv); |
| + return rv; |
| + case READ_STATE_STREAM_CLOSED: |
| + rv = DoReadStreamClosed(); |
| + break; |
| + case READ_STATE_UNUSABLE: |
| + rv = DoEvictPendingReadHeaders(rv); |
| + break; |
| + default: |
| + NOTREACHED() << "bad read state"; |
| + rv = ERR_FAILED; |
| + break; |
| + } |
| + } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE); |
| + return rv; |
| +} |
| + |
| +void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) { |
| + DCHECK(read_user_callback_ != NULL); |
| + DoReadHeadersLoop(result); |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::DoReadNextHeaders(int result) { |
| + DCHECK(!request_order_.empty()); |
| + int pipeline_id = request_order_.front(); |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + if (stream_state_map_[pipeline_id] == STREAM_CLOSED) { |
| + // Since nobody will read whatever data is on the pipeline associated with |
| + // this request, we must shut down the rest of the pipeline. |
| + read_next_state_ = READ_STATE_UNUSABLE; |
| + return OK; |
| + } |
| + CallbackMap::iterator it = callback_map_.find(pipeline_id); |
| + if (it == callback_map_.end()) { |
| + return ERR_IO_PENDING; |
| + } |
| + DCHECK(ContainsKey(parser_map_, pipeline_id)); |
| + int rv = parser_map_[pipeline_id]->ReadResponseHeaders( |
| + &read_io_callback_); |
| + if (rv == ERR_IO_PENDING) { |
| + read_next_state_ = READ_STATE_COMPLETE; |
| + read_user_callback_ = it->second; |
| + } else if (result == ERR_IO_PENDING) { |
| + read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; |
| + MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + method_factory_.NewRunnableMethod( |
| + &HttpPipelinedConnectionImpl::FireUserCallback, |
| + it->second, |
| + rv)); |
|
mmenke
2011/08/23 19:05:25
Is there any guarantee that a stream won't be dele
James Simonsen
2011/08/26 22:19:07
Another good catch! Fixed and added tests.
|
| + } else if (rv < OK) { |
| + read_next_state_ = READ_STATE_UNUSABLE; |
| + } else { |
| + DCHECK_LE(OK, rv); |
| + read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; |
| + } |
| + callback_map_.erase(it); |
| + return rv; |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) { |
| + read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; |
| + if (read_user_callback_) { |
| + MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + method_factory_.NewRunnableMethod( |
| + &HttpPipelinedConnectionImpl::FireUserCallback, |
| + read_user_callback_, |
| + result)); |
| + read_user_callback_ = NULL; |
| + } |
| + return result; |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::DoReadWaitingForClose(int result) { |
| + read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; |
| + return result; |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::DoReadStreamClosed() { |
| + DCHECK(!request_order_.empty()); |
| + request_order_.pop(); |
| + if (!usable_) { |
| + read_next_state_ = READ_STATE_UNUSABLE; |
| + return OK; |
| + } else if (request_order_.empty() || |
| + !ContainsKey(callback_map_, request_order_.front())) { |
| + read_next_state_ = READ_STATE_NONE; |
| + return OK; |
| + } else { |
| + read_next_state_ = READ_STATE_NEXT_HEADERS; |
| + MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + method_factory_.NewRunnableMethod( |
| + &HttpPipelinedConnectionImpl::DoReadHeadersLoop, |
| + ERR_IO_PENDING)); |
| + return ERR_IO_PENDING; // Wait for the task to fire. |
| + } |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) { |
| + while (!request_order_.empty()) { |
| + int evicted_id = request_order_.front(); |
| + request_order_.pop(); |
| + CallbackMap::iterator cb_it = callback_map_.find(evicted_id); |
| + if (cb_it == callback_map_.end()) { |
| + continue; |
| + } |
| + if (stream_state_map_[evicted_id] != STREAM_CLOSED) { |
| + cb_it->second->Run(ERR_PIPELINE_EVICTION); |
| + } |
| + callback_map_.erase(cb_it); |
| + } |
| + DCHECK(callback_map_.empty()); |
| + read_next_state_ = READ_STATE_NONE; |
| + return result; |
| +} |
| + |
| +void HttpPipelinedConnectionImpl::Close(int pipeline_id, |
| + bool not_reusable) { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + switch (stream_state_map_[pipeline_id]) { |
| + case STREAM_CLOSED: |
| + // TODO(simonjam): Why is Close() sometimes called twice? |
| + return; |
| + |
| + case STREAM_CREATED: |
| + stream_state_map_[pipeline_id] = STREAM_UNUSED; |
| + return; |
| + |
| + case STREAM_BOUND: |
| + stream_state_map_[pipeline_id] = STREAM_CLOSED; |
| + return; |
| + |
| + case STREAM_SENT: |
| + break; |
| + |
| + default: |
| + NOTREACHED(); |
| + break; |
| + } |
| + |
| + stream_state_map_[pipeline_id] = STREAM_CLOSED; |
| + bool is_active_stream = !request_order_.empty() && |
| + pipeline_id == request_order_.front(); |
| + if (not_reusable || !is_active_stream) { |
| + usable_ = false; |
| + } |
| + if (is_active_stream) { |
| + read_next_state_ = READ_STATE_STREAM_CLOSED; |
| + read_user_callback_ = NULL; |
| + DoReadHeadersLoop(OK); |
| + } else { |
| + if (send_next_state_ == SEND_STATE_NONE) { |
| + send_next_state_ = SEND_STATE_UNUSABLE; |
| + DoSendRequestLoop(OK); |
| + } |
| + if (read_next_state_ == READ_STATE_NONE) { |
| + read_next_state_ = READ_STATE_UNUSABLE; |
| + DoReadHeadersLoop(OK); |
| + } |
| + } |
| +} |
| + |
| +int HttpPipelinedConnectionImpl::ReadResponseBody( |
| + int pipeline_id, |
| + IOBuffer* buf, |
| + int buf_len, |
| + CompletionCallback* callback) { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + DCHECK(!request_order_.empty()); |
| + DCHECK(pipeline_id == request_order_.front()); |
|
mmenke
2011/08/23 20:35:12
On destruction, an HttpNetworkTransactionuses a Ht
James Simonsen
2011/08/26 22:19:07
Nice. I liked Will's comment and moved it to the s
|
| + DCHECK(ContainsKey(parser_map_, pipeline_id)); |
| + return parser_map_[pipeline_id]->ReadResponseBody(buf, buf_len, callback); |
| +} |
| + |
| +uint64 HttpPipelinedConnectionImpl::GetUploadProgress(int pipeline_id) const { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + DCHECK(ContainsKey(parser_map_, pipeline_id)); |
| + return parser_map_.find(pipeline_id)->second->GetUploadProgress(); |
| +} |
| + |
| +HttpResponseInfo* HttpPipelinedConnectionImpl::GetResponseInfo( |
| + int pipeline_id) { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + DCHECK(ContainsKey(parser_map_, pipeline_id)); |
| + return parser_map_[pipeline_id]->GetResponseInfo(); |
| +} |
| + |
| +bool HttpPipelinedConnectionImpl::IsResponseBodyComplete( |
| + int pipeline_id) const { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + DCHECK(ContainsKey(parser_map_, pipeline_id)); |
| + return parser_map_.find(pipeline_id)->second->IsResponseBodyComplete(); |
| +} |
| + |
| +bool HttpPipelinedConnectionImpl::CanFindEndOfResponse(int pipeline_id) const { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + DCHECK(ContainsKey(parser_map_, pipeline_id)); |
| + return parser_map_.find(pipeline_id)->second->CanFindEndOfResponse(); |
| +} |
| + |
| +bool HttpPipelinedConnectionImpl::IsMoreDataBuffered(int pipeline_id) const { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + return read_buf_->offset(); |
| +} |
| + |
| +bool HttpPipelinedConnectionImpl::IsConnectionReused(int pipeline_id) const { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + if (pipeline_id > 1) { |
| + return true; |
| + } |
| + ClientSocketHandle::SocketReuseType reuse_type = connection_->reuse_type(); |
| + return connection_->is_reused() || |
| + reuse_type == ClientSocketHandle::UNUSED_IDLE; |
| +} |
| + |
| +void HttpPipelinedConnectionImpl::SetConnectionReused(int pipeline_id) { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + connection_->set_is_reused(true); |
| +} |
| + |
| +void HttpPipelinedConnectionImpl::GetSSLInfo(int pipeline_id, |
| + SSLInfo* ssl_info) { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + DCHECK(ContainsKey(parser_map_, pipeline_id)); |
| + return parser_map_[pipeline_id]->GetSSLInfo(ssl_info); |
| +} |
| + |
| +void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo( |
| + int pipeline_id, |
| + SSLCertRequestInfo* cert_request_info) { |
| + DCHECK(ContainsKey(stream_state_map_, pipeline_id)); |
| + DCHECK(ContainsKey(parser_map_, pipeline_id)); |
| + return parser_map_[pipeline_id]->GetSSLCertRequestInfo(cert_request_info); |
| +} |
| + |
| +void HttpPipelinedConnectionImpl::FireUserCallback(CompletionCallback* callback, |
| + int result) { |
| + DCHECK(callback); |
| + callback->Run(result); |
| +} |
| + |
| +} // namespace net |