| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/quic/quic_stream_sequencer.h" | 5 #include "net/quic/quic_stream_sequencer.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <limits> | 8 #include <limits> |
| 9 | 9 |
| 10 #include "base/logging.h" | 10 #include "base/logging.h" |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 55 | 55 |
| 56 IOVector data; | 56 IOVector data; |
| 57 data.AppendIovec(frame.data.iovec(), frame.data.Size()); | 57 data.AppendIovec(frame.data.iovec(), frame.data.Size()); |
| 58 | 58 |
| 59 // If the frame has arrived in-order then we can process it immediately, only | 59 // If the frame has arrived in-order then we can process it immediately, only |
| 60 // buffering if the stream is unable to process it. | 60 // buffering if the stream is unable to process it. |
| 61 if (!blocked_ && byte_offset == num_bytes_consumed_) { | 61 if (!blocked_ && byte_offset == num_bytes_consumed_) { |
| 62 DVLOG(1) << "Processing byte offset " << byte_offset; | 62 DVLOG(1) << "Processing byte offset " << byte_offset; |
| 63 size_t bytes_consumed = 0; | 63 size_t bytes_consumed = 0; |
| 64 for (size_t i = 0; i < data.Size(); ++i) { | 64 for (size_t i = 0; i < data.Size(); ++i) { |
| 65 bytes_consumed += stream_->ProcessRawData( | 65 bytes_consumed += |
| 66 static_cast<char*>(data.iovec()[i].iov_base), | 66 stream_->ProcessRawData(static_cast<char*>(data.iovec()[i].iov_base), |
| 67 data.iovec()[i].iov_len); | 67 data.iovec()[i].iov_len); |
| 68 } | 68 } |
| 69 num_bytes_consumed_ += bytes_consumed; | 69 num_bytes_consumed_ += bytes_consumed; |
| 70 stream_->flow_controller()->AddBytesConsumed(bytes_consumed); | 70 stream_->flow_controller()->AddBytesConsumed(bytes_consumed); |
| 71 stream_->MaybeSendWindowUpdate(); | 71 stream_->MaybeSendWindowUpdate(); |
| 72 | 72 |
| 73 if (MaybeCloseStream()) { | 73 if (MaybeCloseStream()) { |
| 74 return true; | 74 return true; |
| 75 } | 75 } |
| 76 if (bytes_consumed > data_len) { | 76 if (bytes_consumed > data_len) { |
| 77 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); | 77 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 111 } | 111 } |
| 112 | 112 |
| 113 close_offset_ = offset; | 113 close_offset_ = offset; |
| 114 | 114 |
| 115 MaybeCloseStream(); | 115 MaybeCloseStream(); |
| 116 } | 116 } |
| 117 | 117 |
| 118 bool QuicStreamSequencer::MaybeCloseStream() { | 118 bool QuicStreamSequencer::MaybeCloseStream() { |
| 119 if (!blocked_ && IsClosed()) { | 119 if (!blocked_ && IsClosed()) { |
| 120 DVLOG(1) << "Passing up termination, as we've processed " | 120 DVLOG(1) << "Passing up termination, as we've processed " |
| 121 << num_bytes_consumed_ << " of " << close_offset_ | 121 << num_bytes_consumed_ << " of " << close_offset_ << " bytes."; |
| 122 << " bytes."; | |
| 123 // Technically it's an error if num_bytes_consumed isn't exactly | 122 // Technically it's an error if num_bytes_consumed isn't exactly |
| 124 // equal, but error handling seems silly at this point. | 123 // equal, but error handling seems silly at this point. |
| 125 stream_->OnFinRead(); | 124 stream_->OnFinRead(); |
| 126 frames_.clear(); | 125 frames_.clear(); |
| 127 num_bytes_buffered_ = 0; | 126 num_bytes_buffered_ = 0; |
| 128 return true; | 127 return true; |
| 129 } | 128 } |
| 130 return false; | 129 return false; |
| 131 } | 130 } |
| 132 | 131 |
| 133 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) { | 132 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) { |
| 134 DCHECK(!blocked_); | 133 DCHECK(!blocked_); |
| 135 FrameMap::iterator it = frames_.begin(); | 134 FrameMap::iterator it = frames_.begin(); |
| 136 size_t index = 0; | 135 size_t index = 0; |
| 137 QuicStreamOffset offset = num_bytes_consumed_; | 136 QuicStreamOffset offset = num_bytes_consumed_; |
| 138 while (it != frames_.end() && index < iov_len) { | 137 while (it != frames_.end() && index < iov_len) { |
| 139 if (it->first != offset) return index; | 138 if (it->first != offset) |
| 139 return index; |
| 140 | 140 |
| 141 iov[index].iov_base = static_cast<void*>( | 141 iov[index].iov_base = |
| 142 const_cast<char*>(it->second.data())); | 142 static_cast<void*>(const_cast<char*>(it->second.data())); |
| 143 iov[index].iov_len = it->second.size(); | 143 iov[index].iov_len = it->second.size(); |
| 144 offset += it->second.size(); | 144 offset += it->second.size(); |
| 145 | 145 |
| 146 ++index; | 146 ++index; |
| 147 ++it; | 147 ++it; |
| 148 } | 148 } |
| 149 return index; | 149 return index; |
| 150 } | 150 } |
| 151 | 151 |
| 152 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { | 152 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { |
| 153 DCHECK(!blocked_); | 153 DCHECK(!blocked_); |
| 154 FrameMap::iterator it = frames_.begin(); | 154 FrameMap::iterator it = frames_.begin(); |
| 155 size_t iov_index = 0; | 155 size_t iov_index = 0; |
| 156 size_t iov_offset = 0; | 156 size_t iov_offset = 0; |
| 157 size_t frame_offset = 0; | 157 size_t frame_offset = 0; |
| 158 size_t initial_bytes_consumed = num_bytes_consumed_; | 158 size_t initial_bytes_consumed = num_bytes_consumed_; |
| 159 | 159 |
| 160 while (iov_index < iov_len && | 160 while (iov_index < iov_len && it != frames_.end() && |
| 161 it != frames_.end() && | |
| 162 it->first == num_bytes_consumed_) { | 161 it->first == num_bytes_consumed_) { |
| 163 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset, | 162 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset, |
| 164 it->second.size() - frame_offset); | 163 it->second.size() - frame_offset); |
| 165 | 164 |
| 166 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset; | 165 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset; |
| 167 memcpy(iov_ptr, | 166 memcpy(iov_ptr, it->second.data() + frame_offset, bytes_to_read); |
| 168 it->second.data() + frame_offset, bytes_to_read); | |
| 169 frame_offset += bytes_to_read; | 167 frame_offset += bytes_to_read; |
| 170 iov_offset += bytes_to_read; | 168 iov_offset += bytes_to_read; |
| 171 | 169 |
| 172 if (iov[iov_index].iov_len == iov_offset) { | 170 if (iov[iov_index].iov_len == iov_offset) { |
| 173 // We've filled this buffer. | 171 // We've filled this buffer. |
| 174 iov_offset = 0; | 172 iov_offset = 0; |
| 175 ++iov_index; | 173 ++iov_index; |
| 176 } | 174 } |
| 177 if (it->second.size() == frame_offset) { | 175 if (it->second.size() == frame_offset) { |
| 178 // We've copied this whole frame | 176 // We've copied this whole frame |
| 179 RecordBytesConsumed(it->second.size()); | 177 RecordBytesConsumed(it->second.size()); |
| 180 frames_.erase(it); | 178 frames_.erase(it); |
| 181 it = frames_.begin(); | 179 it = frames_.begin(); |
| 182 frame_offset = 0; | 180 frame_offset = 0; |
| 183 } | 181 } |
| 184 } | 182 } |
| 185 // We've finished copying. If we have a partial frame, update it. | 183 // We've finished copying. If we have a partial frame, update it. |
| 186 if (frame_offset != 0) { | 184 if (frame_offset != 0) { |
| 187 frames_.insert(make_pair(it->first + frame_offset, | 185 frames_.insert( |
| 188 it->second.substr(frame_offset))); | 186 make_pair(it->first + frame_offset, it->second.substr(frame_offset))); |
| 189 frames_.erase(frames_.begin()); | 187 frames_.erase(frames_.begin()); |
| 190 RecordBytesConsumed(frame_offset); | 188 RecordBytesConsumed(frame_offset); |
| 191 } | 189 } |
| 192 return num_bytes_consumed_ - initial_bytes_consumed; | 190 return num_bytes_consumed_ - initial_bytes_consumed; |
| 193 } | 191 } |
| 194 | 192 |
| 195 bool QuicStreamSequencer::HasBytesToRead() const { | 193 bool QuicStreamSequencer::HasBytesToRead() const { |
| 196 FrameMap::const_iterator it = frames_.begin(); | 194 FrameMap::const_iterator it = frames_.begin(); |
| 197 | 195 |
| 198 return it != frames_.end() && it->first == num_bytes_consumed_; | 196 return it != frames_.end() && it->first == num_bytes_consumed_; |
| 199 } | 197 } |
| 200 | 198 |
| 201 bool QuicStreamSequencer::IsClosed() const { | 199 bool QuicStreamSequencer::IsClosed() const { |
| 202 return num_bytes_consumed_ >= close_offset_; | 200 return num_bytes_consumed_ >= close_offset_; |
| 203 } | 201 } |
| 204 | 202 |
| 205 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const { | 203 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const { |
| 206 // A frame is duplicate if the frame offset is smaller than our bytes consumed | 204 // A frame is duplicate if the frame offset is smaller than our bytes consumed |
| 207 // or we have stored the frame in our map. | 205 // or we have stored the frame in our map. |
| 208 // TODO(pwestin): Is it possible that a new frame contain more data even if | 206 // TODO(pwestin): Is it possible that a new frame contain more data even if |
| 209 // the offset is the same? | 207 // the offset is the same? |
| 210 return frame.offset < num_bytes_consumed_ || | 208 return frame.offset < num_bytes_consumed_ || |
| 211 frames_.find(frame.offset) != frames_.end(); | 209 frames_.find(frame.offset) != frames_.end(); |
| 212 } | 210 } |
| 213 | 211 |
| 214 void QuicStreamSequencer::SetBlockedUntilFlush() { | 212 void QuicStreamSequencer::SetBlockedUntilFlush() { |
| 215 blocked_ = true; | 213 blocked_ = true; |
| 216 } | 214 } |
| 217 | 215 |
| 218 void QuicStreamSequencer::FlushBufferedFrames() { | 216 void QuicStreamSequencer::FlushBufferedFrames() { |
| 219 blocked_ = false; | 217 blocked_ = false; |
| 220 FrameMap::iterator it = frames_.find(num_bytes_consumed_); | 218 FrameMap::iterator it = frames_.find(num_bytes_consumed_); |
| 221 while (it != frames_.end()) { | 219 while (it != frames_.end()) { |
| 222 DVLOG(1) << "Flushing buffered packet at offset " << it->first; | 220 DVLOG(1) << "Flushing buffered packet at offset " << it->first; |
| 223 string* data = &it->second; | 221 string* data = &it->second; |
| 224 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(), | 222 size_t bytes_consumed = |
| 225 data->size()); | 223 stream_->ProcessRawData(data->c_str(), data->size()); |
| 226 RecordBytesConsumed(bytes_consumed); | 224 RecordBytesConsumed(bytes_consumed); |
| 227 if (MaybeCloseStream()) { | 225 if (MaybeCloseStream()) { |
| 228 return; | 226 return; |
| 229 } | 227 } |
| 230 if (bytes_consumed > data->size()) { | 228 if (bytes_consumed > data->size()) { |
| 231 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error | 229 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error |
| 232 return; | 230 return; |
| 233 } else if (bytes_consumed == data->size()) { | 231 } else if (bytes_consumed == data->size()) { |
| 234 frames_.erase(it); | 232 frames_.erase(it); |
| 235 it = frames_.find(num_bytes_consumed_); | 233 it = frames_.find(num_bytes_consumed_); |
| (...skipping 10 matching lines...) Expand all Loading... |
| 246 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { | 244 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { |
| 247 num_bytes_consumed_ += bytes_consumed; | 245 num_bytes_consumed_ += bytes_consumed; |
| 248 num_bytes_buffered_ -= bytes_consumed; | 246 num_bytes_buffered_ -= bytes_consumed; |
| 249 | 247 |
| 250 stream_->flow_controller()->AddBytesConsumed(bytes_consumed); | 248 stream_->flow_controller()->AddBytesConsumed(bytes_consumed); |
| 251 stream_->flow_controller()->RemoveBytesBuffered(bytes_consumed); | 249 stream_->flow_controller()->RemoveBytesBuffered(bytes_consumed); |
| 252 stream_->MaybeSendWindowUpdate(); | 250 stream_->MaybeSendWindowUpdate(); |
| 253 } | 251 } |
| 254 | 252 |
| 255 } // namespace net | 253 } // namespace net |
| OLD | NEW |