OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/spdy/spdy_session.h" | 5 #include "net/spdy/spdy_session.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <map> | 8 #include <map> |
9 | 9 |
10 #include "base/basictypes.h" | 10 #include "base/basictypes.h" |
(...skipping 253 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
264 | 264 |
265 void SpdyStreamRequest::Reset() { | 265 void SpdyStreamRequest::Reset() { |
266 session_ = NULL; | 266 session_ = NULL; |
267 stream_ = NULL; | 267 stream_ = NULL; |
268 url_ = GURL(); | 268 url_ = GURL(); |
269 priority_ = MINIMUM_PRIORITY; | 269 priority_ = MINIMUM_PRIORITY; |
270 net_log_ = BoundNetLog(); | 270 net_log_ = BoundNetLog(); |
271 callback_.Reset(); | 271 callback_.Reset(); |
272 } | 272 } |
273 | 273 |
274 // static | |
275 void SpdySession::SpdyIOBufferProducer::ActivateStream( | |
276 SpdySession* spdy_session, | |
277 SpdyStream* spdy_stream) { | |
278 spdy_session->ActivateStream(spdy_stream); | |
279 } | |
280 | |
281 // static | |
282 SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer( | |
283 SpdyFrame* frame, | |
284 RequestPriority priority, | |
285 SpdyStream* stream) { | |
286 size_t size = frame->size(); | |
287 DCHECK_GT(size, 0u); | |
288 | |
289 // TODO(mbelshe): We have too much copying of data here. | |
290 IOBufferWithSize* buffer = new IOBufferWithSize(size); | |
291 memcpy(buffer->data(), frame->data(), size); | |
292 | |
293 return new SpdyIOBuffer(buffer, size, priority, stream); | |
294 } | |
295 | |
296 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, | 274 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, |
297 SpdySessionPool* spdy_session_pool, | 275 SpdySessionPool* spdy_session_pool, |
298 HttpServerProperties* http_server_properties, | 276 HttpServerProperties* http_server_properties, |
299 bool verify_domain_authentication, | 277 bool verify_domain_authentication, |
300 bool enable_sending_initial_settings, | 278 bool enable_sending_initial_settings, |
301 bool enable_credential_frames, | 279 bool enable_credential_frames, |
302 bool enable_compression, | 280 bool enable_compression, |
303 bool enable_ping_based_connection_checking, | 281 bool enable_ping_based_connection_checking, |
304 NextProto default_protocol, | 282 NextProto default_protocol, |
305 size_t stream_initial_recv_window_size, | 283 size_t stream_initial_recv_window_size, |
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
382 | 360 |
383 if (connection_->is_initialized()) { | 361 if (connection_->is_initialized()) { |
384 // With SPDY we can't recycle sockets. | 362 // With SPDY we can't recycle sockets. |
385 connection_->socket()->Disconnect(); | 363 connection_->socket()->Disconnect(); |
386 } | 364 } |
387 | 365 |
388 // Streams should all be gone now. | 366 // Streams should all be gone now. |
389 DCHECK_EQ(0u, num_active_streams()); | 367 DCHECK_EQ(0u, num_active_streams()); |
390 DCHECK_EQ(0u, num_unclaimed_pushed_streams()); | 368 DCHECK_EQ(0u, num_unclaimed_pushed_streams()); |
391 | 369 |
392 for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) { | 370 for (int i = 0; i < NUM_PRIORITIES; ++i) { |
393 DCHECK(pending_create_stream_queues_[i].empty()); | 371 DCHECK(pending_create_stream_queues_[i].empty()); |
394 } | 372 } |
395 DCHECK(pending_stream_request_completions_.empty()); | 373 DCHECK(pending_stream_request_completions_.empty()); |
396 | 374 |
397 RecordHistograms(); | 375 RecordHistograms(); |
398 | 376 |
399 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION); | 377 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION); |
400 } | 378 } |
401 | 379 |
402 net::Error SpdySession::InitializeWithSocket( | 380 net::Error SpdySession::InitializeWithSocket( |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
478 return true; // This is not a secure session, so all domains are okay. | 456 return true; // This is not a secure session, so all domains are okay. |
479 | 457 |
480 return !ssl_info.client_cert_sent && | 458 return !ssl_info.client_cert_sent && |
481 (enable_credential_frames_ || !ssl_info.channel_id_sent || | 459 (enable_credential_frames_ || !ssl_info.channel_id_sent || |
482 ServerBoundCertService::GetDomainForHost(domain) == | 460 ServerBoundCertService::GetDomainForHost(domain) == |
483 ServerBoundCertService::GetDomainForHost( | 461 ServerBoundCertService::GetDomainForHost( |
484 host_port_proxy_pair_.first.host())) && | 462 host_port_proxy_pair_.first.host())) && |
485 ssl_info.cert->VerifyNameMatch(domain); | 463 ssl_info.cert->VerifyNameMatch(domain); |
486 } | 464 } |
487 | 465 |
488 void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream, | 466 void SpdySession::SetStreamHasWriteAvailable( |
489 SpdyIOBufferProducer* producer) { | 467 SpdyStream* stream, |
490 write_queue_.push(producer); | 468 scoped_ptr<SpdyFrameProducer> producer) { |
491 stream_producers_[producer] = stream; | 469 QueueFrameProducerForWriting(producer.Pass(), stream->priority()); |
492 WriteSocketLater(); | |
493 } | 470 } |
494 | 471 |
495 int SpdySession::GetPushStream( | 472 int SpdySession::GetPushStream( |
496 const GURL& url, | 473 const GURL& url, |
497 scoped_refptr<SpdyStream>* stream, | 474 scoped_refptr<SpdyStream>* stream, |
498 const BoundNetLog& stream_net_log) { | 475 const BoundNetLog& stream_net_log) { |
499 CHECK_NE(state_, STATE_CLOSED); | 476 CHECK_NE(state_, STATE_CLOSED); |
500 | 477 |
501 *stream = NULL; | 478 *stream = NULL; |
502 | 479 |
(...skipping 339 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
842 DCHECK(buffered_spdy_framer_.get()); | 819 DCHECK(buffered_spdy_framer_.get()); |
843 scoped_ptr<SpdyFrame> rst_frame( | 820 scoped_ptr<SpdyFrame> rst_frame( |
844 buffered_spdy_framer_->CreateRstStream(stream_id, status)); | 821 buffered_spdy_framer_->CreateRstStream(stream_id, status)); |
845 | 822 |
846 // Default to lowest priority unless we know otherwise. | 823 // Default to lowest priority unless we know otherwise. |
847 RequestPriority priority = net::IDLE; | 824 RequestPriority priority = net::IDLE; |
848 if (IsStreamActive(stream_id)) { | 825 if (IsStreamActive(stream_id)) { |
849 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; | 826 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
850 priority = stream->priority(); | 827 priority = stream->priority(); |
851 } | 828 } |
852 QueueFrame(rst_frame.release(), priority); | 829 QueueSessionFrameForWriting(rst_frame.Pass(), priority); |
853 RecordProtocolErrorHistogram( | 830 RecordProtocolErrorHistogram( |
854 static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); | 831 static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); |
855 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); | 832 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); |
856 } | 833 } |
857 | 834 |
858 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { | 835 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { |
859 return ContainsKey(active_streams_, stream_id); | 836 return ContainsKey(active_streams_, stream_id); |
860 } | 837 } |
861 | 838 |
862 LoadState SpdySession::GetLoadState() const { | 839 LoadState SpdySession::GetLoadState() const { |
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
963 data += bytes_processed; | 940 data += bytes_processed; |
964 } | 941 } |
965 | 942 |
966 if (IsConnected()) | 943 if (IsConnected()) |
967 state_ = STATE_DO_READ; | 944 state_ = STATE_DO_READ; |
968 return OK; | 945 return OK; |
969 } | 946 } |
970 | 947 |
971 void SpdySession::OnWriteComplete(int result) { | 948 void SpdySession::OnWriteComplete(int result) { |
972 DCHECK(write_pending_); | 949 DCHECK(write_pending_); |
973 DCHECK(in_flight_write_.size()); | 950 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
974 | 951 |
975 last_activity_time_ = base::TimeTicks::Now(); | 952 last_activity_time_ = base::TimeTicks::Now(); |
976 write_pending_ = false; | 953 write_pending_ = false; |
977 | 954 |
978 scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); | 955 if (result < 0) { |
| 956 in_flight_write_.Release(); |
| 957 CloseSessionOnError(static_cast<net::Error>(result), true, "Write error"); |
| 958 return; |
| 959 } |
979 | 960 |
980 if (result >= 0) { | 961 // It should not be possible to have written more bytes than our |
981 // It should not be possible to have written more bytes than our | 962 // in_flight_write_. |
982 // in_flight_write_. | 963 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); |
983 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); | |
984 | 964 |
985 in_flight_write_.buffer()->DidConsume(result); | 965 in_flight_write_.buffer()->DidConsume(result); |
986 | 966 |
987 // We only notify the stream when we've fully written the pending frame. | 967 // We only notify the stream when we've fully written the pending frame. |
988 if (!in_flight_write_.buffer()->BytesRemaining()) { | 968 if (in_flight_write_.buffer()->BytesRemaining() == 0) { |
989 if (stream) { | 969 DCHECK_GT(result, 0); |
990 // Report the number of bytes written to the caller, but exclude the | |
991 // frame size overhead. NOTE: if this frame was compressed the | |
992 // reported bytes written is the compressed size, not the original | |
993 // size. | |
994 if (result > 0) { | |
995 result = in_flight_write_.buffer()->size(); | |
996 DCHECK_GE(result, | |
997 static_cast<int>( | |
998 buffered_spdy_framer_->GetControlFrameHeaderSize())); | |
999 result -= buffered_spdy_framer_->GetControlFrameHeaderSize(); | |
1000 } | |
1001 | 970 |
1002 // It is possible that the stream was cancelled while we were writing | 971 scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); |
1003 // to the socket. | |
1004 if (!stream->cancelled()) | |
1005 stream->OnWriteComplete(result); | |
1006 } | |
1007 | 972 |
1008 // Cleanup the write which just completed. | 973 // It is possible that the stream was cancelled while we were writing |
1009 in_flight_write_.release(); | 974 // to the socket. |
| 975 if (stream && !stream->cancelled()) { |
| 976 // Report the number of bytes written to the caller, but exclude the |
| 977 // frame size overhead. NOTE: if this frame was compressed the |
| 978 // reported bytes written is the compressed size, not the original |
| 979 // size. |
| 980 result = in_flight_write_.buffer()->size(); |
| 981 DCHECK_GE(result, |
| 982 static_cast<int>( |
| 983 buffered_spdy_framer_->GetControlFrameHeaderSize())); |
| 984 result -= buffered_spdy_framer_->GetControlFrameHeaderSize(); |
| 985 |
| 986 stream->OnWriteComplete(result); |
1010 } | 987 } |
1011 | 988 |
1012 // Write more data. We're already in a continuation, so we can | 989 // Cleanup the write which just completed. |
1013 // go ahead and write it immediately (without going back to the | 990 in_flight_write_.Release(); |
1014 // message loop). | 991 } |
1015 WriteSocketLater(); | |
1016 } else { | |
1017 in_flight_write_.release(); | |
1018 | 992 |
1019 // The stream is now errored. Close it down. | 993 // Write more data. We're already in a continuation, so we can go |
1020 CloseSessionOnError( | 994 // ahead and write it immediately (without going back to the message |
1021 static_cast<net::Error>(result), true, "The stream has errored."); | 995 // loop). |
1022 } | 996 WriteSocketLater(); |
1023 } | 997 } |
1024 | 998 |
1025 void SpdySession::WriteSocketLater() { | 999 void SpdySession::WriteSocketLater() { |
1026 if (delayed_write_pending_) | 1000 if (delayed_write_pending_) |
1027 return; | 1001 return; |
1028 | 1002 |
1029 if (!IsConnected()) | 1003 if (!IsConnected()) |
1030 return; | 1004 return; |
1031 | 1005 |
1032 delayed_write_pending_ = true; | 1006 delayed_write_pending_ = true; |
(...skipping 12 matching lines...) Expand all Loading... |
1045 // closed, just return. | 1019 // closed, just return. |
1046 if (!IsConnected()) | 1020 if (!IsConnected()) |
1047 return; | 1021 return; |
1048 | 1022 |
1049 if (write_pending_) // Another write is in progress still. | 1023 if (write_pending_) // Another write is in progress still. |
1050 return; | 1024 return; |
1051 | 1025 |
1052 // Loop sending frames until we've sent everything or until the write | 1026 // Loop sending frames until we've sent everything or until the write |
1053 // returns error (or ERR_IO_PENDING). | 1027 // returns error (or ERR_IO_PENDING). |
1054 DCHECK(buffered_spdy_framer_.get()); | 1028 DCHECK(buffered_spdy_framer_.get()); |
1055 while (in_flight_write_.buffer() || !write_queue_.empty()) { | 1029 while (true) { |
1056 if (!in_flight_write_.buffer()) { | 1030 if (in_flight_write_.buffer()) { |
1057 // Grab the next SpdyBuffer to send. | 1031 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
1058 scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); | 1032 } else { |
1059 write_queue_.pop(); | 1033 // Grab the next frame to send. |
1060 scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); | 1034 scoped_ptr<SpdyFrameProducer> producer = PopNextFrameProducerToWrite(); |
1061 stream_producers_.erase(producer.get()); | 1035 if (!producer) |
| 1036 break; |
| 1037 |
| 1038 scoped_refptr<SpdyStream> stream = producer->GetStream(); |
1062 // It is possible that a stream had data to write, but a | 1039 // It is possible that a stream had data to write, but a |
1063 // WINDOW_UPDATE frame has been received which made that | 1040 // WINDOW_UPDATE frame has been received which made that |
1064 // stream no longer writable. | 1041 // stream no longer writable. |
1065 // TODO(rch): consider handling that case by removing the | 1042 // TODO(rch): consider handling that case by removing the |
1066 // stream from the writable queue? | 1043 // stream from the writable queue? |
1067 if (buffer == NULL) | 1044 if (stream.get() && stream->cancelled()) |
1068 continue; | 1045 continue; |
1069 | 1046 |
1070 in_flight_write_ = *buffer; | 1047 if (stream.get() && stream->stream_id() == 0) |
1071 } else { | 1048 ActivateStream(stream); |
1072 DCHECK(in_flight_write_.buffer()->BytesRemaining()); | 1049 |
| 1050 scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); |
| 1051 DCHECK(frame); |
| 1052 DCHECK_GT(frame->size(), 0u); |
| 1053 |
| 1054 // TODO(mbelshe): We have too much copying of data here. |
| 1055 scoped_refptr<IOBufferWithSize> buffer = |
| 1056 new IOBufferWithSize(frame->size()); |
| 1057 memcpy(buffer->data(), frame->data(), frame->size()); |
| 1058 in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream); |
1073 } | 1059 } |
1074 | 1060 |
1075 write_pending_ = true; | 1061 write_pending_ = true; |
1076 int rv = connection_->socket()->Write( | 1062 int rv = connection_->socket()->Write( |
1077 in_flight_write_.buffer(), | 1063 in_flight_write_.buffer(), |
1078 in_flight_write_.buffer()->BytesRemaining(), | 1064 in_flight_write_.buffer()->BytesRemaining(), |
1079 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); | 1065 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); |
1080 if (rv == net::ERR_IO_PENDING) | 1066 if (rv == net::ERR_IO_PENDING) |
1081 break; | 1067 break; |
1082 | 1068 |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1120 } | 1106 } |
1121 | 1107 |
1122 while (!created_streams_.empty()) { | 1108 while (!created_streams_.empty()) { |
1123 CreatedStreamSet::iterator it = created_streams_.begin(); | 1109 CreatedStreamSet::iterator it = created_streams_.begin(); |
1124 const scoped_refptr<SpdyStream> stream = *it; | 1110 const scoped_refptr<SpdyStream> stream = *it; |
1125 created_streams_.erase(it); | 1111 created_streams_.erase(it); |
1126 LogAbandonedStream(stream, status); | 1112 LogAbandonedStream(stream, status); |
1127 stream->OnClose(status); | 1113 stream->OnClose(status); |
1128 } | 1114 } |
1129 | 1115 |
1130 // We also need to drain the queue. | 1116 ClearWriteQueue(); |
1131 while (!write_queue_.empty()) { | |
1132 scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); | |
1133 write_queue_.pop(); | |
1134 stream_producers_.erase(producer.get()); | |
1135 } | |
1136 } | 1117 } |
1137 | 1118 |
1138 void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, | 1119 void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, |
1139 net::Error status) { | 1120 net::Error status) { |
1140 DCHECK(stream); | 1121 DCHECK(stream); |
1141 std::string description = base::StringPrintf( | 1122 std::string description = base::StringPrintf( |
1142 "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); | 1123 "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); |
1143 stream->LogStreamError(status, description); | 1124 stream->LogStreamError(status, description); |
1144 } | 1125 } |
1145 | 1126 |
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1248 int SpdySession::GetLocalAddress(IPEndPoint* address) const { | 1229 int SpdySession::GetLocalAddress(IPEndPoint* address) const { |
1249 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionGetPeerAddressNotConnected", | 1230 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionGetPeerAddressNotConnected", |
1250 !connection_->socket()); | 1231 !connection_->socket()); |
1251 if (!connection_->socket()) { | 1232 if (!connection_->socket()) { |
1252 return ERR_SOCKET_NOT_CONNECTED; | 1233 return ERR_SOCKET_NOT_CONNECTED; |
1253 } | 1234 } |
1254 | 1235 |
1255 return connection_->socket()->GetLocalAddress(address); | 1236 return connection_->socket()->GetLocalAddress(address); |
1256 } | 1237 } |
1257 | 1238 |
1258 class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { | 1239 // A simple wrapper around a SpdyFrame associated with the session. |
| 1240 class SessionFrameProducer : public SpdyFrameProducer { |
1259 public: | 1241 public: |
1260 SimpleSpdyIOBufferProducer(SpdyFrame* frame, | 1242 SessionFrameProducer(scoped_ptr<SpdyFrame> frame) : frame_(frame.Pass()) {} |
1261 RequestPriority priority) | 1243 |
1262 : frame_(frame), | 1244 virtual ~SessionFrameProducer() {} |
1263 priority_(priority) { | 1245 |
| 1246 virtual SpdyStream* GetStream() OVERRIDE { |
| 1247 return NULL; |
1264 } | 1248 } |
1265 | 1249 |
1266 virtual RequestPriority GetPriority() const OVERRIDE { | 1250 virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { |
1267 return priority_; | 1251 DCHECK(frame_); |
1268 } | 1252 return frame_.Pass(); |
1269 | |
1270 virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE { | |
1271 return SpdySession::SpdyIOBufferProducer::CreateIOBuffer( | |
1272 frame_.get(), priority_, NULL); | |
1273 } | 1253 } |
1274 | 1254 |
1275 private: | 1255 private: |
1276 scoped_ptr<SpdyFrame> frame_; | 1256 scoped_ptr<SpdyFrame> frame_; |
1277 RequestPriority priority_; | |
1278 }; | 1257 }; |
1279 | 1258 |
1280 void SpdySession::QueueFrame(SpdyFrame* frame, | 1259 void SpdySession::QueueSessionFrameForWriting(scoped_ptr<SpdyFrame> frame, |
1281 RequestPriority priority) { | 1260 RequestPriority priority) { |
1282 SimpleSpdyIOBufferProducer* producer = | 1261 QueueFrameProducerForWriting( |
1283 new SimpleSpdyIOBufferProducer(frame, priority); | 1262 scoped_ptr<SpdyFrameProducer>(new SessionFrameProducer(frame.Pass())), |
1284 write_queue_.push(producer); | 1263 priority); |
| 1264 } |
| 1265 |
| 1266 void SpdySession::QueueFrameProducerForWriting( |
| 1267 scoped_ptr<SpdyFrameProducer> producer, |
| 1268 RequestPriority priority) { |
| 1269 write_queue_[priority].push_back(producer.release()); |
1285 WriteSocketLater(); | 1270 WriteSocketLater(); |
1286 } | 1271 } |
1287 | 1272 |
| 1273 scoped_ptr<SpdyFrameProducer> SpdySession::PopNextFrameProducerToWrite() { |
| 1274 for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { |
| 1275 std::deque<SpdyFrameProducer*>* queue = &write_queue_[i]; |
| 1276 if (!queue->empty()) { |
| 1277 scoped_ptr<SpdyFrameProducer> producer(queue->front()); |
| 1278 queue->pop_front(); |
| 1279 return producer.Pass(); |
| 1280 } |
| 1281 } |
| 1282 return scoped_ptr<SpdyFrameProducer>(); |
| 1283 } |
| 1284 |
| 1285 void SpdySession::RemoveStreamFromWriteQueue( |
| 1286 const scoped_refptr<SpdyStream>& stream) { |
| 1287 std::deque<SpdyFrameProducer*> old; |
| 1288 std::deque<SpdyFrameProducer*>* queue = |
| 1289 &write_queue_[stream->priority()]; |
| 1290 old.swap(*queue); |
| 1291 |
| 1292 while (!old.empty()) { |
| 1293 scoped_ptr<SpdyFrameProducer> producer(old.front()); |
| 1294 old.pop_front(); |
| 1295 scoped_refptr<SpdyStream> producer_stream = producer->GetStream(); |
| 1296 if (!producer_stream || producer_stream != stream) { |
| 1297 queue->push_back(producer.release()); |
| 1298 } |
| 1299 } |
| 1300 } |
| 1301 |
| 1302 void SpdySession::ClearWriteQueue() { |
| 1303 for (int i = 0; i < NUM_PRIORITIES; ++i) { |
| 1304 STLDeleteElements(&write_queue_[i]); |
| 1305 } |
| 1306 } |
| 1307 |
1288 void SpdySession::ActivateStream(SpdyStream* stream) { | 1308 void SpdySession::ActivateStream(SpdyStream* stream) { |
1289 if (stream->stream_id() == 0) { | 1309 if (stream->stream_id() == 0) { |
1290 stream->set_stream_id(GetNewStreamId()); | 1310 stream->set_stream_id(GetNewStreamId()); |
1291 created_streams_.erase(scoped_refptr<SpdyStream>(stream)); | 1311 created_streams_.erase(scoped_refptr<SpdyStream>(stream)); |
1292 } | 1312 } |
1293 const SpdyStreamId id = stream->stream_id(); | 1313 const SpdyStreamId id = stream->stream_id(); |
1294 DCHECK(!IsStreamActive(id)); | 1314 DCHECK(!IsStreamActive(id)); |
1295 | 1315 |
1296 active_streams_[id] = stream; | 1316 active_streams_[id] = stream; |
1297 } | 1317 } |
1298 | 1318 |
1299 void SpdySession::DeleteStream(SpdyStreamId id, int status) { | 1319 void SpdySession::DeleteStream(SpdyStreamId id, int status) { |
1300 // For push streams, if they are being deleted normally, we leave | 1320 // For push streams, if they are being deleted normally, we leave |
1301 // the stream in the unclaimed_pushed_streams_ list. However, if | 1321 // the stream in the unclaimed_pushed_streams_ list. However, if |
1302 // the stream is errored out, clean it up entirely. | 1322 // the stream is errored out, clean it up entirely. |
1303 if (status != OK) { | 1323 if (status != OK) { |
1304 PushedStreamMap::iterator it; | 1324 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); |
1305 for (it = unclaimed_pushed_streams_.begin(); | |
1306 it != unclaimed_pushed_streams_.end(); ++it) { | 1325 it != unclaimed_pushed_streams_.end(); ++it) { |
1307 scoped_refptr<SpdyStream> curr = it->second.first; | 1326 scoped_refptr<SpdyStream> curr = it->second.first; |
1308 if (id == curr->stream_id()) { | 1327 if (id == curr->stream_id()) { |
1309 unclaimed_pushed_streams_.erase(it); | 1328 unclaimed_pushed_streams_.erase(it); |
1310 break; | 1329 break; |
1311 } | 1330 } |
1312 } | 1331 } |
1313 } | 1332 } |
1314 | 1333 |
1315 // The stream might have been deleted. | 1334 // The stream might have been deleted. |
1316 ActiveStreamMap::iterator it2 = active_streams_.find(id); | 1335 ActiveStreamMap::iterator it = active_streams_.find(id); |
1317 if (it2 == active_streams_.end()) | 1336 if (it == active_streams_.end()) |
1318 return; | 1337 return; |
1319 | 1338 |
1320 // Possibly remove from the write queue. | 1339 const scoped_refptr<SpdyStream> stream(it->second); |
1321 WriteQueue old = write_queue_; | 1340 active_streams_.erase(it); |
1322 write_queue_ = WriteQueue(); | 1341 DCHECK(stream); |
1323 while (!old.empty()) { | 1342 |
1324 scoped_ptr<SpdyIOBufferProducer> producer(old.top()); | 1343 RemoveStreamFromWriteQueue(stream); |
1325 old.pop(); | |
1326 StreamProducerMap::iterator it = stream_producers_.find(producer.get()); | |
1327 if (it == stream_producers_.end() || it->second->stream_id() != id) { | |
1328 write_queue_.push(producer.release()); | |
1329 } else { | |
1330 stream_producers_.erase(producer.get()); | |
1331 producer.reset(NULL); | |
1332 } | |
1333 } | |
1334 | 1344 |
1335 // If this is an active stream, call the callback. | 1345 // If this is an active stream, call the callback. |
1336 const scoped_refptr<SpdyStream> stream(it2->second); | |
1337 active_streams_.erase(it2); | |
1338 DCHECK(stream); | |
1339 stream->OnClose(status); | 1346 stream->OnClose(status); |
1340 ProcessPendingStreamRequests(); | 1347 ProcessPendingStreamRequests(); |
1341 } | 1348 } |
1342 | 1349 |
1343 void SpdySession::RemoveFromPool() { | 1350 void SpdySession::RemoveFromPool() { |
1344 if (spdy_session_pool_) { | 1351 if (spdy_session_pool_) { |
1345 SpdySessionPool* pool = spdy_session_pool_; | 1352 SpdySessionPool* pool = spdy_session_pool_; |
1346 spdy_session_pool_ = NULL; | 1353 spdy_session_pool_ = NULL; |
1347 pool->Remove(make_scoped_refptr(this)); | 1354 pool->Remove(make_scoped_refptr(this)); |
1348 } | 1355 } |
(...skipping 563 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1912 void SpdySession::SendSettings(const SettingsMap& settings) { | 1919 void SpdySession::SendSettings(const SettingsMap& settings) { |
1913 net_log_.AddEvent( | 1920 net_log_.AddEvent( |
1914 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, | 1921 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, |
1915 base::Bind(&NetLogSpdySettingsCallback, &settings)); | 1922 base::Bind(&NetLogSpdySettingsCallback, &settings)); |
1916 | 1923 |
1917 // Create the SETTINGS frame and send it. | 1924 // Create the SETTINGS frame and send it. |
1918 DCHECK(buffered_spdy_framer_.get()); | 1925 DCHECK(buffered_spdy_framer_.get()); |
1919 scoped_ptr<SpdyFrame> settings_frame( | 1926 scoped_ptr<SpdyFrame> settings_frame( |
1920 buffered_spdy_framer_->CreateSettings(settings)); | 1927 buffered_spdy_framer_->CreateSettings(settings)); |
1921 sent_settings_ = true; | 1928 sent_settings_ = true; |
1922 QueueFrame(settings_frame.release(), HIGHEST); | 1929 QueueSessionFrameForWriting(settings_frame.Pass(), HIGHEST); |
1923 } | 1930 } |
1924 | 1931 |
1925 void SpdySession::HandleSetting(uint32 id, uint32 value) { | 1932 void SpdySession::HandleSetting(uint32 id, uint32 value) { |
1926 switch (id) { | 1933 switch (id) { |
1927 case SETTINGS_MAX_CONCURRENT_STREAMS: | 1934 case SETTINGS_MAX_CONCURRENT_STREAMS: |
1928 max_concurrent_streams_ = std::min(static_cast<size_t>(value), | 1935 max_concurrent_streams_ = std::min(static_cast<size_t>(value), |
1929 kMaxConcurrentStreamLimit); | 1936 kMaxConcurrentStreamLimit); |
1930 ProcessPendingStreamRequests(); | 1937 ProcessPendingStreamRequests(); |
1931 break; | 1938 break; |
1932 case SETTINGS_INITIAL_WINDOW_SIZE: { | 1939 case SETTINGS_INITIAL_WINDOW_SIZE: { |
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
2000 } | 2007 } |
2001 | 2008 |
2002 net_log_.AddEvent( | 2009 net_log_.AddEvent( |
2003 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME, | 2010 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME, |
2004 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, | 2011 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, |
2005 stream_id, delta_window_size)); | 2012 stream_id, delta_window_size)); |
2006 | 2013 |
2007 DCHECK(buffered_spdy_framer_.get()); | 2014 DCHECK(buffered_spdy_framer_.get()); |
2008 scoped_ptr<SpdyFrame> window_update_frame( | 2015 scoped_ptr<SpdyFrame> window_update_frame( |
2009 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); | 2016 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); |
2010 QueueFrame(window_update_frame.release(), priority); | 2017 QueueSessionFrameForWriting(window_update_frame.Pass(), priority); |
2011 } | 2018 } |
2012 | 2019 |
2013 void SpdySession::WritePingFrame(uint32 unique_id) { | 2020 void SpdySession::WritePingFrame(uint32 unique_id) { |
2014 DCHECK(buffered_spdy_framer_.get()); | 2021 DCHECK(buffered_spdy_framer_.get()); |
2015 scoped_ptr<SpdyFrame> ping_frame( | 2022 scoped_ptr<SpdyFrame> ping_frame( |
2016 buffered_spdy_framer_->CreatePingFrame(unique_id)); | 2023 buffered_spdy_framer_->CreatePingFrame(unique_id)); |
2017 QueueFrame(ping_frame.release(), HIGHEST); | 2024 QueueSessionFrameForWriting(ping_frame.Pass(), HIGHEST); |
2018 | 2025 |
2019 if (net_log().IsLoggingAllEvents()) { | 2026 if (net_log().IsLoggingAllEvents()) { |
2020 net_log().AddEvent( | 2027 net_log().AddEvent( |
2021 NetLog::TYPE_SPDY_SESSION_PING, | 2028 NetLog::TYPE_SPDY_SESSION_PING, |
2022 base::Bind(&NetLogSpdyPingCallback, unique_id, "sent")); | 2029 base::Bind(&NetLogSpdyPingCallback, unique_id, "sent")); |
2023 } | 2030 } |
2024 if (unique_id % 2 != 0) { | 2031 if (unique_id % 2 != 0) { |
2025 next_ping_id_ += 2; | 2032 next_ping_id_ += 2; |
2026 ++pings_in_flight_; | 2033 ++pings_in_flight_; |
2027 PlanToCheckPingStatus(); | 2034 PlanToCheckPingStatus(); |
(...skipping 274 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
2302 } | 2309 } |
2303 | 2310 |
2304 session_recv_window_size_ -= delta_window_size; | 2311 session_recv_window_size_ -= delta_window_size; |
2305 net_log_.AddEvent( | 2312 net_log_.AddEvent( |
2306 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, | 2313 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, |
2307 base::Bind(&NetLogSpdySessionWindowUpdateCallback, | 2314 base::Bind(&NetLogSpdySessionWindowUpdateCallback, |
2308 -delta_window_size, session_recv_window_size_)); | 2315 -delta_window_size, session_recv_window_size_)); |
2309 } | 2316 } |
2310 | 2317 |
2311 } // namespace net | 2318 } // namespace net |
OLD | NEW |