OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/quic/quic_reliable_client_stream.h" | |
6 | |
7 #include "base/callback_helpers.h" | |
8 #include "base/location.h" | |
9 #include "base/thread_task_runner_handle.h" | |
10 #include "net/base/io_buffer.h" | |
11 #include "net/base/net_errors.h" | |
12 #include "net/quic/quic_spdy_session.h" | |
13 #include "net/quic/quic_write_blocked_list.h" | |
14 #include "net/quic/spdy_utils.h" | |
15 | |
16 namespace net { | |
17 | |
18 QuicReliableClientStream::QuicReliableClientStream(QuicStreamId id, | |
19 QuicSpdySession* session, | |
20 const BoundNetLog& net_log) | |
21 : QuicSpdyStream(id, session), | |
22 net_log_(net_log), | |
23 delegate_(nullptr), | |
24 headers_delivered_(false), | |
25 weak_factory_(this) {} | |
26 | |
27 QuicReliableClientStream::~QuicReliableClientStream() { | |
28 if (delegate_) | |
29 delegate_->OnClose(connection_error()); | |
30 } | |
31 | |
32 void QuicReliableClientStream::OnStreamHeadersComplete(bool fin, | |
33 size_t frame_len) { | |
34 QuicSpdyStream::OnStreamHeadersComplete(fin, frame_len); | |
35 // The delegate will read the headers via a posted task. | |
36 NotifyDelegateOfHeadersCompleteLater(frame_len); | |
37 } | |
38 | |
39 void QuicReliableClientStream::OnDataAvailable() { | |
40 // TODO(rch): buffer data if we don't have a delegate. | |
41 if (!delegate_) { | |
42 DLOG(ERROR) << "Missing delegate"; | |
43 Reset(QUIC_STREAM_CANCELLED); | |
44 return; | |
45 } | |
46 | |
47 if (!FinishedReadingHeaders() || !headers_delivered_) { | |
48 // Buffer the data in the sequencer until the headers have been read. | |
49 return; | |
50 } | |
51 | |
52 // The delegate will read the data via a posted task, and | |
53 // will be able to, potentially, read all data which has queued up. | |
54 NotifyDelegateOfDataAvailableLater(); | |
55 } | |
56 | |
57 void QuicReliableClientStream::OnClose() { | |
58 if (delegate_) { | |
59 delegate_->OnClose(connection_error()); | |
60 delegate_ = nullptr; | |
61 } | |
62 ReliableQuicStream::OnClose(); | |
63 } | |
64 | |
65 void QuicReliableClientStream::OnCanWrite() { | |
66 ReliableQuicStream::OnCanWrite(); | |
67 | |
68 if (!HasBufferedData() && !callback_.is_null()) { | |
69 base::ResetAndReturn(&callback_).Run(OK); | |
70 } | |
71 } | |
72 | |
73 SpdyPriority QuicReliableClientStream::Priority() const { | |
74 if (delegate_ && delegate_->HasSendHeadersComplete()) { | |
75 return QuicSpdyStream::Priority(); | |
76 } | |
77 return net::kV3HighestPriority; | |
78 } | |
79 | |
80 int QuicReliableClientStream::WriteStreamData( | |
81 base::StringPiece data, | |
82 bool fin, | |
83 const CompletionCallback& callback) { | |
84 // We should not have data buffered. | |
85 DCHECK(!HasBufferedData()); | |
86 // Writes the data, or buffers it. | |
87 WriteOrBufferData(data, fin, nullptr); | |
88 if (!HasBufferedData()) { | |
89 return OK; | |
90 } | |
91 | |
92 callback_ = callback; | |
93 return ERR_IO_PENDING; | |
94 } | |
95 | |
96 void QuicReliableClientStream::SetDelegate( | |
97 QuicReliableClientStream::Delegate* delegate) { | |
98 DCHECK(!(delegate_ && delegate)); | |
99 delegate_ = delegate; | |
100 if (delegate == nullptr && sequencer()->IsClosed()) { | |
101 OnFinRead(); | |
102 } | |
103 } | |
104 | |
105 void QuicReliableClientStream::OnError(int error) { | |
106 if (delegate_) { | |
107 QuicReliableClientStream::Delegate* delegate = delegate_; | |
108 delegate_ = nullptr; | |
109 delegate->OnError(error); | |
110 } | |
111 } | |
112 | |
113 int QuicReliableClientStream::Read(IOBuffer* buf, int buf_len) { | |
114 if (sequencer()->IsClosed()) | |
115 return 0; // EOF | |
116 | |
117 if (!HasBytesToRead()) | |
118 return ERR_IO_PENDING; | |
119 | |
120 iovec iov; | |
121 iov.iov_base = buf->data(); | |
122 iov.iov_len = buf_len; | |
123 return Readv(&iov, 1); | |
124 } | |
125 | |
126 bool QuicReliableClientStream::CanWrite(const CompletionCallback& callback) { | |
127 bool can_write = session()->connection()->CanWrite(HAS_RETRANSMITTABLE_DATA); | |
128 if (!can_write) { | |
129 session()->MarkConnectionLevelWriteBlocked(id(), Priority()); | |
130 DCHECK(callback_.is_null()); | |
131 callback_ = callback; | |
132 } | |
133 return can_write; | |
134 } | |
135 | |
136 void QuicReliableClientStream::NotifyDelegateOfHeadersCompleteLater( | |
137 size_t frame_len) { | |
138 DCHECK(delegate_); | |
139 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
140 FROM_HERE, | |
141 base::Bind(&QuicReliableClientStream::NotifyDelegateOfHeadersComplete, | |
142 weak_factory_.GetWeakPtr(), frame_len)); | |
143 } | |
144 | |
145 void QuicReliableClientStream::NotifyDelegateOfHeadersComplete( | |
146 size_t frame_len) { | |
147 if (!delegate_) | |
148 return; | |
149 | |
150 size_t headers_len = decompressed_headers().length(); | |
151 SpdyHeaderBlock headers; | |
152 SpdyFramer framer(HTTP2); | |
153 if (!framer.ParseHeaderBlockInBuffer(decompressed_headers().data(), | |
154 headers_len, &headers)) { | |
155 DLOG(WARNING) << "Invalid headers"; | |
156 Reset(QUIC_BAD_APPLICATION_PAYLOAD); | |
157 return; | |
158 } | |
159 MarkHeadersConsumed(headers_len); | |
160 headers_delivered_ = true; | |
161 | |
162 delegate_->OnHeadersAvailable(headers, frame_len); | |
163 } | |
164 | |
165 void QuicReliableClientStream::NotifyDelegateOfDataAvailableLater() { | |
166 DCHECK(delegate_); | |
167 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
168 FROM_HERE, | |
169 base::Bind(&QuicReliableClientStream::NotifyDelegateOfDataAvailable, | |
170 weak_factory_.GetWeakPtr())); | |
171 } | |
172 | |
173 void QuicReliableClientStream::NotifyDelegateOfDataAvailable() { | |
174 if (delegate_) | |
175 delegate_->OnDataAvailable(); | |
176 } | |
177 | |
178 } // namespace net | |
OLD | NEW |