OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/spdy/bidirectional_stream_spdy_job.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/memory/scoped_ptr.h" | |
9 #include "base/time/time.h" | |
10 #include "base/timer/timer.h" | |
11 #include "net/base/request_priority.h" | |
12 #include "net/spdy/spdy_buffer.h" | |
13 #include "net/spdy/spdy_header_block.h" | |
14 #include "net/spdy/spdy_http_utils.h" | |
15 #include "net/spdy/spdy_stream.h" | |
16 | |
17 namespace net { | |
18 | |
19 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1); | |
20 | |
21 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob( | |
22 const base::WeakPtr<SpdySession>& spdy_session, | |
23 scoped_ptr<base::Timer> timer) | |
24 : spdy_session_(spdy_session), | |
25 timer_(timer.release()), | |
26 stream_closed_(false), | |
27 closed_stream_status_(ERR_FAILED), | |
28 more_read_data_pending_(false), | |
29 weak_factory_(this) {} | |
30 | |
31 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob( | |
32 const base::WeakPtr<SpdySession>& spdy_session) | |
33 : BidirectionalStreamSpdyJob( | |
34 spdy_session, | |
35 make_scoped_ptr(new base::Timer(false, false))) {} | |
36 | |
37 BidirectionalStreamSpdyJob::~BidirectionalStreamSpdyJob() { | |
38 if (stream_.get()) { | |
39 stream_->DetachDelegate(); | |
40 DCHECK(!stream_.get()); | |
41 } | |
42 } | |
43 | |
44 void BidirectionalStreamSpdyJob::Start( | |
45 const HttpRequestInfo& request_info, | |
46 RequestPriority priority, | |
47 const BoundNetLog& net_log, | |
48 BidirectionalStreamJob::Delegate* delegate) { | |
49 delegate_ = delegate; | |
50 DCHECK(!stream_); | |
51 if (!spdy_session_) { | |
52 delegate_->OnFailed(ERR_CONNECTION_CLOSED); | |
53 return; | |
54 } | |
55 | |
56 request_info_ = request_info; | |
57 | |
58 int rv = stream_request_.StartRequest( | |
59 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_.url, priority, | |
60 net_log, base::Bind(&BidirectionalStreamSpdyJob::OnStreamInitialized, | |
61 weak_factory_.GetWeakPtr())); | |
62 if (rv != ERR_IO_PENDING) | |
63 OnStreamInitialized(rv); | |
64 } | |
65 | |
66 int BidirectionalStreamSpdyJob::ReadData(IOBuffer* buf, int buf_len) { | |
67 if (stream_.get()) | |
68 CHECK(!stream_->IsIdle()); | |
69 | |
70 CHECK(buf); | |
71 CHECK(buf_len); | |
72 CHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight"; | |
73 | |
74 if (!stream_closed_) | |
75 CHECK(stream_); | |
76 | |
77 // If there is data buffered, complete the IO immediately. | |
78 if (!data_queue_.IsEmpty()) { | |
79 return data_queue_.Dequeue(buf->data(), buf_len); | |
80 } else if (stream_closed_) { | |
81 return closed_stream_status_; | |
mef
2015/12/07 21:18:07
Shouldn't it return 0?
The comment says that it r
xunjieli
2015/12/08 16:08:46
It should return a non-negative number that indica
| |
82 } | |
83 // Read will complete asynchronously and Delegate::OnReadCompleted will be | |
84 // called upon completion. | |
85 user_buffer_ = buf; | |
86 user_buffer_len_ = buf_len; | |
87 return ERR_IO_PENDING; | |
88 } | |
89 | |
90 void BidirectionalStreamSpdyJob::SendData(IOBuffer* data, | |
91 int length, | |
92 bool end_stream) { | |
93 CHECK(!stream_closed_); | |
94 CHECK(stream_); | |
95 | |
96 stream_->SendData(data, length, | |
97 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
98 } | |
99 | |
100 void BidirectionalStreamSpdyJob::Cancel() { | |
101 if (!stream_) | |
102 return; | |
103 // Cancels the stream and detaches the delegate so it doesn't get called back. | |
104 stream_->DetachDelegate(); | |
105 DCHECK(!stream_); | |
106 } | |
107 | |
108 void BidirectionalStreamSpdyJob::OnRequestHeadersSent() { | |
109 delegate_->OnRequestHeadersSent(); | |
110 } | |
111 | |
112 SpdyResponseHeadersStatus BidirectionalStreamSpdyJob::OnResponseHeadersUpdated( | |
113 const SpdyHeaderBlock& response_headers) { | |
114 delegate_->OnHeaders(response_headers); | |
115 return RESPONSE_HEADERS_ARE_COMPLETE; | |
116 } | |
117 | |
118 void BidirectionalStreamSpdyJob::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { | |
119 DCHECK(stream_); | |
120 DCHECK(!stream_closed_); | |
121 | |
122 if (buffer) { | |
123 data_queue_.Enqueue(buffer.Pass()); | |
124 if (user_buffer_) { | |
125 // Handing small chunks of data to the caller creates measurable overhead. | |
126 // So buffer data in short time-spans and send a single read notification. | |
127 ScheduleBufferedRead(); | |
128 } | |
129 } | |
130 // If |buffer| is null, BidirectionalStreamSpdyJob::OnClose will be invoked by | |
131 // SpdyStream to indicate the end of stream. | |
132 } | |
133 | |
134 void BidirectionalStreamSpdyJob::OnDataSent() { | |
135 DCHECK(stream_); | |
136 DCHECK(!stream_closed_); | |
137 | |
138 delegate_->OnDataSent(); | |
139 } | |
140 | |
141 void BidirectionalStreamSpdyJob::OnTrailers(const SpdyHeaderBlock& trailers) { | |
142 DCHECK(stream_); | |
143 DCHECK(!stream_closed_); | |
144 | |
145 delegate_->OnTrailers(trailers); | |
146 } | |
147 | |
148 void BidirectionalStreamSpdyJob::OnClose(int status) { | |
149 DCHECK(stream_); | |
150 | |
151 stream_closed_ = true; | |
152 closed_stream_status_ = status; | |
153 stream_.reset(); | |
154 | |
155 if (status != OK) { | |
156 delegate_->OnFailed(status); | |
157 return; | |
158 } | |
159 // Complete any remaining read, as all data has been buffered. | |
160 // If user has not called ReadData (i.e |user_buffer_| is nullptr), this will | |
161 // do nothing. | |
162 DCHECK_EQ(OK, status); | |
163 timer_->Stop(); | |
164 DoBufferedRead(); | |
165 } | |
166 | |
167 void BidirectionalStreamSpdyJob::SendRequestHeaders() { | |
168 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); | |
169 CreateSpdyHeadersFromHttpRequest(request_info_, request_info_.extra_headers, | |
170 stream_->GetProtocolVersion(), true, | |
171 headers.get()); | |
172 bool end_stream = (request_info_.method == "GET"); | |
mef
2015/12/07 21:18:07
I suggest that we add at least 'HEAD', but I would
xunjieli
2015/12/08 16:08:46
Done.
mef
2015/12/11 20:06:12
Per our chat I've meant that we need to pass an ex
xunjieli
2015/12/11 23:48:39
Done.
| |
173 stream_->SendRequestHeaders( | |
174 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
175 } | |
176 | |
177 void BidirectionalStreamSpdyJob::OnStreamInitialized(int rv) { | |
178 DCHECK_NE(ERR_IO_PENDING, rv); | |
179 if (rv == OK) { | |
180 stream_ = stream_request_.ReleaseStream(); | |
181 stream_->SetDelegate(this); | |
182 SendRequestHeaders(); | |
183 return; | |
184 } | |
185 delegate_->OnFailed(static_cast<Error>(rv)); | |
186 } | |
187 | |
188 void BidirectionalStreamSpdyJob::ScheduleBufferedRead() { | |
189 // If there is already a scheduled DoBufferedRead, don't issue | |
190 // another one. Mark that we have received more data and return. | |
191 if (timer_->IsRunning()) { | |
192 more_read_data_pending_ = true; | |
193 return; | |
194 } | |
195 | |
196 more_read_data_pending_ = false; | |
197 timer_->Start(FROM_HERE, kBufferTime, | |
198 base::Bind(&BidirectionalStreamSpdyJob::DoBufferedRead, | |
199 weak_factory_.GetWeakPtr())); | |
200 } | |
201 | |
202 void BidirectionalStreamSpdyJob::DoBufferedRead() { | |
203 DCHECK(!timer_->IsRunning()); | |
204 // If the stream errored out, do not complete the read. | |
205 if (!stream_ && !stream_closed_) | |
206 return; | |
207 if (stream_closed_ && closed_stream_status_ != OK) | |
208 return; | |
209 | |
210 // When |more_read_data_pending_| is true, it means that more data has arrived | |
211 // since started waiting. Wait a little longer and continue to buffer. | |
212 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { | |
213 ScheduleBufferedRead(); | |
214 return; | |
215 } | |
216 | |
217 int rv = 0; | |
218 if (user_buffer_) { | |
219 rv = ReadData(user_buffer_.get(), user_buffer_len_); | |
220 DCHECK_NE(ERR_IO_PENDING, rv); | |
221 user_buffer_ = nullptr; | |
222 user_buffer_len_ = 0; | |
223 delegate_->OnReadCompleted(rv); | |
224 } | |
225 } | |
226 | |
227 bool BidirectionalStreamSpdyJob::ShouldWaitForMoreBufferedData() const { | |
228 if (stream_closed_) | |
229 return false; | |
230 DCHECK_GT(user_buffer_len_, 0); | |
231 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_); | |
232 } | |
233 | |
234 } // namespace net | |
OLD | NEW |