Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(167)

Side by Side Diff: net/spdy/spdy_session.cc

Issue 13009012: [SPDY] Refactor SpdySession's write queue (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_session.h" 5 #include "net/spdy/spdy_session.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <map> 8 #include <map>
9 9
10 #include "base/basictypes.h" 10 #include "base/basictypes.h"
(...skipping 253 matching lines...) Expand 10 before | Expand all | Expand 10 after
264 264
265 void SpdyStreamRequest::Reset() { 265 void SpdyStreamRequest::Reset() {
266 session_ = NULL; 266 session_ = NULL;
267 stream_ = NULL; 267 stream_ = NULL;
268 url_ = GURL(); 268 url_ = GURL();
269 priority_ = MINIMUM_PRIORITY; 269 priority_ = MINIMUM_PRIORITY;
270 net_log_ = BoundNetLog(); 270 net_log_ = BoundNetLog();
271 callback_.Reset(); 271 callback_.Reset();
272 } 272 }
273 273
274 // static
275 void SpdySession::SpdyIOBufferProducer::ActivateStream(
276 SpdySession* spdy_session,
277 SpdyStream* spdy_stream) {
278 spdy_session->ActivateStream(spdy_stream);
279 }
280
281 // static
282 SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
283 SpdyFrame* frame,
284 RequestPriority priority,
285 SpdyStream* stream) {
286 size_t size = frame->size();
287 DCHECK_GT(size, 0u);
288
289 // TODO(mbelshe): We have too much copying of data here.
290 IOBufferWithSize* buffer = new IOBufferWithSize(size);
291 memcpy(buffer->data(), frame->data(), size);
292
293 return new SpdyIOBuffer(buffer, size, priority, stream);
294 }
295
296 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, 274 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
297 SpdySessionPool* spdy_session_pool, 275 SpdySessionPool* spdy_session_pool,
298 HttpServerProperties* http_server_properties, 276 HttpServerProperties* http_server_properties,
299 bool verify_domain_authentication, 277 bool verify_domain_authentication,
300 bool enable_sending_initial_settings, 278 bool enable_sending_initial_settings,
301 bool enable_credential_frames, 279 bool enable_credential_frames,
302 bool enable_compression, 280 bool enable_compression,
303 bool enable_ping_based_connection_checking, 281 bool enable_ping_based_connection_checking,
304 NextProto default_protocol, 282 NextProto default_protocol,
305 size_t stream_initial_recv_window_size, 283 size_t stream_initial_recv_window_size,
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
382 360
383 if (connection_->is_initialized()) { 361 if (connection_->is_initialized()) {
384 // With SPDY we can't recycle sockets. 362 // With SPDY we can't recycle sockets.
385 connection_->socket()->Disconnect(); 363 connection_->socket()->Disconnect();
386 } 364 }
387 365
388 // Streams should all be gone now. 366 // Streams should all be gone now.
389 DCHECK_EQ(0u, num_active_streams()); 367 DCHECK_EQ(0u, num_active_streams());
390 DCHECK_EQ(0u, num_unclaimed_pushed_streams()); 368 DCHECK_EQ(0u, num_unclaimed_pushed_streams());
391 369
392 for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) { 370 for (int i = 0; i < NUM_PRIORITIES; ++i) {
393 DCHECK(pending_create_stream_queues_[i].empty()); 371 DCHECK(pending_create_stream_queues_[i].empty());
394 } 372 }
395 DCHECK(pending_stream_request_completions_.empty()); 373 DCHECK(pending_stream_request_completions_.empty());
396 374
397 RecordHistograms(); 375 RecordHistograms();
398 376
399 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION); 377 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
400 } 378 }
401 379
402 net::Error SpdySession::InitializeWithSocket( 380 net::Error SpdySession::InitializeWithSocket(
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
478 return true; // This is not a secure session, so all domains are okay. 456 return true; // This is not a secure session, so all domains are okay.
479 457
480 return !ssl_info.client_cert_sent && 458 return !ssl_info.client_cert_sent &&
481 (enable_credential_frames_ || !ssl_info.channel_id_sent || 459 (enable_credential_frames_ || !ssl_info.channel_id_sent ||
482 ServerBoundCertService::GetDomainForHost(domain) == 460 ServerBoundCertService::GetDomainForHost(domain) ==
483 ServerBoundCertService::GetDomainForHost( 461 ServerBoundCertService::GetDomainForHost(
484 host_port_proxy_pair_.first.host())) && 462 host_port_proxy_pair_.first.host())) &&
485 ssl_info.cert->VerifyNameMatch(domain); 463 ssl_info.cert->VerifyNameMatch(domain);
486 } 464 }
487 465
488 void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream, 466 void SpdySession::SetStreamHasWriteAvailable(
489 SpdyIOBufferProducer* producer) { 467 SpdyStream* stream,
490 write_queue_.push(producer); 468 scoped_ptr<SpdyFrameProducer> producer) {
491 stream_producers_[producer] = stream; 469 QueueFrameProducerForWriting(producer.Pass(), stream->priority());
492 WriteSocketLater();
493 } 470 }
494 471
495 int SpdySession::GetPushStream( 472 int SpdySession::GetPushStream(
496 const GURL& url, 473 const GURL& url,
497 scoped_refptr<SpdyStream>* stream, 474 scoped_refptr<SpdyStream>* stream,
498 const BoundNetLog& stream_net_log) { 475 const BoundNetLog& stream_net_log) {
499 CHECK_NE(state_, STATE_CLOSED); 476 CHECK_NE(state_, STATE_CLOSED);
500 477
501 *stream = NULL; 478 *stream = NULL;
502 479
(...skipping 339 matching lines...) Expand 10 before | Expand all | Expand 10 after
842 DCHECK(buffered_spdy_framer_.get()); 819 DCHECK(buffered_spdy_framer_.get());
843 scoped_ptr<SpdyFrame> rst_frame( 820 scoped_ptr<SpdyFrame> rst_frame(
844 buffered_spdy_framer_->CreateRstStream(stream_id, status)); 821 buffered_spdy_framer_->CreateRstStream(stream_id, status));
845 822
846 // Default to lowest priority unless we know otherwise. 823 // Default to lowest priority unless we know otherwise.
847 RequestPriority priority = net::IDLE; 824 RequestPriority priority = net::IDLE;
848 if (IsStreamActive(stream_id)) { 825 if (IsStreamActive(stream_id)) {
849 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 826 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
850 priority = stream->priority(); 827 priority = stream->priority();
851 } 828 }
852 QueueFrame(rst_frame.release(), priority); 829 QueueSessionFrameForWriting(rst_frame.Pass(), priority);
853 RecordProtocolErrorHistogram( 830 RecordProtocolErrorHistogram(
854 static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); 831 static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
855 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); 832 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
856 } 833 }
857 834
858 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { 835 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
859 return ContainsKey(active_streams_, stream_id); 836 return ContainsKey(active_streams_, stream_id);
860 } 837 }
861 838
862 LoadState SpdySession::GetLoadState() const { 839 LoadState SpdySession::GetLoadState() const {
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
963 data += bytes_processed; 940 data += bytes_processed;
964 } 941 }
965 942
966 if (IsConnected()) 943 if (IsConnected())
967 state_ = STATE_DO_READ; 944 state_ = STATE_DO_READ;
968 return OK; 945 return OK;
969 } 946 }
970 947
971 void SpdySession::OnWriteComplete(int result) { 948 void SpdySession::OnWriteComplete(int result) {
972 DCHECK(write_pending_); 949 DCHECK(write_pending_);
973 DCHECK(in_flight_write_.size()); 950 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
974 951
975 last_activity_time_ = base::TimeTicks::Now(); 952 last_activity_time_ = base::TimeTicks::Now();
976 write_pending_ = false; 953 write_pending_ = false;
977 954
978 scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); 955 if (result < 0) {
956 in_flight_write_.Release();
957 CloseSessionOnError(static_cast<net::Error>(result), true, "Write error");
958 return;
959 }
979 960
980 if (result >= 0) { 961 // It should not be possible to have written more bytes than our
981 // It should not be possible to have written more bytes than our 962 // in_flight_write_.
982 // in_flight_write_. 963 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
983 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
984 964
985 in_flight_write_.buffer()->DidConsume(result); 965 in_flight_write_.buffer()->DidConsume(result);
986 966
987 // We only notify the stream when we've fully written the pending frame. 967 // We only notify the stream when we've fully written the pending frame.
988 if (!in_flight_write_.buffer()->BytesRemaining()) { 968 if (in_flight_write_.buffer()->BytesRemaining() == 0) {
989 if (stream) { 969 DCHECK_GT(result, 0);
990 // Report the number of bytes written to the caller, but exclude the
991 // frame size overhead. NOTE: if this frame was compressed the
992 // reported bytes written is the compressed size, not the original
993 // size.
994 if (result > 0) {
995 result = in_flight_write_.buffer()->size();
996 DCHECK_GE(result,
997 static_cast<int>(
998 buffered_spdy_framer_->GetControlFrameHeaderSize()));
999 result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
1000 }
1001 970
1002 // It is possible that the stream was cancelled while we were writing 971 scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
1003 // to the socket.
1004 if (!stream->cancelled())
1005 stream->OnWriteComplete(result);
1006 }
1007 972
1008 // Cleanup the write which just completed. 973 // It is possible that the stream was cancelled while we were writing
1009 in_flight_write_.release(); 974 // to the socket.
975 if (stream && !stream->cancelled()) {
976 // Report the number of bytes written to the caller, but exclude the
977 // frame size overhead. NOTE: if this frame was compressed the
978 // reported bytes written is the compressed size, not the original
979 // size.
980 result = in_flight_write_.buffer()->size();
981 DCHECK_GE(result,
982 static_cast<int>(
983 buffered_spdy_framer_->GetControlFrameHeaderSize()));
984 result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
985
986 stream->OnWriteComplete(result);
1010 } 987 }
1011 988
1012 // Write more data. We're already in a continuation, so we can 989 // Cleanup the write which just completed.
1013 // go ahead and write it immediately (without going back to the 990 in_flight_write_.Release();
1014 // message loop). 991 }
1015 WriteSocketLater();
1016 } else {
1017 in_flight_write_.release();
1018 992
1019 // The stream is now errored. Close it down. 993 // Write more data. We're already in a continuation, so we can go
1020 CloseSessionOnError( 994 // ahead and write it immediately (without going back to the message
1021 static_cast<net::Error>(result), true, "The stream has errored."); 995 // loop).
1022 } 996 WriteSocketLater();
1023 } 997 }
1024 998
1025 void SpdySession::WriteSocketLater() { 999 void SpdySession::WriteSocketLater() {
1026 if (delayed_write_pending_) 1000 if (delayed_write_pending_)
1027 return; 1001 return;
1028 1002
1029 if (!IsConnected()) 1003 if (!IsConnected())
1030 return; 1004 return;
1031 1005
1032 delayed_write_pending_ = true; 1006 delayed_write_pending_ = true;
(...skipping 12 matching lines...) Expand all
1045 // closed, just return. 1019 // closed, just return.
1046 if (!IsConnected()) 1020 if (!IsConnected())
1047 return; 1021 return;
1048 1022
1049 if (write_pending_) // Another write is in progress still. 1023 if (write_pending_) // Another write is in progress still.
1050 return; 1024 return;
1051 1025
1052 // Loop sending frames until we've sent everything or until the write 1026 // Loop sending frames until we've sent everything or until the write
1053 // returns error (or ERR_IO_PENDING). 1027 // returns error (or ERR_IO_PENDING).
1054 DCHECK(buffered_spdy_framer_.get()); 1028 DCHECK(buffered_spdy_framer_.get());
1055 while (in_flight_write_.buffer() || !write_queue_.empty()) { 1029 while (true) {
1056 if (!in_flight_write_.buffer()) { 1030 if (in_flight_write_.buffer()) {
1057 // Grab the next SpdyBuffer to send. 1031 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
1058 scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); 1032 } else {
1059 write_queue_.pop(); 1033 // Grab the next frame to send.
1060 scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); 1034 scoped_ptr<SpdyFrameProducer> producer = PopNextFrameProducerToWrite();
1061 stream_producers_.erase(producer.get()); 1035 if (!producer)
1036 break;
1037
1038 scoped_refptr<SpdyStream> stream = producer->GetStream();
1062 // It is possible that a stream had data to write, but a 1039 // It is possible that a stream had data to write, but a
1063 // WINDOW_UPDATE frame has been received which made that 1040 // WINDOW_UPDATE frame has been received which made that
1064 // stream no longer writable. 1041 // stream no longer writable.
1065 // TODO(rch): consider handling that case by removing the 1042 // TODO(rch): consider handling that case by removing the
1066 // stream from the writable queue? 1043 // stream from the writable queue?
1067 if (buffer == NULL) 1044 if (stream.get() && stream->cancelled())
1068 continue; 1045 continue;
1069 1046
1070 in_flight_write_ = *buffer; 1047 if (stream.get() && stream->stream_id() == 0)
1071 } else { 1048 ActivateStream(stream);
1072 DCHECK(in_flight_write_.buffer()->BytesRemaining()); 1049
1050 scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
1051 DCHECK(frame);
1052 DCHECK_GT(frame->size(), 0u);
1053
1054 // TODO(mbelshe): We have too much copying of data here.
1055 scoped_refptr<IOBufferWithSize> buffer =
1056 new IOBufferWithSize(frame->size());
1057 memcpy(buffer->data(), frame->data(), frame->size());
1058 in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
1073 } 1059 }
1074 1060
1075 write_pending_ = true; 1061 write_pending_ = true;
1076 int rv = connection_->socket()->Write( 1062 int rv = connection_->socket()->Write(
1077 in_flight_write_.buffer(), 1063 in_flight_write_.buffer(),
1078 in_flight_write_.buffer()->BytesRemaining(), 1064 in_flight_write_.buffer()->BytesRemaining(),
1079 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); 1065 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr()));
1080 if (rv == net::ERR_IO_PENDING) 1066 if (rv == net::ERR_IO_PENDING)
1081 break; 1067 break;
1082 1068
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
1120 } 1106 }
1121 1107
1122 while (!created_streams_.empty()) { 1108 while (!created_streams_.empty()) {
1123 CreatedStreamSet::iterator it = created_streams_.begin(); 1109 CreatedStreamSet::iterator it = created_streams_.begin();
1124 const scoped_refptr<SpdyStream> stream = *it; 1110 const scoped_refptr<SpdyStream> stream = *it;
1125 created_streams_.erase(it); 1111 created_streams_.erase(it);
1126 LogAbandonedStream(stream, status); 1112 LogAbandonedStream(stream, status);
1127 stream->OnClose(status); 1113 stream->OnClose(status);
1128 } 1114 }
1129 1115
1130 // We also need to drain the queue. 1116 ClearWriteQueue();
1131 while (!write_queue_.empty()) {
1132 scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
1133 write_queue_.pop();
1134 stream_producers_.erase(producer.get());
1135 }
1136 } 1117 }
1137 1118
1138 void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, 1119 void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
1139 net::Error status) { 1120 net::Error status) {
1140 DCHECK(stream); 1121 DCHECK(stream);
1141 std::string description = base::StringPrintf( 1122 std::string description = base::StringPrintf(
1142 "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); 1123 "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
1143 stream->LogStreamError(status, description); 1124 stream->LogStreamError(status, description);
1144 } 1125 }
1145 1126
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after
1248 int SpdySession::GetLocalAddress(IPEndPoint* address) const { 1229 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1249 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionGetPeerAddressNotConnected", 1230 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionGetPeerAddressNotConnected",
1250 !connection_->socket()); 1231 !connection_->socket());
1251 if (!connection_->socket()) { 1232 if (!connection_->socket()) {
1252 return ERR_SOCKET_NOT_CONNECTED; 1233 return ERR_SOCKET_NOT_CONNECTED;
1253 } 1234 }
1254 1235
1255 return connection_->socket()->GetLocalAddress(address); 1236 return connection_->socket()->GetLocalAddress(address);
1256 } 1237 }
1257 1238
1258 class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { 1239 // A simple wrapper around a SpdyFrame associated with the session.
1240 class SessionFrameProducer : public SpdyFrameProducer {
1259 public: 1241 public:
1260 SimpleSpdyIOBufferProducer(SpdyFrame* frame, 1242 SessionFrameProducer(scoped_ptr<SpdyFrame> frame) : frame_(frame.Pass()) {}
1261 RequestPriority priority) 1243
1262 : frame_(frame), 1244 virtual ~SessionFrameProducer() {}
1263 priority_(priority) { 1245
1246 virtual SpdyStream* GetStream() OVERRIDE {
1247 return NULL;
1264 } 1248 }
1265 1249
1266 virtual RequestPriority GetPriority() const OVERRIDE { 1250 virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE {
1267 return priority_; 1251 DCHECK(frame_);
1268 } 1252 return frame_.Pass();
1269
1270 virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE {
1271 return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
1272 frame_.get(), priority_, NULL);
1273 } 1253 }
1274 1254
1275 private: 1255 private:
1276 scoped_ptr<SpdyFrame> frame_; 1256 scoped_ptr<SpdyFrame> frame_;
1277 RequestPriority priority_;
1278 }; 1257 };
1279 1258
1280 void SpdySession::QueueFrame(SpdyFrame* frame, 1259 void SpdySession::QueueSessionFrameForWriting(scoped_ptr<SpdyFrame> frame,
1281 RequestPriority priority) { 1260 RequestPriority priority) {
1282 SimpleSpdyIOBufferProducer* producer = 1261 QueueFrameProducerForWriting(
1283 new SimpleSpdyIOBufferProducer(frame, priority); 1262 scoped_ptr<SpdyFrameProducer>(new SessionFrameProducer(frame.Pass())),
1284 write_queue_.push(producer); 1263 priority);
1264 }
1265
1266 void SpdySession::QueueFrameProducerForWriting(
1267 scoped_ptr<SpdyFrameProducer> producer,
1268 RequestPriority priority) {
1269 write_queue_[priority].push_back(producer.release());
1285 WriteSocketLater(); 1270 WriteSocketLater();
1286 } 1271 }
1287 1272
1273 scoped_ptr<SpdyFrameProducer> SpdySession::PopNextFrameProducerToWrite() {
1274 for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
1275 std::deque<SpdyFrameProducer*>* queue = &write_queue_[i];
1276 if (!queue->empty()) {
1277 scoped_ptr<SpdyFrameProducer> producer(queue->front());
1278 queue->pop_front();
1279 return producer.Pass();
1280 }
1281 }
1282 return scoped_ptr<SpdyFrameProducer>();
1283 }
1284
1285 void SpdySession::RemoveStreamFromWriteQueue(
1286 const scoped_refptr<SpdyStream>& stream) {
1287 std::deque<SpdyFrameProducer*> old;
1288 std::deque<SpdyFrameProducer*>* queue =
1289 &write_queue_[stream->priority()];
1290 old.swap(*queue);
1291
1292 while (!old.empty()) {
1293 scoped_ptr<SpdyFrameProducer> producer(old.front());
1294 old.pop_front();
1295 scoped_refptr<SpdyStream> producer_stream = producer->GetStream();
1296 if (!producer_stream || producer_stream != stream) {
1297 queue->push_back(producer.release());
1298 }
1299 }
1300 }
1301
1302 void SpdySession::ClearWriteQueue() {
1303 for (int i = 0; i < NUM_PRIORITIES; ++i) {
1304 STLDeleteElements(&write_queue_[i]);
1305 }
1306 }
1307
1288 void SpdySession::ActivateStream(SpdyStream* stream) { 1308 void SpdySession::ActivateStream(SpdyStream* stream) {
1289 if (stream->stream_id() == 0) { 1309 if (stream->stream_id() == 0) {
1290 stream->set_stream_id(GetNewStreamId()); 1310 stream->set_stream_id(GetNewStreamId());
1291 created_streams_.erase(scoped_refptr<SpdyStream>(stream)); 1311 created_streams_.erase(scoped_refptr<SpdyStream>(stream));
1292 } 1312 }
1293 const SpdyStreamId id = stream->stream_id(); 1313 const SpdyStreamId id = stream->stream_id();
1294 DCHECK(!IsStreamActive(id)); 1314 DCHECK(!IsStreamActive(id));
1295 1315
1296 active_streams_[id] = stream; 1316 active_streams_[id] = stream;
1297 } 1317 }
1298 1318
1299 void SpdySession::DeleteStream(SpdyStreamId id, int status) { 1319 void SpdySession::DeleteStream(SpdyStreamId id, int status) {
1300 // For push streams, if they are being deleted normally, we leave 1320 // For push streams, if they are being deleted normally, we leave
1301 // the stream in the unclaimed_pushed_streams_ list. However, if 1321 // the stream in the unclaimed_pushed_streams_ list. However, if
1302 // the stream is errored out, clean it up entirely. 1322 // the stream is errored out, clean it up entirely.
1303 if (status != OK) { 1323 if (status != OK) {
1304 PushedStreamMap::iterator it; 1324 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
1305 for (it = unclaimed_pushed_streams_.begin();
1306 it != unclaimed_pushed_streams_.end(); ++it) { 1325 it != unclaimed_pushed_streams_.end(); ++it) {
1307 scoped_refptr<SpdyStream> curr = it->second.first; 1326 scoped_refptr<SpdyStream> curr = it->second.first;
1308 if (id == curr->stream_id()) { 1327 if (id == curr->stream_id()) {
1309 unclaimed_pushed_streams_.erase(it); 1328 unclaimed_pushed_streams_.erase(it);
1310 break; 1329 break;
1311 } 1330 }
1312 } 1331 }
1313 } 1332 }
1314 1333
1315 // The stream might have been deleted. 1334 // The stream might have been deleted.
1316 ActiveStreamMap::iterator it2 = active_streams_.find(id); 1335 ActiveStreamMap::iterator it = active_streams_.find(id);
1317 if (it2 == active_streams_.end()) 1336 if (it == active_streams_.end())
1318 return; 1337 return;
1319 1338
1320 // Possibly remove from the write queue. 1339 const scoped_refptr<SpdyStream> stream(it->second);
1321 WriteQueue old = write_queue_; 1340 active_streams_.erase(it);
1322 write_queue_ = WriteQueue(); 1341 DCHECK(stream);
1323 while (!old.empty()) { 1342
1324 scoped_ptr<SpdyIOBufferProducer> producer(old.top()); 1343 RemoveStreamFromWriteQueue(stream);
1325 old.pop();
1326 StreamProducerMap::iterator it = stream_producers_.find(producer.get());
1327 if (it == stream_producers_.end() || it->second->stream_id() != id) {
1328 write_queue_.push(producer.release());
1329 } else {
1330 stream_producers_.erase(producer.get());
1331 producer.reset(NULL);
1332 }
1333 }
1334 1344
1335 // If this is an active stream, call the callback. 1345 // If this is an active stream, call the callback.
1336 const scoped_refptr<SpdyStream> stream(it2->second);
1337 active_streams_.erase(it2);
1338 DCHECK(stream);
1339 stream->OnClose(status); 1346 stream->OnClose(status);
1340 ProcessPendingStreamRequests(); 1347 ProcessPendingStreamRequests();
1341 } 1348 }
1342 1349
1343 void SpdySession::RemoveFromPool() { 1350 void SpdySession::RemoveFromPool() {
1344 if (spdy_session_pool_) { 1351 if (spdy_session_pool_) {
1345 SpdySessionPool* pool = spdy_session_pool_; 1352 SpdySessionPool* pool = spdy_session_pool_;
1346 spdy_session_pool_ = NULL; 1353 spdy_session_pool_ = NULL;
1347 pool->Remove(make_scoped_refptr(this)); 1354 pool->Remove(make_scoped_refptr(this));
1348 } 1355 }
(...skipping 563 matching lines...) Expand 10 before | Expand all | Expand 10 after
1912 void SpdySession::SendSettings(const SettingsMap& settings) { 1919 void SpdySession::SendSettings(const SettingsMap& settings) {
1913 net_log_.AddEvent( 1920 net_log_.AddEvent(
1914 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 1921 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
1915 base::Bind(&NetLogSpdySettingsCallback, &settings)); 1922 base::Bind(&NetLogSpdySettingsCallback, &settings));
1916 1923
1917 // Create the SETTINGS frame and send it. 1924 // Create the SETTINGS frame and send it.
1918 DCHECK(buffered_spdy_framer_.get()); 1925 DCHECK(buffered_spdy_framer_.get());
1919 scoped_ptr<SpdyFrame> settings_frame( 1926 scoped_ptr<SpdyFrame> settings_frame(
1920 buffered_spdy_framer_->CreateSettings(settings)); 1927 buffered_spdy_framer_->CreateSettings(settings));
1921 sent_settings_ = true; 1928 sent_settings_ = true;
1922 QueueFrame(settings_frame.release(), HIGHEST); 1929 QueueSessionFrameForWriting(settings_frame.Pass(), HIGHEST);
1923 } 1930 }
1924 1931
1925 void SpdySession::HandleSetting(uint32 id, uint32 value) { 1932 void SpdySession::HandleSetting(uint32 id, uint32 value) {
1926 switch (id) { 1933 switch (id) {
1927 case SETTINGS_MAX_CONCURRENT_STREAMS: 1934 case SETTINGS_MAX_CONCURRENT_STREAMS:
1928 max_concurrent_streams_ = std::min(static_cast<size_t>(value), 1935 max_concurrent_streams_ = std::min(static_cast<size_t>(value),
1929 kMaxConcurrentStreamLimit); 1936 kMaxConcurrentStreamLimit);
1930 ProcessPendingStreamRequests(); 1937 ProcessPendingStreamRequests();
1931 break; 1938 break;
1932 case SETTINGS_INITIAL_WINDOW_SIZE: { 1939 case SETTINGS_INITIAL_WINDOW_SIZE: {
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
2000 } 2007 }
2001 2008
2002 net_log_.AddEvent( 2009 net_log_.AddEvent(
2003 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME, 2010 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2004 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2011 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2005 stream_id, delta_window_size)); 2012 stream_id, delta_window_size));
2006 2013
2007 DCHECK(buffered_spdy_framer_.get()); 2014 DCHECK(buffered_spdy_framer_.get());
2008 scoped_ptr<SpdyFrame> window_update_frame( 2015 scoped_ptr<SpdyFrame> window_update_frame(
2009 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); 2016 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2010 QueueFrame(window_update_frame.release(), priority); 2017 QueueSessionFrameForWriting(window_update_frame.Pass(), priority);
2011 } 2018 }
2012 2019
2013 void SpdySession::WritePingFrame(uint32 unique_id) { 2020 void SpdySession::WritePingFrame(uint32 unique_id) {
2014 DCHECK(buffered_spdy_framer_.get()); 2021 DCHECK(buffered_spdy_framer_.get());
2015 scoped_ptr<SpdyFrame> ping_frame( 2022 scoped_ptr<SpdyFrame> ping_frame(
2016 buffered_spdy_framer_->CreatePingFrame(unique_id)); 2023 buffered_spdy_framer_->CreatePingFrame(unique_id));
2017 QueueFrame(ping_frame.release(), HIGHEST); 2024 QueueSessionFrameForWriting(ping_frame.Pass(), HIGHEST);
2018 2025
2019 if (net_log().IsLoggingAllEvents()) { 2026 if (net_log().IsLoggingAllEvents()) {
2020 net_log().AddEvent( 2027 net_log().AddEvent(
2021 NetLog::TYPE_SPDY_SESSION_PING, 2028 NetLog::TYPE_SPDY_SESSION_PING,
2022 base::Bind(&NetLogSpdyPingCallback, unique_id, "sent")); 2029 base::Bind(&NetLogSpdyPingCallback, unique_id, "sent"));
2023 } 2030 }
2024 if (unique_id % 2 != 0) { 2031 if (unique_id % 2 != 0) {
2025 next_ping_id_ += 2; 2032 next_ping_id_ += 2;
2026 ++pings_in_flight_; 2033 ++pings_in_flight_;
2027 PlanToCheckPingStatus(); 2034 PlanToCheckPingStatus();
(...skipping 274 matching lines...) Expand 10 before | Expand all | Expand 10 after
2302 } 2309 }
2303 2310
2304 session_recv_window_size_ -= delta_window_size; 2311 session_recv_window_size_ -= delta_window_size;
2305 net_log_.AddEvent( 2312 net_log_.AddEvent(
2306 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 2313 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
2307 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2314 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2308 -delta_window_size, session_recv_window_size_)); 2315 -delta_window_size, session_recv_window_size_));
2309 } 2316 }
2310 2317
2311 } // namespace net 2318 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698