OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/quic/chromium/quic_chromium_client_stream.h" | 5 #include "net/quic/chromium/quic_chromium_client_stream.h" |
6 | 6 |
7 #include <utility> | 7 #include <utility> |
8 | 8 |
9 #include "base/bind_helpers.h" | 9 #include "base/bind_helpers.h" |
10 #include "base/callback_helpers.h" | 10 #include "base/callback_helpers.h" |
(...skipping 14 matching lines...) Expand all Loading... |
25 QuicStreamId id, | 25 QuicStreamId id, |
26 QuicClientSessionBase* session, | 26 QuicClientSessionBase* session, |
27 const NetLogWithSource& net_log) | 27 const NetLogWithSource& net_log) |
28 : QuicSpdyStream(id, session), | 28 : QuicSpdyStream(id, session), |
29 net_log_(net_log), | 29 net_log_(net_log), |
30 delegate_(nullptr), | 30 delegate_(nullptr), |
31 headers_delivered_(false), | 31 headers_delivered_(false), |
32 initial_headers_sent_(false), | 32 initial_headers_sent_(false), |
33 session_(session), | 33 session_(session), |
34 can_migrate_(true), | 34 can_migrate_(true), |
| 35 initial_headers_frame_len_(0), |
35 weak_factory_(this) {} | 36 weak_factory_(this) {} |
36 | 37 |
37 QuicChromiumClientStream::~QuicChromiumClientStream() { | 38 QuicChromiumClientStream::~QuicChromiumClientStream() { |
38 if (delegate_) | 39 if (delegate_) |
39 delegate_->OnClose(); | 40 delegate_->OnClose(); |
40 } | 41 } |
41 | 42 |
42 void QuicChromiumClientStream::OnInitialHeadersComplete( | 43 void QuicChromiumClientStream::OnInitialHeadersComplete( |
43 bool fin, | 44 bool fin, |
44 size_t frame_len, | 45 size_t frame_len, |
45 const QuicHeaderList& header_list) { | 46 const QuicHeaderList& header_list) { |
46 QuicSpdyStream::OnInitialHeadersComplete(fin, frame_len, header_list); | 47 QuicSpdyStream::OnInitialHeadersComplete(fin, frame_len, header_list); |
47 | 48 |
48 SpdyHeaderBlock header_block; | 49 SpdyHeaderBlock header_block; |
49 int64_t length = -1; | 50 int64_t length = -1; |
50 if (!SpdyUtils::CopyAndValidateHeaders(header_list, &length, &header_block)) { | 51 if (!SpdyUtils::CopyAndValidateHeaders(header_list, &length, &header_block)) { |
51 DLOG(ERROR) << "Failed to parse header list: " << header_list.DebugString(); | 52 DLOG(ERROR) << "Failed to parse header list: " << header_list.DebugString(); |
52 ConsumeHeaderList(); | 53 ConsumeHeaderList(); |
53 Reset(QUIC_BAD_APPLICATION_PAYLOAD); | 54 Reset(QUIC_BAD_APPLICATION_PAYLOAD); |
54 return; | 55 return; |
55 } | 56 } |
56 | 57 |
57 ConsumeHeaderList(); | 58 ConsumeHeaderList(); |
58 session_->OnInitialHeadersComplete(id(), header_block); | 59 session_->OnInitialHeadersComplete(id(), header_block); |
59 | 60 |
60 // The delegate will read the headers via a posted task. | 61 if (delegate_) { |
61 NotifyDelegateOfHeadersCompleteLater(std::move(header_block), frame_len); | 62 // The delegate will receive the headers via a posted task. |
| 63 NotifyDelegateOfHeadersCompleteLater(std::move(header_block), frame_len); |
| 64 return; |
| 65 } |
| 66 |
| 67 // Buffer the headers and deliver them when the delegate arrives. |
| 68 initial_headers_ = std::move(header_block); |
| 69 initial_headers_frame_len_ = frame_len; |
62 } | 70 } |
63 | 71 |
64 void QuicChromiumClientStream::OnTrailingHeadersComplete( | 72 void QuicChromiumClientStream::OnTrailingHeadersComplete( |
65 bool fin, | 73 bool fin, |
66 size_t frame_len, | 74 size_t frame_len, |
67 const QuicHeaderList& header_list) { | 75 const QuicHeaderList& header_list) { |
68 QuicSpdyStream::OnTrailingHeadersComplete(fin, frame_len, header_list); | 76 QuicSpdyStream::OnTrailingHeadersComplete(fin, frame_len, header_list); |
69 NotifyDelegateOfHeadersCompleteLater(received_trailers().Clone(), frame_len); | 77 NotifyDelegateOfHeadersCompleteLater(received_trailers().Clone(), frame_len); |
70 } | 78 } |
71 | 79 |
(...skipping 22 matching lines...) Expand all Loading... |
94 } | 102 } |
95 | 103 |
96 if (!sequencer()->HasBytesToRead() && !FinishedReadingTrailers()) { | 104 if (!sequencer()->HasBytesToRead() && !FinishedReadingTrailers()) { |
97 // If there is no data to read, wait until either FIN is received or | 105 // If there is no data to read, wait until either FIN is received or |
98 // trailers are delivered. | 106 // trailers are delivered. |
99 return; | 107 return; |
100 } | 108 } |
101 | 109 |
102 // The delegate will read the data via a posted task, and | 110 // The delegate will read the data via a posted task, and |
103 // will be able to, potentially, read all data which has queued up. | 111 // will be able to, potentially, read all data which has queued up. |
104 NotifyDelegateOfDataAvailableLater(); | 112 if (delegate_) |
| 113 NotifyDelegateOfDataAvailableLater(); |
105 } | 114 } |
106 | 115 |
107 void QuicChromiumClientStream::OnClose() { | 116 void QuicChromiumClientStream::OnClose() { |
108 if (delegate_) { | 117 if (delegate_) { |
109 delegate_->OnClose(); | 118 delegate_->OnClose(); |
110 delegate_ = nullptr; | 119 delegate_ = nullptr; |
111 delegate_tasks_.clear(); | |
112 } | 120 } |
113 QuicStream::OnClose(); | 121 QuicStream::OnClose(); |
114 } | 122 } |
115 | 123 |
116 void QuicChromiumClientStream::OnCanWrite() { | 124 void QuicChromiumClientStream::OnCanWrite() { |
117 QuicStream::OnCanWrite(); | 125 QuicStream::OnCanWrite(); |
118 | 126 |
119 if (!HasBufferedData() && !write_callback_.is_null()) { | 127 if (!HasBufferedData() && !write_callback_.is_null()) { |
120 base::ResetAndReturn(&write_callback_).Run(OK); | 128 base::ResetAndReturn(&write_callback_).Run(OK); |
121 } | 129 } |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
179 } | 187 } |
180 | 188 |
181 write_callback_ = callback; | 189 write_callback_ = callback; |
182 return ERR_IO_PENDING; | 190 return ERR_IO_PENDING; |
183 } | 191 } |
184 | 192 |
185 void QuicChromiumClientStream::SetDelegate( | 193 void QuicChromiumClientStream::SetDelegate( |
186 QuicChromiumClientStream::Delegate* delegate) { | 194 QuicChromiumClientStream::Delegate* delegate) { |
187 DCHECK(!(delegate_ && delegate)); | 195 DCHECK(!(delegate_ && delegate)); |
188 delegate_ = delegate; | 196 delegate_ = delegate; |
189 if (delegate == nullptr) { | 197 if (delegate == nullptr) |
190 DCHECK(delegate_tasks_.empty()); | |
191 return; | 198 return; |
192 } | 199 |
193 while (!delegate_tasks_.empty()) { | 200 // Should this perhaps be via PostTask to make reasoning simpler? |
194 base::Closure closure = delegate_tasks_.front(); | 201 if (!initial_headers_.empty()) { |
195 delegate_tasks_.pop_front(); | 202 delegate_->OnHeadersAvailable(std::move(initial_headers_), |
196 closure.Run(); | 203 initial_headers_frame_len_); |
197 } | 204 } |
198 } | 205 } |
199 | 206 |
200 void QuicChromiumClientStream::OnError(int error) { | 207 void QuicChromiumClientStream::OnError(int error) { |
201 if (delegate_) { | 208 if (delegate_) { |
202 QuicChromiumClientStream::Delegate* delegate = delegate_; | 209 QuicChromiumClientStream::Delegate* delegate = delegate_; |
203 delegate_ = nullptr; | 210 delegate_ = nullptr; |
204 delegate_tasks_.clear(); | |
205 delegate->OnError(error); | 211 delegate->OnError(error); |
206 } | 212 } |
207 } | 213 } |
208 | 214 |
209 int QuicChromiumClientStream::Read(IOBuffer* buf, int buf_len) { | 215 int QuicChromiumClientStream::Read(IOBuffer* buf, int buf_len) { |
210 if (IsDoneReading()) | 216 if (IsDoneReading()) |
211 return 0; // EOF | 217 return 0; // EOF |
212 | 218 |
213 if (!HasBytesToRead()) | 219 if (!HasBytesToRead()) |
214 return ERR_IO_PENDING; | 220 return ERR_IO_PENDING; |
215 | 221 |
216 iovec iov; | 222 iovec iov; |
217 iov.iov_base = buf->data(); | 223 iov.iov_base = buf->data(); |
218 iov.iov_len = buf_len; | 224 iov.iov_len = buf_len; |
219 size_t bytes_read = Readv(&iov, 1); | 225 size_t bytes_read = Readv(&iov, 1); |
220 // Since HasBytesToRead is true, Readv() must of read some data. | 226 // Since HasBytesToRead is true, Readv() must of read some data. |
221 DCHECK_NE(0u, bytes_read); | 227 DCHECK_NE(0u, bytes_read); |
222 return bytes_read; | 228 return bytes_read; |
223 } | 229 } |
224 | 230 |
225 void QuicChromiumClientStream::NotifyDelegateOfHeadersCompleteLater( | 231 void QuicChromiumClientStream::NotifyDelegateOfHeadersCompleteLater( |
226 SpdyHeaderBlock headers, | 232 SpdyHeaderBlock headers, |
227 size_t frame_len) { | 233 size_t frame_len) { |
228 RunOrBuffer(base::Bind( | 234 DCHECK(delegate_); |
229 &QuicChromiumClientStream::NotifyDelegateOfHeadersComplete, | 235 base::ThreadTaskRunnerHandle::Get()->PostTask( |
230 weak_factory_.GetWeakPtr(), base::Passed(std::move(headers)), frame_len)); | 236 FROM_HERE, |
| 237 base::Bind(&QuicChromiumClientStream::NotifyDelegateOfHeadersComplete, |
| 238 weak_factory_.GetWeakPtr(), base::Passed(std::move(headers)), |
| 239 frame_len)); |
231 } | 240 } |
232 | 241 |
233 void QuicChromiumClientStream::NotifyDelegateOfHeadersComplete( | 242 void QuicChromiumClientStream::NotifyDelegateOfHeadersComplete( |
234 SpdyHeaderBlock headers, | 243 SpdyHeaderBlock headers, |
235 size_t frame_len) { | 244 size_t frame_len) { |
236 if (!delegate_) | 245 if (!delegate_) |
237 return; | 246 return; |
238 // Only mark trailers consumed when we are about to notify delegate. | 247 // Only mark trailers consumed when we are about to notify delegate. |
239 if (headers_delivered_) { | 248 if (headers_delivered_) { |
240 MarkTrailersConsumed(); | 249 MarkTrailersConsumed(); |
241 // Post an async task to notify delegate of the FIN flag. | 250 // Post an async task to notify delegate of the FIN flag. |
242 NotifyDelegateOfDataAvailableLater(); | 251 NotifyDelegateOfDataAvailableLater(); |
243 net_log_.AddEvent( | 252 net_log_.AddEvent( |
244 NetLogEventType::QUIC_CHROMIUM_CLIENT_STREAM_READ_RESPONSE_TRAILERS, | 253 NetLogEventType::QUIC_CHROMIUM_CLIENT_STREAM_READ_RESPONSE_TRAILERS, |
245 base::Bind(&SpdyHeaderBlockNetLogCallback, &headers)); | 254 base::Bind(&SpdyHeaderBlockNetLogCallback, &headers)); |
246 } else { | 255 } else { |
247 headers_delivered_ = true; | 256 headers_delivered_ = true; |
248 net_log_.AddEvent( | 257 net_log_.AddEvent( |
249 NetLogEventType::QUIC_CHROMIUM_CLIENT_STREAM_READ_RESPONSE_HEADERS, | 258 NetLogEventType::QUIC_CHROMIUM_CLIENT_STREAM_READ_RESPONSE_HEADERS, |
250 base::Bind(&SpdyHeaderBlockNetLogCallback, &headers)); | 259 base::Bind(&SpdyHeaderBlockNetLogCallback, &headers)); |
251 } | 260 } |
252 | 261 |
253 delegate_->OnHeadersAvailable(headers, frame_len); | 262 delegate_->OnHeadersAvailable(headers, frame_len); |
254 } | 263 } |
255 | 264 |
256 void QuicChromiumClientStream::NotifyDelegateOfDataAvailableLater() { | 265 void QuicChromiumClientStream::NotifyDelegateOfDataAvailableLater() { |
257 RunOrBuffer( | 266 DCHECK(delegate_); |
| 267 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 268 FROM_HERE, |
258 base::Bind(&QuicChromiumClientStream::NotifyDelegateOfDataAvailable, | 269 base::Bind(&QuicChromiumClientStream::NotifyDelegateOfDataAvailable, |
259 weak_factory_.GetWeakPtr())); | 270 weak_factory_.GetWeakPtr())); |
260 } | 271 } |
261 | 272 |
262 void QuicChromiumClientStream::NotifyDelegateOfDataAvailable() { | 273 void QuicChromiumClientStream::NotifyDelegateOfDataAvailable() { |
263 if (delegate_) | 274 if (delegate_) |
264 delegate_->OnDataAvailable(); | 275 delegate_->OnDataAvailable(); |
265 } | 276 } |
266 | 277 |
267 void QuicChromiumClientStream::RunOrBuffer(base::Closure closure) { | |
268 if (delegate_) { | |
269 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, closure); | |
270 } else { | |
271 delegate_tasks_.push_back(closure); | |
272 } | |
273 } | |
274 | |
275 void QuicChromiumClientStream::DisableConnectionMigration() { | 278 void QuicChromiumClientStream::DisableConnectionMigration() { |
276 can_migrate_ = false; | 279 can_migrate_ = false; |
277 } | 280 } |
278 | 281 |
279 bool QuicChromiumClientStream::IsFirstStream() { | 282 bool QuicChromiumClientStream::IsFirstStream() { |
280 return id() == kHeadersStreamId + 2; | 283 return id() == kHeadersStreamId + 2; |
281 } | 284 } |
282 | 285 |
283 } // namespace net | 286 } // namespace net |
OLD | NEW |