OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/spdy/bidirectional_spdy_stream.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/memory/scoped_ptr.h" | |
9 #include "base/time/time.h" | |
10 #include "net/base/request_priority.h" | |
11 #include "net/spdy/spdy_buffer.h" | |
12 #include "net/spdy/spdy_header_block.h" | |
13 #include "net/spdy/spdy_http_utils.h" | |
14 #include "net/spdy/spdy_stream.h" | |
15 | |
16 namespace net { | |
17 | |
18 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1); | |
19 | |
20 BidirectionalSpdyStream::BidirectionalSpdyStream( | |
21 const base::WeakPtr<SpdySession>& spdy_session) | |
22 : spdy_session_(spdy_session), | |
23 stream_closed_(false), | |
24 closed_stream_status_(ERR_FAILED), | |
25 buffered_read_callback_pending_(false), | |
26 more_read_data_pending_(false), | |
27 weak_factory_(this) {} | |
28 | |
29 BidirectionalSpdyStream::~BidirectionalSpdyStream() { | |
30 if (stream_.get()) { | |
31 stream_->DetachDelegate(); | |
32 DCHECK(!stream_.get()); | |
33 } | |
34 } | |
35 | |
36 void BidirectionalSpdyStream::Start(const HttpRequestInfo* request_info, | |
37 RequestPriority priority, | |
38 const BoundNetLog& net_log, | |
39 BidirectionalStream::Delegate* delegate) { | |
40 delegate_ = delegate; | |
41 DCHECK(!stream_); | |
42 if (!spdy_session_) | |
43 delegate_->OnFailed(ERR_CONNECTION_CLOSED); | |
44 | |
45 request_info_ = request_info; | |
46 | |
47 int rv = stream_request_.StartRequest( | |
48 SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url, priority, | |
49 net_log, base::Bind(&BidirectionalSpdyStream::OnStreamInitialized, | |
50 weak_factory_.GetWeakPtr())); | |
51 if (rv != ERR_IO_PENDING) | |
52 OnStreamInitialized(rv); | |
53 } | |
54 | |
55 int BidirectionalSpdyStream::ReadData(IOBuffer* buf, int buf_len) { | |
56 if (stream_.get()) | |
57 CHECK(!stream_->IsIdle()); | |
58 | |
59 CHECK(buf); | |
60 CHECK(buf_len); | |
61 if (!stream_closed_) | |
62 CHECK(stream_); | |
63 | |
64 // If there is data buffered, complete the IO immediately. | |
65 if (!data_queue_.IsEmpty()) { | |
66 return data_queue_.Dequeue(buf->data(), buf_len); | |
67 } else if (stream_closed_) { | |
68 return closed_stream_status_; | |
69 } | |
70 user_buffer_ = buf; | |
71 user_buffer_len_ = buf_len; | |
72 return ERR_IO_PENDING; | |
73 } | |
74 | |
75 void BidirectionalSpdyStream::SendData(IOBuffer* data, | |
76 int length, | |
77 bool end_stream) { | |
78 stream_->SendData(data, length, | |
79 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
80 } | |
81 | |
82 void BidirectionalSpdyStream::Cancel() { | |
83 if (!stream_) | |
84 return; | |
85 stream_->Cancel(); | |
86 DCHECK(!stream_); | |
87 } | |
88 | |
89 void BidirectionalSpdyStream::OnRequestHeadersSent() { | |
90 delegate_->OnRequestHeadersSent(); | |
91 } | |
92 | |
93 SpdyResponseHeadersStatus BidirectionalSpdyStream::OnResponseHeadersUpdated( | |
94 const SpdyHeaderBlock& response_headers) { | |
95 delegate_->OnHeaders(response_headers); | |
96 return RESPONSE_HEADERS_ARE_COMPLETE; | |
97 } | |
98 | |
99 void BidirectionalSpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { | |
100 DCHECK(stream_); | |
101 data_queue_.Enqueue(buffer.Pass()); | |
102 if (user_buffer_) { | |
103 // Handing small chunks of data to the caller creates measurable overhead. | |
104 // So buffer data in short time-spans and send a single read notification. | |
105 ScheduleBufferedRead(); | |
106 } | |
107 } | |
108 | |
109 void BidirectionalSpdyStream::OnDataSent() { | |
110 delegate_->OnDataSent(); | |
111 } | |
112 | |
113 void BidirectionalSpdyStream::OnTrailers(const SpdyHeaderBlock& trailers) { | |
114 delegate_->OnTrailers(trailers); | |
115 } | |
116 | |
117 void BidirectionalSpdyStream::OnClose(int status) { | |
118 DCHECK(stream_); | |
119 | |
120 stream_closed_ = true; | |
121 closed_stream_status_ = status; | |
122 stream_.reset(); | |
123 | |
124 // Complete remaining buffered read. | |
125 if (status == OK && user_buffer_) { | |
126 DoBufferedRead(); | |
127 return; | |
128 } | |
129 | |
130 delegate_->OnClose(status); | |
131 } | |
132 | |
133 void BidirectionalSpdyStream::SendRequestHeaders() { | |
134 stream_->SetDelegate(this); | |
135 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); | |
136 CreateSpdyHeadersFromHttpRequest(*request_info_, request_info_->extra_headers, | |
137 stream_->GetProtocolVersion(), true, | |
138 headers.get()); | |
139 bool end_stream = (request_info_->method == "GET"); | |
140 stream_->SendRequestHeaders( | |
141 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
142 } | |
143 | |
144 void BidirectionalSpdyStream::OnStreamInitialized(int rv) { | |
145 DCHECK_NE(ERR_IO_PENDING, rv); | |
146 if (rv == OK) { | |
147 stream_ = stream_request_.ReleaseStream(); | |
148 SendRequestHeaders(); | |
149 return; | |
150 } | |
151 delegate_->OnFailed(rv); | |
152 } | |
153 | |
154 void BidirectionalSpdyStream::ScheduleBufferedRead() { | |
155 // If there is already a scheduled DoBufferedRead, don't issue | |
156 // another one. Mark that we have received more data and return. | |
157 if (buffered_read_callback_pending_) { | |
158 more_read_data_pending_ = true; | |
159 return; | |
160 } | |
161 | |
162 more_read_data_pending_ = false; | |
163 buffered_read_callback_pending_ = true; | |
164 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask( | |
165 FROM_HERE, base::Bind(&BidirectionalSpdyStream::DoBufferedRead, | |
166 weak_factory_.GetWeakPtr()), | |
167 kBufferTime); | |
168 } | |
169 | |
170 void BidirectionalSpdyStream::DoBufferedRead() { | |
171 buffered_read_callback_pending_ = false; | |
172 // If the stream errored out, do not complete the read. | |
173 if (!stream_ && !stream_closed_) | |
174 return; | |
175 if (stream_closed_ && closed_stream_status_ != OK) | |
176 return; | |
177 | |
178 // When |more_read_data_pending_| is true, it means that more data has arrived | |
179 // since started waiting. Wait a little longer and continue to buffer. | |
180 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { | |
181 ScheduleBufferedRead(); | |
182 return; | |
183 } | |
184 | |
185 int rv = 0; | |
186 if (user_buffer_.get()) { | |
187 rv = ReadData(user_buffer_.get(), user_buffer_len_); | |
188 DCHECK_NE(ERR_IO_PENDING, rv); | |
189 user_buffer_ = nullptr; | |
190 user_buffer_len_ = 0; | |
191 delegate_->OnReadCompleted(rv); | |
192 | |
193 // If all data is read, and BidirectionalSpdyStream::onClose is invoked | |
194 // previously, let the delegate know about the onClose event. | |
195 if (data_queue_.IsEmpty() && stream_closed_) { | |
196 DCHECK_EQ(OK, closed_stream_status_); | |
197 delegate_->OnClose(OK); | |
mef
2015/10/20 21:56:35
should we change this to 'noMoreData' flag?
xunjieli
2015/10/21 19:35:36
Talked on the Misha's CL. I misunderstood the comm
| |
198 } | |
199 } | |
200 } | |
201 | |
202 bool BidirectionalSpdyStream::ShouldWaitForMoreBufferedData() const { | |
203 if (stream_closed_) | |
204 return false; | |
205 DCHECK_GT(user_buffer_len_, 0); | |
206 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_); | |
207 } | |
208 | |
209 } // namespace net | |
OLD | NEW |