Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(183)

Side by Side Diff: net/spdy/spdy_stream.cc

Issue 13009012: [SPDY] Refactor SpdySession's write queue (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix gyp error Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_stream.h" 5 #include "net/spdy/spdy_stream.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/compiler_specific.h" 8 #include "base/compiler_specific.h"
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/message_loop.h" 10 #include "base/message_loop.h"
11 #include "base/stringprintf.h" 11 #include "base/stringprintf.h"
12 #include "base/values.h" 12 #include "base/values.h"
13 #include "net/spdy/spdy_http_utils.h" 13 #include "net/spdy/spdy_http_utils.h"
14 #include "net/spdy/spdy_session.h" 14 #include "net/spdy/spdy_session.h"
15 #include "net/spdy/spdy_write_queue.h"
15 16
16 namespace net { 17 namespace net {
17 18
18 namespace { 19 namespace {
19 20
20 Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id, 21 Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id,
21 int status, 22 int status,
22 const std::string* description, 23 const std::string* description,
23 NetLog::LogLevel /* log_level */) { 24 NetLog::LogLevel /* log_level */) {
24 DictionaryValue* dict = new DictionaryValue(); 25 DictionaryValue* dict = new DictionaryValue();
(...skipping 18 matching lines...) Expand all
43 for (std::string::const_iterator i(str.begin()); i != str.end(); ++i) { 44 for (std::string::const_iterator i(str.begin()); i != str.end(); ++i) {
44 if (*i >= 'A' && *i <= 'Z') { 45 if (*i >= 'A' && *i <= 'Z') {
45 return true; 46 return true;
46 } 47 }
47 } 48 }
48 return false; 49 return false;
49 } 50 }
50 51
51 } // namespace 52 } // namespace
52 53
54 // A wrapper around a stream that calls into ProduceSynStreamFrame().
55 class SpdyStream::SynStreamFrameProducer : public SpdyFrameProducer {
56 public:
57 SynStreamFrameProducer(const base::WeakPtr<SpdyStream>& stream)
58 : stream_(stream) {
59 DCHECK(stream_);
60 }
61
62 virtual ~SynStreamFrameProducer() {}
63
64 virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE {
65 if (!stream_) {
66 NOTREACHED();
67 return scoped_ptr<SpdyFrame>();
68 }
69 DCHECK_GT(stream_->stream_id(), 0u);
70 return stream_->ProduceSynStreamFrame();
71 }
72
73 private:
74 const base::WeakPtr<SpdyStream> stream_;
75 };
76
77 // A wrapper around a stream that calls into ProduceHeaderFrame() with
78 // a given header block.
79 class SpdyStream::HeaderFrameProducer : public SpdyFrameProducer {
80 public:
81 HeaderFrameProducer(const base::WeakPtr<SpdyStream>& stream,
82 scoped_ptr<SpdyHeaderBlock> headers)
83 : stream_(stream),
84 headers_(headers.Pass()) {
85 DCHECK(stream_);
86 DCHECK(headers_);
87 }
88
89 virtual ~HeaderFrameProducer() {}
90
91 virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE {
92 if (!stream_) {
93 NOTREACHED();
94 return scoped_ptr<SpdyFrame>();
95 }
96 DCHECK_GT(stream_->stream_id(), 0u);
97 return stream_->ProduceHeaderFrame(headers_.Pass());
98 }
99
100 private:
101 const base::WeakPtr<SpdyStream> stream_;
102 scoped_ptr<SpdyHeaderBlock> headers_;
103 };
104
53 SpdyStream::SpdyStream(SpdySession* session, 105 SpdyStream::SpdyStream(SpdySession* session,
54 const std::string& path, 106 const std::string& path,
55 RequestPriority priority, 107 RequestPriority priority,
56 int32 initial_send_window_size, 108 int32 initial_send_window_size,
57 int32 initial_recv_window_size, 109 int32 initial_recv_window_size,
58 bool pushed, 110 bool pushed,
59 const BoundNetLog& net_log) 111 const BoundNetLog& net_log)
60 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), 112 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
61 continue_buffering_data_(true), 113 continue_buffering_data_(true),
62 stream_id_(0), 114 stream_id_(0),
(...skipping 13 matching lines...) Expand all
76 io_state_(STATE_NONE), 128 io_state_(STATE_NONE),
77 response_status_(OK), 129 response_status_(OK),
78 cancelled_(false), 130 cancelled_(false),
79 has_upload_data_(false), 131 has_upload_data_(false),
80 net_log_(net_log), 132 net_log_(net_log),
81 send_bytes_(0), 133 send_bytes_(0),
82 recv_bytes_(0), 134 recv_bytes_(0),
83 domain_bound_cert_type_(CLIENT_CERT_INVALID_TYPE) { 135 domain_bound_cert_type_(CLIENT_CERT_INVALID_TYPE) {
84 } 136 }
85 137
86 class SpdyStream::SpdyStreamIOBufferProducer
87 : public SpdySession::SpdyIOBufferProducer {
88 public:
89 SpdyStreamIOBufferProducer(SpdyStream* stream) : stream_(stream) {}
90
91 // SpdyFrameProducer
92 virtual RequestPriority GetPriority() const OVERRIDE {
93 return stream_->priority();
94 }
95
96 virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE {
97 if (stream_->cancelled())
98 return NULL;
99 if (stream_->stream_id() == 0)
100 SpdySession::SpdyIOBufferProducer::ActivateStream(session, stream_);
101 frame_ = stream_->ProduceNextFrame();
102 return frame_ == NULL ? NULL :
103 SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
104 frame_.get(), GetPriority(), stream_);
105 }
106
107 private:
108 scoped_refptr<SpdyStream> stream_;
109 scoped_ptr<SpdyFrame> frame_;
110 };
111
112 void SpdyStream::SetHasWriteAvailable() {
113 session_->SetStreamHasWriteAvailable(this,
114 new SpdyStreamIOBufferProducer(this));
115 }
116
117 scoped_ptr<SpdyFrame> SpdyStream::ProduceNextFrame() {
118 if (io_state_ == STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE) {
119 CHECK(request_.get());
120 CHECK_GT(stream_id_, 0u);
121
122 std::string origin = GetUrl().GetOrigin().spec();
123 DCHECK(origin[origin.length() - 1] == '/');
124 origin.erase(origin.length() - 1); // Trim trailing slash.
125 scoped_ptr<SpdyFrame> frame(session_->CreateCredentialFrame(
126 origin, domain_bound_cert_type_, domain_bound_private_key_,
127 domain_bound_cert_, priority_));
128 return frame.Pass();
129 } else if (io_state_ == STATE_SEND_HEADERS_COMPLETE) {
130 CHECK(request_.get());
131 CHECK_GT(stream_id_, 0u);
132
133 SpdyControlFlags flags =
134 has_upload_data_ ? CONTROL_FLAG_NONE : CONTROL_FLAG_FIN;
135 scoped_ptr<SpdyFrame> frame(session_->CreateSynStream(
136 stream_id_, priority_, slot_, flags, *request_));
137 send_time_ = base::TimeTicks::Now();
138 return frame.Pass();
139 } else {
140 CHECK(!cancelled());
141 // We must need to write stream data.
142 // Until the headers have been completely sent, we can not be sure
143 // that our stream_id is correct.
144 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
145 DCHECK_GT(stream_id_, 0u);
146 DCHECK(!pending_frames_.empty());
147
148 PendingFrame frame = pending_frames_.front();
149 pending_frames_.pop_front();
150
151 waiting_completions_.push_back(frame.type);
152
153 if (frame.type == TYPE_DATA) {
154 // Send queued data frame.
155 return scoped_ptr<SpdyFrame>(frame.data_frame);
156 } else {
157 DCHECK(frame.type == TYPE_HEADERS);
158 // Create actual HEADERS frame just in time because it depends on
159 // compression context and should not be reordered after the creation.
160 scoped_ptr<SpdyFrame> header_frame(session_->CreateHeadersFrame(
161 stream_id_, *frame.header_block, SpdyControlFlags()));
162 delete frame.header_block;
163 return header_frame.Pass();
164 }
165 }
166 NOTREACHED();
167 }
168
169 SpdyStream::~SpdyStream() { 138 SpdyStream::~SpdyStream() {
170 UpdateHistograms(); 139 UpdateHistograms();
171 while (!pending_frames_.empty()) {
172 PendingFrame frame = pending_frames_.back();
173 pending_frames_.pop_back();
174 if (frame.type == TYPE_DATA)
175 delete frame.data_frame;
176 else
177 delete frame.header_block;
178 }
179 } 140 }
180 141
181 void SpdyStream::SetDelegate(Delegate* delegate) { 142 void SpdyStream::SetDelegate(Delegate* delegate) {
182 CHECK(delegate); 143 CHECK(delegate);
183 delegate_ = delegate; 144 delegate_ = delegate;
184 145
185 if (pushed_) { 146 if (pushed_) {
186 CHECK(response_received()); 147 CHECK(response_received());
187 MessageLoop::current()->PostTask( 148 MessageLoop::current()->PostTask(
188 FROM_HERE, base::Bind(&SpdyStream::PushedStreamReplayData, this)); 149 FROM_HERE, base::Bind(&SpdyStream::PushedStreamReplayData, this));
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
221 delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size()); 182 delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size());
222 } else { 183 } else {
223 delegate_->OnDataReceived(NULL, 0); 184 delegate_->OnDataReceived(NULL, 0);
224 session_->CloseStream(stream_id_, net::OK); 185 session_->CloseStream(stream_id_, net::OK);
225 // Note: |this| may be deleted after calling CloseStream. 186 // Note: |this| may be deleted after calling CloseStream.
226 DCHECK_EQ(buffers.size() - 1, i); 187 DCHECK_EQ(buffers.size() - 1, i);
227 } 188 }
228 } 189 }
229 } 190 }
230 191
192 scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() {
193 CHECK_EQ(io_state_, STATE_SEND_HEADERS_COMPLETE);
194 CHECK(request_.get());
195 CHECK_GT(stream_id_, 0u);
196
197 SpdyControlFlags flags =
198 has_upload_data_ ? CONTROL_FLAG_NONE : CONTROL_FLAG_FIN;
199 scoped_ptr<SpdyFrame> frame(session_->CreateSynStream(
200 stream_id_, priority_, slot_, flags, *request_));
201 send_time_ = base::TimeTicks::Now();
202 return frame.Pass();
203 }
204
205 scoped_ptr<SpdyFrame> SpdyStream::ProduceHeaderFrame(
206 scoped_ptr<SpdyHeaderBlock> header_block) {
207 CHECK(!cancelled());
208 // We must need to write stream data.
209 // Until the headers have been completely sent, we can not be sure
210 // that our stream_id is correct.
211 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
212 DCHECK_GT(stream_id_, 0u);
213
214 // Create actual HEADERS frame just in time because it depends on
215 // compression context and should not be reordered after the creation.
216 scoped_ptr<SpdyFrame> header_frame(session_->CreateHeadersFrame(
217 stream_id_, *header_block, SpdyControlFlags()));
218 return header_frame.Pass();
219 }
220
231 void SpdyStream::DetachDelegate() { 221 void SpdyStream::DetachDelegate() {
232 delegate_ = NULL; 222 delegate_ = NULL;
233 if (!closed()) 223 if (!closed())
234 Cancel(); 224 Cancel();
235 } 225 }
236 226
237 const SpdyHeaderBlock& SpdyStream::spdy_headers() const { 227 const SpdyHeaderBlock& SpdyStream::spdy_headers() const {
238 DCHECK(request_ != NULL); 228 DCHECK(request_ != NULL);
239 return *request_.get(); 229 return *request_.get();
240 } 230 }
(...skipping 330 matching lines...) Expand 10 before | Expand all | Expand 10 after
571 io_state_ = STATE_GET_DOMAIN_BOUND_CERT; 561 io_state_ = STATE_GET_DOMAIN_BOUND_CERT;
572 return DoLoop(OK); 562 return DoLoop(OK);
573 } 563 }
574 564
575 void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) { 565 void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) {
576 // Until the first headers by SYN_STREAM have been completely sent, we can 566 // Until the first headers by SYN_STREAM have been completely sent, we can
577 // not be sure that our stream_id is correct. 567 // not be sure that our stream_id is correct.
578 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); 568 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
579 CHECK_GT(stream_id_, 0u); 569 CHECK_GT(stream_id_, 0u);
580 570
581 PendingFrame frame; 571 waiting_completions_.push_back(TYPE_HEADERS);
582 frame.type = TYPE_HEADERS;
583 // |frame.header_block| is deleted by either ProduceNextFrame() or
584 // the destructor.
585 frame.header_block = headers.release();
586 pending_frames_.push_back(frame);
587 572
588 SetHasWriteAvailable(); 573 session_->EnqueueStreamWrite(
574 this,
575 scoped_ptr<SpdyFrameProducer>(
576 new HeaderFrameProducer(
577 weak_ptr_factory_.GetWeakPtr(), headers.Pass())));
589 } 578 }
590 579
591 void SpdyStream::QueueStreamData(IOBuffer* data, 580 void SpdyStream::QueueStreamData(IOBuffer* data,
592 int length, 581 int length,
593 SpdyDataFlags flags) { 582 SpdyDataFlags flags) {
594 // Until the headers have been completely sent, we can not be sure 583 // Until the headers have been completely sent, we can not be sure
595 // that our stream_id is correct. 584 // that our stream_id is correct.
596 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); 585 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
597 CHECK_GT(stream_id_, 0u); 586 CHECK_GT(stream_id_, 0u);
587 CHECK(!cancelled());
598 588
599 scoped_ptr<SpdyFrame> data_frame(session_->CreateDataFrame( 589 scoped_ptr<SpdyFrame> data_frame(session_->CreateDataFrame(
600 stream_id_, data, length, flags)); 590 stream_id_, data, length, flags));
601 if (!data_frame) 591 if (!data_frame)
602 return; 592 return;
603 593
604 PendingFrame frame; 594 waiting_completions_.push_back(TYPE_DATA);
605 frame.type = TYPE_DATA;
606 // |frame.data_frame| is either returned by ProduceNextFrame() or
607 // deleted in the destructor.
608 frame.data_frame = data_frame.release();
609 pending_frames_.push_back(frame);
610 595
611 SetHasWriteAvailable(); 596 session_->EnqueueStreamWrite(
597 this,
598 scoped_ptr<SpdyFrameProducer>(
599 new SimpleFrameProducer(data_frame.Pass())));
612 } 600 }
613 601
614 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, 602 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
615 bool* was_npn_negotiated, 603 bool* was_npn_negotiated,
616 NextProto* protocol_negotiated) { 604 NextProto* protocol_negotiated) {
617 return session_->GetSSLInfo( 605 return session_->GetSSLInfo(
618 ssl_info, was_npn_negotiated, protocol_negotiated); 606 ssl_info, was_npn_negotiated, protocol_negotiated);
619 } 607 }
620 608
621 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) { 609 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
686 result = DoSendBodyComplete(result); 674 result = DoSendBodyComplete(result);
687 break; 675 break;
688 // This is an intermediary waiting state. This state is reached when all 676 // This is an intermediary waiting state. This state is reached when all
689 // data has been sent, but no data has been received. 677 // data has been sent, but no data has been received.
690 case STATE_WAITING_FOR_RESPONSE: 678 case STATE_WAITING_FOR_RESPONSE:
691 io_state_ = STATE_WAITING_FOR_RESPONSE; 679 io_state_ = STATE_WAITING_FOR_RESPONSE;
692 result = ERR_IO_PENDING; 680 result = ERR_IO_PENDING;
693 break; 681 break;
694 // State machine 2: connection is established. 682 // State machine 2: connection is established.
695 // In STATE_OPEN, OnResponseReceived has already been called. 683 // In STATE_OPEN, OnResponseReceived has already been called.
696 // OnDataReceived, OnClose and OnWriteCompelte can be called. 684 // OnDataReceived, OnClose and OnWriteComplete can be called.
697 // Only OnWriteComplete calls DoLoop((). 685 // Only OnWriteComplete calls DoLoop(().
698 // 686 //
699 // For HTTP streams, no data is sent from the client while in the OPEN 687 // For HTTP streams, no data is sent from the client while in the OPEN
700 // state, so OnWriteComplete is never called here. The HTTP body is 688 // state, so OnWriteComplete is never called here. The HTTP body is
701 // handled in the OnDataReceived callback, which does not call into 689 // handled in the OnDataReceived callback, which does not call into
702 // DoLoop. 690 // DoLoop.
703 // 691 //
704 // For WebSocket streams, which are bi-directional, we'll send and 692 // For WebSocket streams, which are bi-directional, we'll send and
705 // receive data once the connection is established. Received data is 693 // receive data once the connection is established. Received data is
706 // handled in OnDataReceived. Sent data is handled in OnWriteComplete, 694 // handled in OnDataReceived. Sent data is handled in OnWriteComplete,
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
756 return result; 744 return result;
757 745
758 io_state_ = STATE_SEND_DOMAIN_BOUND_CERT; 746 io_state_ = STATE_SEND_DOMAIN_BOUND_CERT;
759 slot_ = session_->credential_state()->SetHasCredential(GetUrl()); 747 slot_ = session_->credential_state()->SetHasCredential(GetUrl());
760 return OK; 748 return OK;
761 } 749 }
762 750
763 int SpdyStream::DoSendDomainBoundCert() { 751 int SpdyStream::DoSendDomainBoundCert() {
764 io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE; 752 io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE;
765 CHECK(request_.get()); 753 CHECK(request_.get());
766 SetHasWriteAvailable(); 754
755 std::string origin = GetUrl().GetOrigin().spec();
756 DCHECK(origin[origin.length() - 1] == '/');
757 origin.erase(origin.length() - 1); // Trim trailing slash.
758 scoped_ptr<SpdyFrame> frame;
759 int rv = session_->CreateCredentialFrame(
760 origin, domain_bound_cert_type_, domain_bound_private_key_,
761 domain_bound_cert_, priority_, &frame);
762 if (rv != OK) {
763 DCHECK_NE(rv, ERR_IO_PENDING);
764 return rv;
765 }
766
767 DCHECK(frame);
768 // TODO(akalin): Fix a couple of race conditions:
769 //
770 // 1) Since this counts as a write for this stream, the stream will
771 // be activated (and hence allocated a stream ID) before this frame
772 // is sent, even though the ID should only be activated for the
773 // SYN_STREAM frame. This can be solved by signalling to the session
774 // when we're sending a SYN_STREAM frame, and have it only activate
775 // the stream then.
776 //
777 // 2) Since this is decoupled from sending the SYN_STREAM frame, it
778 // is possible that other domain-bound cert frames will clobber ours
779 // before our SYN_STREAM frame gets sent. This can be solved by
780 // immediately enqueueing the SYN_STREAM frame here and adjusting
781 // the state machine appropriately.
782 session_->EnqueueStreamWrite(
783 this,
784 scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())));
767 return ERR_IO_PENDING; 785 return ERR_IO_PENDING;
768 } 786 }
769 787
770 int SpdyStream::DoSendDomainBoundCertComplete(int result) { 788 int SpdyStream::DoSendDomainBoundCertComplete(int result) {
771 if (result < 0) 789 if (result < 0)
772 return result; 790 return result;
773 791
774 io_state_ = STATE_SEND_HEADERS; 792 io_state_ = STATE_SEND_HEADERS;
775 return OK; 793 return OK;
776 } 794 }
777 795
778 int SpdyStream::DoSendHeaders() { 796 int SpdyStream::DoSendHeaders() {
779 CHECK(!cancelled_); 797 CHECK(!cancelled_);
798 io_state_ = STATE_SEND_HEADERS_COMPLETE;
780 799
781 SetHasWriteAvailable(); 800 session_->EnqueueStreamWrite(
782 io_state_ = STATE_SEND_HEADERS_COMPLETE; 801 this,
802 scoped_ptr<SpdyFrameProducer>(
803 new SynStreamFrameProducer(weak_ptr_factory_.GetWeakPtr())));
783 return ERR_IO_PENDING; 804 return ERR_IO_PENDING;
784 } 805 }
785 806
786 int SpdyStream::DoSendHeadersComplete(int result) { 807 int SpdyStream::DoSendHeadersComplete(int result) {
787 if (result < 0) 808 if (result < 0)
788 return result; 809 return result;
789 810
790 CHECK_GT(result, 0); 811 CHECK_GT(result, 0);
791 812
792 if (!delegate_) 813 if (!delegate_)
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
883 } 904 }
884 905
885 recv_window_size_ -= delta_window_size; 906 recv_window_size_ -= delta_window_size;
886 net_log_.AddEvent( 907 net_log_.AddEvent(
887 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 908 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
888 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 909 base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
889 stream_id_, -delta_window_size, recv_window_size_)); 910 stream_id_, -delta_window_size, recv_window_size_));
890 } 911 }
891 912
892 } // namespace net 913 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698