OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/quic/quic_stream_sequencer.h" | 5 #include "net/quic/quic_stream_sequencer.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <limits> | 8 #include <limits> |
9 | 9 |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
55 | 55 |
56 IOVector data; | 56 IOVector data; |
57 data.AppendIovec(frame.data.iovec(), frame.data.Size()); | 57 data.AppendIovec(frame.data.iovec(), frame.data.Size()); |
58 | 58 |
59 // If the frame has arrived in-order then we can process it immediately, only | 59 // If the frame has arrived in-order then we can process it immediately, only |
60 // buffering if the stream is unable to process it. | 60 // buffering if the stream is unable to process it. |
61 if (!blocked_ && byte_offset == num_bytes_consumed_) { | 61 if (!blocked_ && byte_offset == num_bytes_consumed_) { |
62 DVLOG(1) << "Processing byte offset " << byte_offset; | 62 DVLOG(1) << "Processing byte offset " << byte_offset; |
63 size_t bytes_consumed = 0; | 63 size_t bytes_consumed = 0; |
64 for (size_t i = 0; i < data.Size(); ++i) { | 64 for (size_t i = 0; i < data.Size(); ++i) { |
65 bytes_consumed += stream_->ProcessRawData( | 65 bytes_consumed += |
66 static_cast<char*>(data.iovec()[i].iov_base), | 66 stream_->ProcessRawData(static_cast<char*>(data.iovec()[i].iov_base), |
67 data.iovec()[i].iov_len); | 67 data.iovec()[i].iov_len); |
68 } | 68 } |
69 num_bytes_consumed_ += bytes_consumed; | 69 num_bytes_consumed_ += bytes_consumed; |
70 stream_->flow_controller()->AddBytesConsumed(bytes_consumed); | 70 stream_->flow_controller()->AddBytesConsumed(bytes_consumed); |
71 stream_->MaybeSendWindowUpdate(); | 71 stream_->MaybeSendWindowUpdate(); |
72 | 72 |
73 if (MaybeCloseStream()) { | 73 if (MaybeCloseStream()) { |
74 return true; | 74 return true; |
75 } | 75 } |
76 if (bytes_consumed > data_len) { | 76 if (bytes_consumed > data_len) { |
77 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); | 77 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
111 } | 111 } |
112 | 112 |
113 close_offset_ = offset; | 113 close_offset_ = offset; |
114 | 114 |
115 MaybeCloseStream(); | 115 MaybeCloseStream(); |
116 } | 116 } |
117 | 117 |
118 bool QuicStreamSequencer::MaybeCloseStream() { | 118 bool QuicStreamSequencer::MaybeCloseStream() { |
119 if (!blocked_ && IsClosed()) { | 119 if (!blocked_ && IsClosed()) { |
120 DVLOG(1) << "Passing up termination, as we've processed " | 120 DVLOG(1) << "Passing up termination, as we've processed " |
121 << num_bytes_consumed_ << " of " << close_offset_ | 121 << num_bytes_consumed_ << " of " << close_offset_ << " bytes."; |
122 << " bytes."; | |
123 // Technically it's an error if num_bytes_consumed isn't exactly | 122 // Technically it's an error if num_bytes_consumed isn't exactly |
124 // equal, but error handling seems silly at this point. | 123 // equal, but error handling seems silly at this point. |
125 stream_->OnFinRead(); | 124 stream_->OnFinRead(); |
126 frames_.clear(); | 125 frames_.clear(); |
127 num_bytes_buffered_ = 0; | 126 num_bytes_buffered_ = 0; |
128 return true; | 127 return true; |
129 } | 128 } |
130 return false; | 129 return false; |
131 } | 130 } |
132 | 131 |
133 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) { | 132 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) { |
134 DCHECK(!blocked_); | 133 DCHECK(!blocked_); |
135 FrameMap::iterator it = frames_.begin(); | 134 FrameMap::iterator it = frames_.begin(); |
136 size_t index = 0; | 135 size_t index = 0; |
137 QuicStreamOffset offset = num_bytes_consumed_; | 136 QuicStreamOffset offset = num_bytes_consumed_; |
138 while (it != frames_.end() && index < iov_len) { | 137 while (it != frames_.end() && index < iov_len) { |
139 if (it->first != offset) return index; | 138 if (it->first != offset) |
| 139 return index; |
140 | 140 |
141 iov[index].iov_base = static_cast<void*>( | 141 iov[index].iov_base = |
142 const_cast<char*>(it->second.data())); | 142 static_cast<void*>(const_cast<char*>(it->second.data())); |
143 iov[index].iov_len = it->second.size(); | 143 iov[index].iov_len = it->second.size(); |
144 offset += it->second.size(); | 144 offset += it->second.size(); |
145 | 145 |
146 ++index; | 146 ++index; |
147 ++it; | 147 ++it; |
148 } | 148 } |
149 return index; | 149 return index; |
150 } | 150 } |
151 | 151 |
152 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { | 152 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { |
153 DCHECK(!blocked_); | 153 DCHECK(!blocked_); |
154 FrameMap::iterator it = frames_.begin(); | 154 FrameMap::iterator it = frames_.begin(); |
155 size_t iov_index = 0; | 155 size_t iov_index = 0; |
156 size_t iov_offset = 0; | 156 size_t iov_offset = 0; |
157 size_t frame_offset = 0; | 157 size_t frame_offset = 0; |
158 size_t initial_bytes_consumed = num_bytes_consumed_; | 158 size_t initial_bytes_consumed = num_bytes_consumed_; |
159 | 159 |
160 while (iov_index < iov_len && | 160 while (iov_index < iov_len && it != frames_.end() && |
161 it != frames_.end() && | |
162 it->first == num_bytes_consumed_) { | 161 it->first == num_bytes_consumed_) { |
163 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset, | 162 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset, |
164 it->second.size() - frame_offset); | 163 it->second.size() - frame_offset); |
165 | 164 |
166 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset; | 165 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset; |
167 memcpy(iov_ptr, | 166 memcpy(iov_ptr, it->second.data() + frame_offset, bytes_to_read); |
168 it->second.data() + frame_offset, bytes_to_read); | |
169 frame_offset += bytes_to_read; | 167 frame_offset += bytes_to_read; |
170 iov_offset += bytes_to_read; | 168 iov_offset += bytes_to_read; |
171 | 169 |
172 if (iov[iov_index].iov_len == iov_offset) { | 170 if (iov[iov_index].iov_len == iov_offset) { |
173 // We've filled this buffer. | 171 // We've filled this buffer. |
174 iov_offset = 0; | 172 iov_offset = 0; |
175 ++iov_index; | 173 ++iov_index; |
176 } | 174 } |
177 if (it->second.size() == frame_offset) { | 175 if (it->second.size() == frame_offset) { |
178 // We've copied this whole frame | 176 // We've copied this whole frame |
179 RecordBytesConsumed(it->second.size()); | 177 RecordBytesConsumed(it->second.size()); |
180 frames_.erase(it); | 178 frames_.erase(it); |
181 it = frames_.begin(); | 179 it = frames_.begin(); |
182 frame_offset = 0; | 180 frame_offset = 0; |
183 } | 181 } |
184 } | 182 } |
185 // We've finished copying. If we have a partial frame, update it. | 183 // We've finished copying. If we have a partial frame, update it. |
186 if (frame_offset != 0) { | 184 if (frame_offset != 0) { |
187 frames_.insert(make_pair(it->first + frame_offset, | 185 frames_.insert( |
188 it->second.substr(frame_offset))); | 186 make_pair(it->first + frame_offset, it->second.substr(frame_offset))); |
189 frames_.erase(frames_.begin()); | 187 frames_.erase(frames_.begin()); |
190 RecordBytesConsumed(frame_offset); | 188 RecordBytesConsumed(frame_offset); |
191 } | 189 } |
192 return num_bytes_consumed_ - initial_bytes_consumed; | 190 return num_bytes_consumed_ - initial_bytes_consumed; |
193 } | 191 } |
194 | 192 |
195 bool QuicStreamSequencer::HasBytesToRead() const { | 193 bool QuicStreamSequencer::HasBytesToRead() const { |
196 FrameMap::const_iterator it = frames_.begin(); | 194 FrameMap::const_iterator it = frames_.begin(); |
197 | 195 |
198 return it != frames_.end() && it->first == num_bytes_consumed_; | 196 return it != frames_.end() && it->first == num_bytes_consumed_; |
199 } | 197 } |
200 | 198 |
201 bool QuicStreamSequencer::IsClosed() const { | 199 bool QuicStreamSequencer::IsClosed() const { |
202 return num_bytes_consumed_ >= close_offset_; | 200 return num_bytes_consumed_ >= close_offset_; |
203 } | 201 } |
204 | 202 |
205 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const { | 203 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const { |
206 // A frame is duplicate if the frame offset is smaller than our bytes consumed | 204 // A frame is duplicate if the frame offset is smaller than our bytes consumed |
207 // or we have stored the frame in our map. | 205 // or we have stored the frame in our map. |
208 // TODO(pwestin): Is it possible that a new frame contain more data even if | 206 // TODO(pwestin): Is it possible that a new frame contain more data even if |
209 // the offset is the same? | 207 // the offset is the same? |
210 return frame.offset < num_bytes_consumed_ || | 208 return frame.offset < num_bytes_consumed_ || |
211 frames_.find(frame.offset) != frames_.end(); | 209 frames_.find(frame.offset) != frames_.end(); |
212 } | 210 } |
213 | 211 |
214 void QuicStreamSequencer::SetBlockedUntilFlush() { | 212 void QuicStreamSequencer::SetBlockedUntilFlush() { |
215 blocked_ = true; | 213 blocked_ = true; |
216 } | 214 } |
217 | 215 |
218 void QuicStreamSequencer::FlushBufferedFrames() { | 216 void QuicStreamSequencer::FlushBufferedFrames() { |
219 blocked_ = false; | 217 blocked_ = false; |
220 FrameMap::iterator it = frames_.find(num_bytes_consumed_); | 218 FrameMap::iterator it = frames_.find(num_bytes_consumed_); |
221 while (it != frames_.end()) { | 219 while (it != frames_.end()) { |
222 DVLOG(1) << "Flushing buffered packet at offset " << it->first; | 220 DVLOG(1) << "Flushing buffered packet at offset " << it->first; |
223 string* data = &it->second; | 221 string* data = &it->second; |
224 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(), | 222 size_t bytes_consumed = |
225 data->size()); | 223 stream_->ProcessRawData(data->c_str(), data->size()); |
226 RecordBytesConsumed(bytes_consumed); | 224 RecordBytesConsumed(bytes_consumed); |
227 if (MaybeCloseStream()) { | 225 if (MaybeCloseStream()) { |
228 return; | 226 return; |
229 } | 227 } |
230 if (bytes_consumed > data->size()) { | 228 if (bytes_consumed > data->size()) { |
231 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error | 229 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error |
232 return; | 230 return; |
233 } else if (bytes_consumed == data->size()) { | 231 } else if (bytes_consumed == data->size()) { |
234 frames_.erase(it); | 232 frames_.erase(it); |
235 it = frames_.find(num_bytes_consumed_); | 233 it = frames_.find(num_bytes_consumed_); |
(...skipping 10 matching lines...) Expand all Loading... |
246 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { | 244 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { |
247 num_bytes_consumed_ += bytes_consumed; | 245 num_bytes_consumed_ += bytes_consumed; |
248 num_bytes_buffered_ -= bytes_consumed; | 246 num_bytes_buffered_ -= bytes_consumed; |
249 | 247 |
250 stream_->flow_controller()->AddBytesConsumed(bytes_consumed); | 248 stream_->flow_controller()->AddBytesConsumed(bytes_consumed); |
251 stream_->flow_controller()->RemoveBytesBuffered(bytes_consumed); | 249 stream_->flow_controller()->RemoveBytesBuffered(bytes_consumed); |
252 stream_->MaybeSendWindowUpdate(); | 250 stream_->MaybeSendWindowUpdate(); |
253 } | 251 } |
254 | 252 |
255 } // namespace net | 253 } // namespace net |
OLD | NEW |