Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(375)

Side by Side Diff: trunk/src/net/spdy/spdy_session.cc

Issue 310563002: Revert 273680 "Defer SpdySession destruction to support closing ..." (Closed) Base URL: svn://svn.chromium.org/chrome/
Patch Set: Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « trunk/src/net/spdy/spdy_session.h ('k') | trunk/src/net/spdy/spdy_session_pool.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_session.h" 5 #include "net/spdy/spdy_session.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <map> 8 #include <map>
9 9
10 #include "base/basictypes.h" 10 #include "base/basictypes.h"
(...skipping 509 matching lines...) Expand 10 before | Expand all | Expand 10 after
520 net_log_.BeginEvent( 520 net_log_.BeginEvent(
521 NetLog::TYPE_SPDY_SESSION, 521 NetLog::TYPE_SPDY_SESSION,
522 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair())); 522 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
523 next_unclaimed_push_stream_sweep_time_ = time_func_() + 523 next_unclaimed_push_stream_sweep_time_ = time_func_() +
524 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 524 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
525 // TODO(mbelshe): consider randomization of the stream_hi_water_mark. 525 // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
526 } 526 }
527 527
528 SpdySession::~SpdySession() { 528 SpdySession::~SpdySession() {
529 CHECK(!in_io_loop_); 529 CHECK(!in_io_loop_);
530 DcheckDraining(); 530 DCHECK(!pool_);
531 DcheckClosed();
531 532
532 // TODO(akalin): Check connection->is_initialized() instead. This 533 // TODO(akalin): Check connection->is_initialized() instead. This
533 // requires re-working CreateFakeSpdySession(), though. 534 // requires re-working CreateFakeSpdySession(), though.
534 DCHECK(connection_->socket()); 535 DCHECK(connection_->socket());
535 // With SPDY we can't recycle sockets. 536 // With SPDY we can't recycle sockets.
536 connection_->socket()->Disconnect(); 537 connection_->socket()->Disconnect();
537 538
538 RecordHistograms(); 539 RecordHistograms();
539 540
540 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION); 541 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
594 #if defined(SPDY_PROXY_AUTH_ORIGIN) 595 #if defined(SPDY_PROXY_AUTH_ORIGIN)
595 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy", 596 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
596 host_port_pair().Equals(HostPortPair::FromURL( 597 host_port_pair().Equals(HostPortPair::FromURL(
597 GURL(SPDY_PROXY_AUTH_ORIGIN)))); 598 GURL(SPDY_PROXY_AUTH_ORIGIN))));
598 #endif 599 #endif
599 600
600 net_log_.AddEvent( 601 net_log_.AddEvent(
601 NetLog::TYPE_SPDY_SESSION_INITIALIZED, 602 NetLog::TYPE_SPDY_SESSION_INITIALIZED,
602 connection_->socket()->NetLog().source().ToEventParametersCallback()); 603 connection_->socket()->NetLog().source().ToEventParametersCallback());
603 604
604 DCHECK_EQ(availability_state_, STATE_AVAILABLE); 605 DCHECK_NE(availability_state_, STATE_CLOSED);
605 connection_->AddHigherLayeredPool(this); 606 connection_->AddHigherLayeredPool(this);
606 if (enable_sending_initial_data_) 607 if (enable_sending_initial_data_)
607 SendInitialData(); 608 SendInitialData();
608 pool_ = pool; 609 pool_ = pool;
609 610
610 // Bootstrap the read loop. 611 // Bootstrap the read loop.
611 base::MessageLoop::current()->PostTask( 612 base::MessageLoop::current()->PostTask(
612 FROM_HERE, 613 FROM_HERE,
613 base::Bind(&SpdySession::PumpReadLoop, 614 base::Bind(&SpdySession::PumpReadLoop,
614 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); 615 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
615 } 616 }
616 617
617 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { 618 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
618 if (!verify_domain_authentication_) 619 if (!verify_domain_authentication_)
619 return true; 620 return true;
620 621
621 if (availability_state_ == STATE_DRAINING) 622 if (availability_state_ == STATE_CLOSED)
622 return false; 623 return false;
623 624
624 SSLInfo ssl_info; 625 SSLInfo ssl_info;
625 bool was_npn_negotiated; 626 bool was_npn_negotiated;
626 NextProto protocol_negotiated = kProtoUnknown; 627 NextProto protocol_negotiated = kProtoUnknown;
627 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated)) 628 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
628 return true; // This is not a secure session, so all domains are okay. 629 return true; // This is not a secure session, so all domains are okay.
629 630
630 bool unused = false; 631 bool unused = false;
631 return 632 return
632 !ssl_info.client_cert_sent && 633 !ssl_info.client_cert_sent &&
633 (!ssl_info.channel_id_sent || 634 (!ssl_info.channel_id_sent ||
634 (ServerBoundCertService::GetDomainForHost(domain) == 635 (ServerBoundCertService::GetDomainForHost(domain) ==
635 ServerBoundCertService::GetDomainForHost(host_port_pair().host()))) && 636 ServerBoundCertService::GetDomainForHost(host_port_pair().host()))) &&
636 ssl_info.cert->VerifyNameMatch(domain, &unused); 637 ssl_info.cert->VerifyNameMatch(domain, &unused);
637 } 638 }
638 639
639 int SpdySession::GetPushStream( 640 int SpdySession::GetPushStream(
640 const GURL& url, 641 const GURL& url,
641 base::WeakPtr<SpdyStream>* stream, 642 base::WeakPtr<SpdyStream>* stream,
642 const BoundNetLog& stream_net_log) { 643 const BoundNetLog& stream_net_log) {
643 CHECK(!in_io_loop_); 644 CHECK(!in_io_loop_);
644 645
645 stream->reset(); 646 stream->reset();
646 647
647 if (availability_state_ == STATE_DRAINING) 648 // TODO(akalin): Add unit test exercising this code path.
649 if (availability_state_ == STATE_CLOSED)
648 return ERR_CONNECTION_CLOSED; 650 return ERR_CONNECTION_CLOSED;
649 651
650 Error err = TryAccessStream(url); 652 Error err = TryAccessStream(url);
651 if (err != OK) 653 if (err != OK)
652 return err; 654 return err;
653 655
654 *stream = GetActivePushStream(url); 656 *stream = GetActivePushStream(url);
655 if (*stream) { 657 if (*stream) {
656 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_); 658 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
657 streams_pushed_and_claimed_count_++; 659 streams_pushed_and_claimed_count_++;
658 } 660 }
659 return OK; 661 return OK;
660 } 662 }
661 663
662 // {,Try}CreateStream() and TryAccessStream() can be called with 664 // {,Try}CreateStream() and TryAccessStream() can be called with
663 // |in_io_loop_| set if a stream is being created in response to 665 // |in_io_loop_| set if a stream is being created in response to
664 // another being closed due to received data. 666 // another being closed due to received data.
665 667
666 Error SpdySession::TryAccessStream(const GURL& url) { 668 Error SpdySession::TryAccessStream(const GURL& url) {
667 CHECK_NE(availability_state_, STATE_DRAINING); 669 DCHECK_NE(availability_state_, STATE_CLOSED);
668 670
669 if (is_secure_ && certificate_error_code_ != OK && 671 if (is_secure_ && certificate_error_code_ != OK &&
670 (url.SchemeIs("https") || url.SchemeIs("wss"))) { 672 (url.SchemeIs("https") || url.SchemeIs("wss"))) {
671 RecordProtocolErrorHistogram( 673 RecordProtocolErrorHistogram(
672 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION); 674 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
673 DoDrainSession( 675 CloseSessionResult result = DoCloseSession(
674 static_cast<Error>(certificate_error_code_), 676 static_cast<Error>(certificate_error_code_),
675 "Tried to get SPDY stream for secure content over an unauthenticated " 677 "Tried to get SPDY stream for secure content over an unauthenticated "
676 "session."); 678 "session.");
679 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
677 return ERR_SPDY_PROTOCOL_ERROR; 680 return ERR_SPDY_PROTOCOL_ERROR;
678 } 681 }
679 return OK; 682 return OK;
680 } 683 }
681 684
682 int SpdySession::TryCreateStream( 685 int SpdySession::TryCreateStream(
683 const base::WeakPtr<SpdyStreamRequest>& request, 686 const base::WeakPtr<SpdyStreamRequest>& request,
684 base::WeakPtr<SpdyStream>* stream) { 687 base::WeakPtr<SpdyStream>* stream) {
685 DCHECK(request); 688 DCHECK(request);
686 689
687 if (availability_state_ == STATE_GOING_AWAY) 690 if (availability_state_ == STATE_GOING_AWAY)
688 return ERR_FAILED; 691 return ERR_FAILED;
689 692
690 if (availability_state_ == STATE_DRAINING) 693 // TODO(akalin): Add unit test exercising this code path.
694 if (availability_state_ == STATE_CLOSED)
691 return ERR_CONNECTION_CLOSED; 695 return ERR_CONNECTION_CLOSED;
692 696
693 Error err = TryAccessStream(request->url()); 697 Error err = TryAccessStream(request->url());
694 if (err != OK) 698 if (err != OK)
695 return err; 699 return err;
696 700
697 if (!max_concurrent_streams_ || 701 if (!max_concurrent_streams_ ||
698 (active_streams_.size() + created_streams_.size() < 702 (active_streams_.size() + created_streams_.size() <
699 max_concurrent_streams_)) { 703 max_concurrent_streams_)) {
700 return CreateStream(*request, stream); 704 return CreateStream(*request, stream);
701 } 705 }
702 706
703 stalled_streams_++; 707 stalled_streams_++;
704 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS); 708 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
705 RequestPriority priority = request->priority(); 709 RequestPriority priority = request->priority();
706 CHECK_GE(priority, MINIMUM_PRIORITY); 710 CHECK_GE(priority, MINIMUM_PRIORITY);
707 CHECK_LE(priority, MAXIMUM_PRIORITY); 711 CHECK_LE(priority, MAXIMUM_PRIORITY);
708 pending_create_stream_queues_[priority].push_back(request); 712 pending_create_stream_queues_[priority].push_back(request);
709 return ERR_IO_PENDING; 713 return ERR_IO_PENDING;
710 } 714 }
711 715
712 int SpdySession::CreateStream(const SpdyStreamRequest& request, 716 int SpdySession::CreateStream(const SpdyStreamRequest& request,
713 base::WeakPtr<SpdyStream>* stream) { 717 base::WeakPtr<SpdyStream>* stream) {
714 DCHECK_GE(request.priority(), MINIMUM_PRIORITY); 718 DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
715 DCHECK_LE(request.priority(), MAXIMUM_PRIORITY); 719 DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
716 720
717 if (availability_state_ == STATE_GOING_AWAY) 721 if (availability_state_ == STATE_GOING_AWAY)
718 return ERR_FAILED; 722 return ERR_FAILED;
719 723
720 if (availability_state_ == STATE_DRAINING) 724 // TODO(akalin): Add unit test exercising this code path.
725 if (availability_state_ == STATE_CLOSED)
721 return ERR_CONNECTION_CLOSED; 726 return ERR_CONNECTION_CLOSED;
722 727
723 Error err = TryAccessStream(request.url()); 728 Error err = TryAccessStream(request.url());
724 if (err != OK) { 729 if (err != OK) {
725 // This should have been caught in TryCreateStream(). 730 // This should have been caught in TryCreateStream().
726 NOTREACHED(); 731 NOTREACHED();
727 return err; 732 return err;
728 } 733 }
729 734
730 DCHECK(connection_->socket()); 735 DCHECK(connection_->socket());
731 DCHECK(connection_->socket()->IsConnected()); 736 DCHECK(connection_->socket()->IsConnected());
732 if (connection_->socket()) { 737 if (connection_->socket()) {
733 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected", 738 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
734 connection_->socket()->IsConnected()); 739 connection_->socket()->IsConnected());
735 if (!connection_->socket()->IsConnected()) { 740 if (!connection_->socket()->IsConnected()) {
736 DoDrainSession( 741 CloseSessionResult result = DoCloseSession(
737 ERR_CONNECTION_CLOSED, 742 ERR_CONNECTION_CLOSED,
738 "Tried to create SPDY stream for a closed socket connection."); 743 "Tried to create SPDY stream for a closed socket connection.");
744 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
739 return ERR_CONNECTION_CLOSED; 745 return ERR_CONNECTION_CLOSED;
740 } 746 }
741 } 747 }
742 748
743 scoped_ptr<SpdyStream> new_stream( 749 scoped_ptr<SpdyStream> new_stream(
744 new SpdyStream(request.type(), GetWeakPtr(), request.url(), 750 new SpdyStream(request.type(), GetWeakPtr(), request.url(),
745 request.priority(), 751 request.priority(),
746 stream_initial_send_window_size_, 752 stream_initial_send_window_size_,
747 stream_initial_recv_window_size_, 753 stream_initial_recv_window_size_,
748 request.net_log())); 754 request.net_log()));
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
869 875
870 return true; 876 return true;
871 } 877 }
872 878
873 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() { 879 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
874 return weak_factory_.GetWeakPtr(); 880 return weak_factory_.GetWeakPtr();
875 } 881 }
876 882
877 bool SpdySession::CloseOneIdleConnection() { 883 bool SpdySession::CloseOneIdleConnection() {
878 CHECK(!in_io_loop_); 884 CHECK(!in_io_loop_);
885 DCHECK_NE(availability_state_, STATE_CLOSED);
879 DCHECK(pool_); 886 DCHECK(pool_);
880 if (active_streams_.empty()) { 887 if (!active_streams_.empty())
881 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection."); 888 return false;
889 CloseSessionResult result =
890 DoCloseSession(ERR_CONNECTION_CLOSED, "Closing one idle connection.");
891 if (result != SESSION_CLOSED_AND_REMOVED) {
892 NOTREACHED();
893 return false;
882 } 894 }
883 // Return false as the socket wasn't immediately closed. 895 return true;
884 return false;
885 } 896 }
886 897
887 void SpdySession::EnqueueStreamWrite( 898 void SpdySession::EnqueueStreamWrite(
888 const base::WeakPtr<SpdyStream>& stream, 899 const base::WeakPtr<SpdyStream>& stream,
889 SpdyFrameType frame_type, 900 SpdyFrameType frame_type,
890 scoped_ptr<SpdyBufferProducer> producer) { 901 scoped_ptr<SpdyBufferProducer> producer) {
891 DCHECK(frame_type == HEADERS || 902 DCHECK(frame_type == HEADERS ||
892 frame_type == DATA || 903 frame_type == DATA ||
893 frame_type == CREDENTIAL || 904 frame_type == CREDENTIAL ||
894 frame_type == SYN_STREAM); 905 frame_type == SYN_STREAM);
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
927 stream_id)); 938 stream_id));
928 } 939 }
929 940
930 return syn_frame.Pass(); 941 return syn_frame.Pass();
931 } 942 }
932 943
933 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, 944 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
934 IOBuffer* data, 945 IOBuffer* data,
935 int len, 946 int len,
936 SpdyDataFlags flags) { 947 SpdyDataFlags flags) {
937 if (availability_state_ == STATE_DRAINING) { 948 if (availability_state_ == STATE_CLOSED) {
949 NOTREACHED();
938 return scoped_ptr<SpdyBuffer>(); 950 return scoped_ptr<SpdyBuffer>();
939 } 951 }
940 952
941 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 953 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
942 CHECK(it != active_streams_.end()); 954 CHECK(it != active_streams_.end());
943 SpdyStream* stream = it->second.stream; 955 SpdyStream* stream = it->second.stream;
944 CHECK_EQ(stream->stream_id(), stream_id); 956 CHECK_EQ(stream->stream_id(), stream_id);
945 957
946 if (len < 0) { 958 if (len < 0) {
947 NOTREACHED(); 959 NOTREACHED();
(...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after
1116 // TODO(akalin): When SpdyStream was ref-counted (and 1128 // TODO(akalin): When SpdyStream was ref-counted (and
1117 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this 1129 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1118 // was only done when status was not OK. This meant that pushed 1130 // was only done when status was not OK. This meant that pushed
1119 // streams can still be claimed after they're closed. This is 1131 // streams can still be claimed after they're closed. This is
1120 // probably something that we still want to support, although server 1132 // probably something that we still want to support, although server
1121 // push is hardly used. Write tests for this and fix this. (See 1133 // push is hardly used. Write tests for this and fix this. (See
1122 // http://crbug.com/261712 .) 1134 // http://crbug.com/261712 .)
1123 if (owned_stream->type() == SPDY_PUSH_STREAM) 1135 if (owned_stream->type() == SPDY_PUSH_STREAM)
1124 unclaimed_pushed_streams_.erase(owned_stream->url()); 1136 unclaimed_pushed_streams_.erase(owned_stream->url());
1125 1137
1138 base::WeakPtr<SpdySession> weak_this = GetWeakPtr();
1139
1126 DeleteStream(owned_stream.Pass(), status); 1140 DeleteStream(owned_stream.Pass(), status);
1127 MaybeFinishGoingAway(); 1141
1142 if (!weak_this)
1143 return;
1144
1145 if (availability_state_ == STATE_CLOSED)
1146 return;
1128 1147
1129 // If there are no active streams and the socket pool is stalled, close the 1148 // If there are no active streams and the socket pool is stalled, close the
1130 // session to free up a socket slot. 1149 // session to free up a socket slot.
1131 if (active_streams_.empty() && connection_->IsPoolStalled()) { 1150 if (active_streams_.empty() && connection_->IsPoolStalled()) {
1132 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection."); 1151 CloseSessionResult result =
1152 DoCloseSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1153 CHECK_NE(result, SESSION_ALREADY_CLOSED);
1133 } 1154 }
1134 } 1155 }
1135 1156
1136 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it, 1157 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1137 int status) { 1158 int status) {
1138 scoped_ptr<SpdyStream> owned_stream(*it); 1159 scoped_ptr<SpdyStream> owned_stream(*it);
1139 created_streams_.erase(it); 1160 created_streams_.erase(it);
1140 DeleteStream(owned_stream.Pass(), status); 1161 DeleteStream(owned_stream.Pass(), status);
1141 } 1162 }
1142 1163
(...skipping 24 matching lines...) Expand all
1167 DCHECK(buffered_spdy_framer_.get()); 1188 DCHECK(buffered_spdy_framer_.get());
1168 scoped_ptr<SpdyFrame> rst_frame( 1189 scoped_ptr<SpdyFrame> rst_frame(
1169 buffered_spdy_framer_->CreateRstStream(stream_id, status)); 1190 buffered_spdy_framer_->CreateRstStream(stream_id, status));
1170 1191
1171 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass()); 1192 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1172 RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status)); 1193 RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
1173 } 1194 }
1174 1195
1175 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) { 1196 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1176 CHECK(!in_io_loop_); 1197 CHECK(!in_io_loop_);
1177 if (availability_state_ == STATE_DRAINING) { 1198 CHECK_NE(availability_state_, STATE_CLOSED);
1199 CHECK_EQ(read_state_, expected_read_state);
1200
1201 result = DoReadLoop(expected_read_state, result);
1202
1203 if (availability_state_ == STATE_CLOSED) {
1204 CHECK_EQ(result, error_on_close_);
1205 CHECK_LT(error_on_close_, ERR_IO_PENDING);
1206 RemoveFromPool();
1178 return; 1207 return;
1179 } 1208 }
1180 ignore_result(DoReadLoop(expected_read_state, result)); 1209
1210 CHECK(result == OK || result == ERR_IO_PENDING);
1181 } 1211 }
1182 1212
1183 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { 1213 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1184 CHECK(!in_io_loop_); 1214 CHECK(!in_io_loop_);
1215 CHECK_NE(availability_state_, STATE_CLOSED);
1185 CHECK_EQ(read_state_, expected_read_state); 1216 CHECK_EQ(read_state_, expected_read_state);
1186 1217
1187 in_io_loop_ = true; 1218 in_io_loop_ = true;
1188 1219
1189 int bytes_read_without_yielding = 0; 1220 int bytes_read_without_yielding = 0;
1190 1221
1191 // Loop until the session is draining, the read becomes blocked, or 1222 // Loop until the session is closed, the read becomes blocked, or
1192 // the read limit is exceeded. 1223 // the read limit is exceeded.
1193 while (true) { 1224 while (true) {
1194 switch (read_state_) { 1225 switch (read_state_) {
1195 case READ_STATE_DO_READ: 1226 case READ_STATE_DO_READ:
1196 CHECK_EQ(result, OK); 1227 CHECK_EQ(result, OK);
1197 result = DoRead(); 1228 result = DoRead();
1198 break; 1229 break;
1199 case READ_STATE_DO_READ_COMPLETE: 1230 case READ_STATE_DO_READ_COMPLETE:
1200 if (result > 0) 1231 if (result > 0)
1201 bytes_read_without_yielding += result; 1232 bytes_read_without_yielding += result;
1202 result = DoReadComplete(result); 1233 result = DoReadComplete(result);
1203 break; 1234 break;
1204 default: 1235 default:
1205 NOTREACHED() << "read_state_: " << read_state_; 1236 NOTREACHED() << "read_state_: " << read_state_;
1206 break; 1237 break;
1207 } 1238 }
1208 1239
1209 if (availability_state_ == STATE_DRAINING) 1240 if (availability_state_ == STATE_CLOSED) {
1241 CHECK_EQ(result, error_on_close_);
1242 CHECK_LT(result, ERR_IO_PENDING);
1210 break; 1243 break;
1244 }
1211 1245
1212 if (result == ERR_IO_PENDING) 1246 if (result == ERR_IO_PENDING)
1213 break; 1247 break;
1214 1248
1215 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) { 1249 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1216 read_state_ = READ_STATE_DO_READ; 1250 read_state_ = READ_STATE_DO_READ;
1217 base::MessageLoop::current()->PostTask( 1251 base::MessageLoop::current()->PostTask(
1218 FROM_HERE, 1252 FROM_HERE,
1219 base::Bind(&SpdySession::PumpReadLoop, 1253 base::Bind(&SpdySession::PumpReadLoop,
1220 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); 1254 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1221 result = ERR_IO_PENDING; 1255 result = ERR_IO_PENDING;
1222 break; 1256 break;
1223 } 1257 }
1224 } 1258 }
1225 1259
1226 CHECK(in_io_loop_); 1260 CHECK(in_io_loop_);
1227 in_io_loop_ = false; 1261 in_io_loop_ = false;
1228 1262
1229 return result; 1263 return result;
1230 } 1264 }
1231 1265
1232 int SpdySession::DoRead() { 1266 int SpdySession::DoRead() {
1233 CHECK(in_io_loop_); 1267 CHECK(in_io_loop_);
1268 CHECK_NE(availability_state_, STATE_CLOSED);
1234 1269
1235 CHECK(connection_); 1270 CHECK(connection_);
1236 CHECK(connection_->socket()); 1271 CHECK(connection_->socket());
1237 read_state_ = READ_STATE_DO_READ_COMPLETE; 1272 read_state_ = READ_STATE_DO_READ_COMPLETE;
1238 return connection_->socket()->Read( 1273 return connection_->socket()->Read(
1239 read_buffer_.get(), 1274 read_buffer_.get(),
1240 kReadBufferSize, 1275 kReadBufferSize,
1241 base::Bind(&SpdySession::PumpReadLoop, 1276 base::Bind(&SpdySession::PumpReadLoop,
1242 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE)); 1277 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
1243 } 1278 }
1244 1279
1245 int SpdySession::DoReadComplete(int result) { 1280 int SpdySession::DoReadComplete(int result) {
1246 CHECK(in_io_loop_); 1281 CHECK(in_io_loop_);
1282 DCHECK_NE(availability_state_, STATE_CLOSED);
1247 1283
1248 // Parse a frame. For now this code requires that the frame fit into our 1284 // Parse a frame. For now this code requires that the frame fit into our
1249 // buffer (kReadBufferSize). 1285 // buffer (kReadBufferSize).
1250 // TODO(mbelshe): support arbitrarily large frames! 1286 // TODO(mbelshe): support arbitrarily large frames!
1251 1287
1252 if (result == 0) { 1288 if (result == 0) {
1253 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", 1289 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1254 total_bytes_received_, 1, 100000000, 50); 1290 total_bytes_received_, 1, 100000000, 50);
1255 DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed"); 1291 CloseSessionResult close_session_result =
1256 1292 DoCloseSession(ERR_CONNECTION_CLOSED, "Connection closed");
1293 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1294 DCHECK_EQ(availability_state_, STATE_CLOSED);
1295 DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED);
1257 return ERR_CONNECTION_CLOSED; 1296 return ERR_CONNECTION_CLOSED;
1258 } 1297 }
1259 1298
1260 if (result < 0) { 1299 if (result < 0) {
1261 DoDrainSession(static_cast<Error>(result), "result is < 0."); 1300 CloseSessionResult close_session_result =
1301 DoCloseSession(static_cast<Error>(result), "result is < 0.");
1302 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1303 DCHECK_EQ(availability_state_, STATE_CLOSED);
1304 DCHECK_EQ(error_on_close_, result);
1262 return result; 1305 return result;
1263 } 1306 }
1264 CHECK_LE(result, kReadBufferSize); 1307 CHECK_LE(result, kReadBufferSize);
1265 total_bytes_received_ += result; 1308 total_bytes_received_ += result;
1266 1309
1267 last_activity_time_ = time_func_(); 1310 last_activity_time_ = time_func_();
1268 1311
1269 DCHECK(buffered_spdy_framer_.get()); 1312 DCHECK(buffered_spdy_framer_.get());
1270 char* data = read_buffer_->data(); 1313 char* data = read_buffer_->data();
1271 while (result > 0) { 1314 while (result > 0) {
1272 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result); 1315 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1273 result -= bytes_processed; 1316 result -= bytes_processed;
1274 data += bytes_processed; 1317 data += bytes_processed;
1275 1318
1276 if (availability_state_ == STATE_DRAINING) { 1319 if (availability_state_ == STATE_CLOSED) {
1277 return ERR_CONNECTION_CLOSED; 1320 DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1321 return error_on_close_;
1278 } 1322 }
1279 1323
1280 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR); 1324 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1281 } 1325 }
1282 1326
1283 read_state_ = READ_STATE_DO_READ; 1327 read_state_ = READ_STATE_DO_READ;
1284 return OK; 1328 return OK;
1285 } 1329 }
1286 1330
1287 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) { 1331 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1288 CHECK(!in_io_loop_); 1332 CHECK(!in_io_loop_);
1333 DCHECK_NE(availability_state_, STATE_CLOSED);
1289 DCHECK_EQ(write_state_, expected_write_state); 1334 DCHECK_EQ(write_state_, expected_write_state);
1290 1335
1291 DoWriteLoop(expected_write_state, result); 1336 result = DoWriteLoop(expected_write_state, result);
1292 1337
1293 if (availability_state_ == STATE_DRAINING && !in_flight_write_ && 1338 if (availability_state_ == STATE_CLOSED) {
1294 write_queue_.IsEmpty()) { 1339 DCHECK_EQ(result, error_on_close_);
1295 pool_->RemoveUnavailableSession(GetWeakPtr()); // Destroys |this|. 1340 DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1341 RemoveFromPool();
1296 return; 1342 return;
1297 } 1343 }
1344
1345 DCHECK(result == OK || result == ERR_IO_PENDING);
1298 } 1346 }
1299 1347
1300 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { 1348 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1301 CHECK(!in_io_loop_); 1349 CHECK(!in_io_loop_);
1350 DCHECK_NE(availability_state_, STATE_CLOSED);
1302 DCHECK_NE(write_state_, WRITE_STATE_IDLE); 1351 DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1303 DCHECK_EQ(write_state_, expected_write_state); 1352 DCHECK_EQ(write_state_, expected_write_state);
1304 1353
1305 in_io_loop_ = true; 1354 in_io_loop_ = true;
1306 1355
1307 // Loop until the session is closed or the write becomes blocked. 1356 // Loop until the session is closed or the write becomes blocked.
1308 while (true) { 1357 while (true) {
1309 switch (write_state_) { 1358 switch (write_state_) {
1310 case WRITE_STATE_DO_WRITE: 1359 case WRITE_STATE_DO_WRITE:
1311 DCHECK_EQ(result, OK); 1360 DCHECK_EQ(result, OK);
1312 result = DoWrite(); 1361 result = DoWrite();
1313 break; 1362 break;
1314 case WRITE_STATE_DO_WRITE_COMPLETE: 1363 case WRITE_STATE_DO_WRITE_COMPLETE:
1315 result = DoWriteComplete(result); 1364 result = DoWriteComplete(result);
1316 break; 1365 break;
1317 case WRITE_STATE_IDLE: 1366 case WRITE_STATE_IDLE:
1318 default: 1367 default:
1319 NOTREACHED() << "write_state_: " << write_state_; 1368 NOTREACHED() << "write_state_: " << write_state_;
1320 break; 1369 break;
1321 } 1370 }
1322 1371
1372 if (availability_state_ == STATE_CLOSED) {
1373 DCHECK_EQ(result, error_on_close_);
1374 DCHECK_LT(result, ERR_IO_PENDING);
1375 break;
1376 }
1377
1323 if (write_state_ == WRITE_STATE_IDLE) { 1378 if (write_state_ == WRITE_STATE_IDLE) {
1324 DCHECK_EQ(result, ERR_IO_PENDING); 1379 DCHECK_EQ(result, ERR_IO_PENDING);
1325 break; 1380 break;
1326 } 1381 }
1327 1382
1328 if (result == ERR_IO_PENDING) 1383 if (result == ERR_IO_PENDING)
1329 break; 1384 break;
1330 } 1385 }
1331 1386
1332 CHECK(in_io_loop_); 1387 CHECK(in_io_loop_);
1333 in_io_loop_ = false; 1388 in_io_loop_ = false;
1334 1389
1335 return result; 1390 return result;
1336 } 1391 }
1337 1392
1338 int SpdySession::DoWrite() { 1393 int SpdySession::DoWrite() {
1339 CHECK(in_io_loop_); 1394 CHECK(in_io_loop_);
1395 DCHECK_NE(availability_state_, STATE_CLOSED);
1340 1396
1341 DCHECK(buffered_spdy_framer_); 1397 DCHECK(buffered_spdy_framer_);
1342 if (in_flight_write_) { 1398 if (in_flight_write_) {
1343 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1399 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1344 } else { 1400 } else {
1345 // Grab the next frame to send. 1401 // Grab the next frame to send.
1346 SpdyFrameType frame_type = DATA; 1402 SpdyFrameType frame_type = DATA;
1347 scoped_ptr<SpdyBufferProducer> producer; 1403 scoped_ptr<SpdyBufferProducer> producer;
1348 base::WeakPtr<SpdyStream> stream; 1404 base::WeakPtr<SpdyStream> stream;
1349 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) { 1405 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
1393 in_flight_write_->GetIOBufferForRemainingData(); 1449 in_flight_write_->GetIOBufferForRemainingData();
1394 return connection_->socket()->Write( 1450 return connection_->socket()->Write(
1395 write_io_buffer.get(), 1451 write_io_buffer.get(),
1396 in_flight_write_->GetRemainingSize(), 1452 in_flight_write_->GetRemainingSize(),
1397 base::Bind(&SpdySession::PumpWriteLoop, 1453 base::Bind(&SpdySession::PumpWriteLoop,
1398 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE)); 1454 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
1399 } 1455 }
1400 1456
1401 int SpdySession::DoWriteComplete(int result) { 1457 int SpdySession::DoWriteComplete(int result) {
1402 CHECK(in_io_loop_); 1458 CHECK(in_io_loop_);
1459 DCHECK_NE(availability_state_, STATE_CLOSED);
1403 DCHECK_NE(result, ERR_IO_PENDING); 1460 DCHECK_NE(result, ERR_IO_PENDING);
1404 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1461 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1405 1462
1406 last_activity_time_ = time_func_(); 1463 last_activity_time_ = time_func_();
1407 1464
1408 if (result < 0) { 1465 if (result < 0) {
1409 DCHECK_NE(result, ERR_IO_PENDING); 1466 DCHECK_NE(result, ERR_IO_PENDING);
1410 in_flight_write_.reset(); 1467 in_flight_write_.reset();
1411 in_flight_write_frame_type_ = DATA; 1468 in_flight_write_frame_type_ = DATA;
1412 in_flight_write_frame_size_ = 0; 1469 in_flight_write_frame_size_ = 0;
1413 in_flight_write_stream_.reset(); 1470 in_flight_write_stream_.reset();
1414 write_state_ = WRITE_STATE_DO_WRITE; 1471 CloseSessionResult close_session_result =
1415 DoDrainSession(static_cast<Error>(result), "Write error"); 1472 DoCloseSession(static_cast<Error>(result), "Write error");
1416 return OK; 1473 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1474 DCHECK_EQ(availability_state_, STATE_CLOSED);
1475 DCHECK_EQ(error_on_close_, result);
1476 return result;
1417 } 1477 }
1418 1478
1419 // It should not be possible to have written more bytes than our 1479 // It should not be possible to have written more bytes than our
1420 // in_flight_write_. 1480 // in_flight_write_.
1421 DCHECK_LE(static_cast<size_t>(result), 1481 DCHECK_LE(static_cast<size_t>(result),
1422 in_flight_write_->GetRemainingSize()); 1482 in_flight_write_->GetRemainingSize());
1423 1483
1424 if (result > 0) { 1484 if (result > 0) {
1425 in_flight_write_->Consume(static_cast<size_t>(result)); 1485 in_flight_write_->Consume(static_cast<size_t>(result));
1426 1486
(...skipping 23 matching lines...) Expand all
1450 void SpdySession::DcheckGoingAway() const { 1510 void SpdySession::DcheckGoingAway() const {
1451 #if DCHECK_IS_ON 1511 #if DCHECK_IS_ON
1452 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1512 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1453 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) { 1513 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1454 DCHECK(pending_create_stream_queues_[i].empty()); 1514 DCHECK(pending_create_stream_queues_[i].empty());
1455 } 1515 }
1456 DCHECK(created_streams_.empty()); 1516 DCHECK(created_streams_.empty());
1457 #endif 1517 #endif
1458 } 1518 }
1459 1519
1460 void SpdySession::DcheckDraining() const { 1520 void SpdySession::DcheckClosed() const {
1461 DcheckGoingAway(); 1521 DcheckGoingAway();
1462 DCHECK_EQ(availability_state_, STATE_DRAINING); 1522 DCHECK_EQ(availability_state_, STATE_CLOSED);
1523 DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1463 DCHECK(active_streams_.empty()); 1524 DCHECK(active_streams_.empty());
1464 DCHECK(unclaimed_pushed_streams_.empty()); 1525 DCHECK(unclaimed_pushed_streams_.empty());
1526 DCHECK(write_queue_.IsEmpty());
1465 } 1527 }
1466 1528
1467 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id, 1529 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
1468 Error status) { 1530 Error status) {
1469 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1531 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1470 1532
1471 // The loops below are carefully written to avoid reentrancy problems. 1533 // The loops below are carefully written to avoid reentrancy problems.
1472 1534
1473 while (true) { 1535 while (true) {
1474 size_t old_size = GetTotalSize(pending_create_stream_queues_); 1536 size_t old_size = GetTotalSize(pending_create_stream_queues_);
(...skipping 29 matching lines...) Expand all
1504 // away. 1566 // away.
1505 DCHECK_GT(old_size, created_streams_.size()); 1567 DCHECK_GT(old_size, created_streams_.size());
1506 } 1568 }
1507 1569
1508 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id); 1570 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
1509 1571
1510 DcheckGoingAway(); 1572 DcheckGoingAway();
1511 } 1573 }
1512 1574
1513 void SpdySession::MaybeFinishGoingAway() { 1575 void SpdySession::MaybeFinishGoingAway() {
1514 if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) { 1576 DcheckGoingAway();
1515 DoDrainSession(OK, "Finished going away"); 1577 if (active_streams_.empty() && availability_state_ != STATE_CLOSED) {
1578 CloseSessionResult result =
1579 DoCloseSession(ERR_CONNECTION_CLOSED, "Finished going away");
1580 CHECK_NE(result, SESSION_ALREADY_CLOSED);
1516 } 1581 }
1517 } 1582 }
1518 1583
1519 void SpdySession::DoDrainSession(Error err, const std::string& description) { 1584 SpdySession::CloseSessionResult SpdySession::DoCloseSession(
1520 if (availability_state_ == STATE_DRAINING) { 1585 Error err,
1521 return; 1586 const std::string& description) {
1522 } 1587 CHECK_LT(err, ERR_IO_PENDING);
1523 MakeUnavailable();
1524 1588
1525 // TODO(jgraettinger): If draining with an |err|, enqueue a GOAWAY frame here. 1589 if (availability_state_ == STATE_CLOSED)
1526 1590 return SESSION_ALREADY_CLOSED;
1527 availability_state_ = STATE_DRAINING;
1528 error_on_close_ = err;
1529 1591
1530 net_log_.AddEvent( 1592 net_log_.AddEvent(
1531 NetLog::TYPE_SPDY_SESSION_CLOSE, 1593 NetLog::TYPE_SPDY_SESSION_CLOSE,
1532 base::Bind(&NetLogSpdySessionCloseCallback, err, &description)); 1594 base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
1533 1595
1534 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err); 1596 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1535 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors", 1597 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1536 total_bytes_received_, 1, 100000000, 50); 1598 total_bytes_received_, 1, 100000000, 50);
1537 1599
1600 CHECK(pool_);
1601 if (availability_state_ != STATE_GOING_AWAY)
1602 pool_->MakeSessionUnavailable(GetWeakPtr());
1603
1604 availability_state_ = STATE_CLOSED;
1605 error_on_close_ = err;
1606
1538 StartGoingAway(0, err); 1607 StartGoingAway(0, err);
1539 DcheckDraining(); 1608 write_queue_.Clear();
1540 MaybePostWriteLoop(); 1609
1610 DcheckClosed();
1611
1612 if (in_io_loop_)
1613 return SESSION_CLOSED_BUT_NOT_REMOVED;
1614
1615 RemoveFromPool();
1616 return SESSION_CLOSED_AND_REMOVED;
1617 }
1618
1619 void SpdySession::RemoveFromPool() {
1620 DcheckClosed();
1621 CHECK(pool_);
1622
1623 SpdySessionPool* pool = pool_;
1624 pool_ = NULL;
1625 pool->RemoveUnavailableSession(GetWeakPtr());
1541 } 1626 }
1542 1627
1543 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) { 1628 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1544 DCHECK(stream); 1629 DCHECK(stream);
1545 std::string description = base::StringPrintf( 1630 std::string description = base::StringPrintf(
1546 "ABANDONED (stream_id=%d): ", stream->stream_id()) + 1631 "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1547 stream->url().spec(); 1632 stream->url().spec();
1548 stream->LogStreamError(status, description); 1633 stream->LogStreamError(status, description);
1549 // We don't increment the streams abandoned counter here. If the 1634 // We don't increment the streams abandoned counter here. If the
1550 // stream isn't active (i.e., it hasn't written anything to the wire 1635 // stream isn't active (i.e., it hasn't written anything to the wire
(...skipping 18 matching lines...) Expand all
1569 1654
1570 SpdyStreamId SpdySession::GetNewStreamId() { 1655 SpdyStreamId SpdySession::GetNewStreamId() {
1571 CHECK_LE(stream_hi_water_mark_, kLastStreamId); 1656 CHECK_LE(stream_hi_water_mark_, kLastStreamId);
1572 SpdyStreamId id = stream_hi_water_mark_; 1657 SpdyStreamId id = stream_hi_water_mark_;
1573 stream_hi_water_mark_ += 2; 1658 stream_hi_water_mark_ += 2;
1574 return id; 1659 return id;
1575 } 1660 }
1576 1661
1577 void SpdySession::CloseSessionOnError(Error err, 1662 void SpdySession::CloseSessionOnError(Error err,
1578 const std::string& description) { 1663 const std::string& description) {
1579 DoDrainSession(err, description); 1664 // We may be called from anywhere, so we can't expect a particular
1665 // return value.
1666 ignore_result(DoCloseSession(err, description));
1580 } 1667 }
1581 1668
1582 void SpdySession::MakeUnavailable() { 1669 void SpdySession::MakeUnavailable() {
1583 CHECK_NE(availability_state_, STATE_DRAINING); 1670 if (availability_state_ < STATE_GOING_AWAY) {
1584 if (availability_state_ == STATE_AVAILABLE) {
1585 availability_state_ = STATE_GOING_AWAY; 1671 availability_state_ = STATE_GOING_AWAY;
1672 DCHECK(pool_);
1586 pool_->MakeSessionUnavailable(GetWeakPtr()); 1673 pool_->MakeSessionUnavailable(GetWeakPtr());
1587 } 1674 }
1588 } 1675 }
1589 1676
1590 base::Value* SpdySession::GetInfoAsValue() const { 1677 base::Value* SpdySession::GetInfoAsValue() const {
1591 base::DictionaryValue* dict = new base::DictionaryValue(); 1678 base::DictionaryValue* dict = new base::DictionaryValue();
1592 1679
1593 dict->SetInteger("source_id", net_log_.source().id); 1680 dict->SetInteger("source_id", net_log_.source().id);
1594 1681
1595 dict->SetString("host_port_pair", host_port_pair().ToString()); 1682 dict->SetString("host_port_pair", host_port_pair().ToString());
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
1684 scoped_ptr<SpdyBufferProducer>( 1771 scoped_ptr<SpdyBufferProducer>(
1685 new SimpleBufferProducer( 1772 new SimpleBufferProducer(
1686 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), 1773 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1687 base::WeakPtr<SpdyStream>()); 1774 base::WeakPtr<SpdyStream>());
1688 } 1775 }
1689 1776
1690 void SpdySession::EnqueueWrite(RequestPriority priority, 1777 void SpdySession::EnqueueWrite(RequestPriority priority,
1691 SpdyFrameType frame_type, 1778 SpdyFrameType frame_type,
1692 scoped_ptr<SpdyBufferProducer> producer, 1779 scoped_ptr<SpdyBufferProducer> producer,
1693 const base::WeakPtr<SpdyStream>& stream) { 1780 const base::WeakPtr<SpdyStream>& stream) {
1694 if (availability_state_ == STATE_DRAINING) 1781 if (availability_state_ == STATE_CLOSED)
1695 return; 1782 return;
1696 1783
1784 bool was_idle = write_queue_.IsEmpty();
1697 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); 1785 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1698 MaybePostWriteLoop();
1699 }
1700
1701 void SpdySession::MaybePostWriteLoop() {
1702 if (write_state_ == WRITE_STATE_IDLE) { 1786 if (write_state_ == WRITE_STATE_IDLE) {
1703 CHECK(!in_flight_write_); 1787 DCHECK(was_idle);
1788 DCHECK(!in_flight_write_);
1704 write_state_ = WRITE_STATE_DO_WRITE; 1789 write_state_ = WRITE_STATE_DO_WRITE;
1705 base::MessageLoop::current()->PostTask( 1790 base::MessageLoop::current()->PostTask(
1706 FROM_HERE, 1791 FROM_HERE,
1707 base::Bind(&SpdySession::PumpWriteLoop, 1792 base::Bind(&SpdySession::PumpWriteLoop,
1708 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK)); 1793 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1709 } 1794 }
1710 } 1795 }
1711 1796
1712 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) { 1797 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1713 CHECK_EQ(stream->stream_id(), 0u); 1798 CHECK_EQ(stream->stream_id(), 0u);
(...skipping 23 matching lines...) Expand all
1737 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) { 1822 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1738 if (in_flight_write_stream_.get() == stream.get()) { 1823 if (in_flight_write_stream_.get() == stream.get()) {
1739 // If we're deleting the stream for the in-flight write, we still 1824 // If we're deleting the stream for the in-flight write, we still
1740 // need to let the write complete, so we clear 1825 // need to let the write complete, so we clear
1741 // |in_flight_write_stream_| and let the write finish on its own 1826 // |in_flight_write_stream_| and let the write finish on its own
1742 // without notifying |in_flight_write_stream_|. 1827 // without notifying |in_flight_write_stream_|.
1743 in_flight_write_stream_.reset(); 1828 in_flight_write_stream_.reset();
1744 } 1829 }
1745 1830
1746 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr()); 1831 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1832
1833 // |stream->OnClose()| may end up closing |this|, so detect that.
1834 base::WeakPtr<SpdySession> weak_this = GetWeakPtr();
1835
1747 stream->OnClose(status); 1836 stream->OnClose(status);
1748 1837
1749 if (availability_state_ == STATE_AVAILABLE) { 1838 if (!weak_this)
1750 ProcessPendingStreamRequests(); 1839 return;
1840
1841 switch (availability_state_) {
1842 case STATE_AVAILABLE:
1843 ProcessPendingStreamRequests();
1844 break;
1845 case STATE_GOING_AWAY:
1846 DcheckGoingAway();
1847 MaybeFinishGoingAway();
1848 break;
1849 case STATE_CLOSED:
1850 // Do nothing.
1851 break;
1751 } 1852 }
1752 } 1853 }
1753 1854
1754 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) { 1855 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1755 base::StatsCounter used_push_streams("spdy.claimed_push_streams"); 1856 base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1756 1857
1757 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url); 1858 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1758 if (unclaimed_it == unclaimed_pushed_streams_.end()) 1859 if (unclaimed_it == unclaimed_pushed_streams_.end())
1759 return base::WeakPtr<SpdyStream>(); 1860 return base::WeakPtr<SpdyStream>();
1760 1861
(...skipping 23 matching lines...) Expand all
1784 SSLCertRequestInfo* cert_request_info) { 1885 SSLCertRequestInfo* cert_request_info) {
1785 if (!is_secure_) 1886 if (!is_secure_)
1786 return false; 1887 return false;
1787 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info); 1888 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1788 return true; 1889 return true;
1789 } 1890 }
1790 1891
1791 void SpdySession::OnError(SpdyFramer::SpdyError error_code) { 1892 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1792 CHECK(in_io_loop_); 1893 CHECK(in_io_loop_);
1793 1894
1895 if (availability_state_ == STATE_CLOSED)
1896 return;
1897
1794 RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code)); 1898 RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1795 std::string description = base::StringPrintf( 1899 std::string description = base::StringPrintf(
1796 "SPDY_ERROR error_code: %d.", error_code); 1900 "SPDY_ERROR error_code: %d.", error_code);
1797 DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, description); 1901 CloseSessionResult result =
1902 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, description);
1903 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
1798 } 1904 }
1799 1905
1800 void SpdySession::OnStreamError(SpdyStreamId stream_id, 1906 void SpdySession::OnStreamError(SpdyStreamId stream_id,
1801 const std::string& description) { 1907 const std::string& description) {
1802 CHECK(in_io_loop_); 1908 CHECK(in_io_loop_);
1803 1909
1910 if (availability_state_ == STATE_CLOSED)
1911 return;
1912
1804 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1913 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1805 if (it == active_streams_.end()) { 1914 if (it == active_streams_.end()) {
1806 // We still want to send a frame to reset the stream even if we 1915 // We still want to send a frame to reset the stream even if we
1807 // don't know anything about it. 1916 // don't know anything about it.
1808 EnqueueResetStreamFrame( 1917 EnqueueResetStreamFrame(
1809 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description); 1918 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
1810 return; 1919 return;
1811 } 1920 }
1812 1921
1813 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description); 1922 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
1814 } 1923 }
1815 1924
1816 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id, 1925 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
1817 size_t length, 1926 size_t length,
1818 bool fin) { 1927 bool fin) {
1819 CHECK(in_io_loop_); 1928 CHECK(in_io_loop_);
1820 1929
1930 if (availability_state_ == STATE_CLOSED)
1931 return;
1932
1821 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1933 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1822 1934
1823 // By the time data comes in, the stream may already be inactive. 1935 // By the time data comes in, the stream may already be inactive.
1824 if (it == active_streams_.end()) 1936 if (it == active_streams_.end())
1825 return; 1937 return;
1826 1938
1827 SpdyStream* stream = it->second.stream; 1939 SpdyStream* stream = it->second.stream;
1828 CHECK_EQ(stream->stream_id(), stream_id); 1940 CHECK_EQ(stream->stream_id(), stream_id);
1829 1941
1830 DCHECK(buffered_spdy_framer_); 1942 DCHECK(buffered_spdy_framer_);
1831 size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize(); 1943 size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
1832 stream->IncrementRawReceivedBytes(header_len); 1944 stream->IncrementRawReceivedBytes(header_len);
1833 } 1945 }
1834 1946
1835 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, 1947 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
1836 const char* data, 1948 const char* data,
1837 size_t len, 1949 size_t len,
1838 bool fin) { 1950 bool fin) {
1839 CHECK(in_io_loop_); 1951 CHECK(in_io_loop_);
1840 1952
1953 if (availability_state_ == STATE_CLOSED)
1954 return;
1955
1841 if (data == NULL && len != 0) { 1956 if (data == NULL && len != 0) {
1842 // This is notification of consumed data padding. 1957 // This is notification of consumed data padding.
1843 // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames. 1958 // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
1844 // See crbug.com/353012. 1959 // See crbug.com/353012.
1845 return; 1960 return;
1846 } 1961 }
1847 1962
1848 DCHECK_LT(len, 1u << 24); 1963 DCHECK_LT(len, 1u << 24);
1849 if (net_log().IsLogging()) { 1964 if (net_log().IsLogging()) {
1850 net_log().AddEvent( 1965 net_log().AddEvent(
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
1890 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 2005 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
1891 return; 2006 return;
1892 } 2007 }
1893 2008
1894 stream->OnDataReceived(buffer.Pass()); 2009 stream->OnDataReceived(buffer.Pass());
1895 } 2010 }
1896 2011
1897 void SpdySession::OnSettings(bool clear_persisted) { 2012 void SpdySession::OnSettings(bool clear_persisted) {
1898 CHECK(in_io_loop_); 2013 CHECK(in_io_loop_);
1899 2014
2015 if (availability_state_ == STATE_CLOSED)
2016 return;
2017
1900 if (clear_persisted) 2018 if (clear_persisted)
1901 http_server_properties_->ClearSpdySettings(host_port_pair()); 2019 http_server_properties_->ClearSpdySettings(host_port_pair());
1902 2020
1903 if (net_log_.IsLogging()) { 2021 if (net_log_.IsLogging()) {
1904 net_log_.AddEvent( 2022 net_log_.AddEvent(
1905 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS, 2023 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
1906 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(), 2024 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
1907 clear_persisted)); 2025 clear_persisted));
1908 } 2026 }
1909 2027
1910 if (GetProtocolVersion() >= SPDY4) { 2028 if (GetProtocolVersion() >= SPDY4) {
1911 // Send an acknowledgment of the setting. 2029 // Send an acknowledgment of the setting.
1912 SpdySettingsIR settings_ir; 2030 SpdySettingsIR settings_ir;
1913 settings_ir.set_is_ack(true); 2031 settings_ir.set_is_ack(true);
1914 EnqueueSessionWrite( 2032 EnqueueSessionWrite(
1915 HIGHEST, 2033 HIGHEST,
1916 SETTINGS, 2034 SETTINGS,
1917 scoped_ptr<SpdyFrame>( 2035 scoped_ptr<SpdyFrame>(
1918 buffered_spdy_framer_->SerializeFrame(settings_ir))); 2036 buffered_spdy_framer_->SerializeFrame(settings_ir)));
1919 } 2037 }
1920 } 2038 }
1921 2039
1922 void SpdySession::OnSetting(SpdySettingsIds id, 2040 void SpdySession::OnSetting(SpdySettingsIds id,
1923 uint8 flags, 2041 uint8 flags,
1924 uint32 value) { 2042 uint32 value) {
1925 CHECK(in_io_loop_); 2043 CHECK(in_io_loop_);
1926 2044
2045 if (availability_state_ == STATE_CLOSED)
2046 return;
2047
1927 HandleSetting(id, value); 2048 HandleSetting(id, value);
1928 http_server_properties_->SetSpdySetting( 2049 http_server_properties_->SetSpdySetting(
1929 host_port_pair(), 2050 host_port_pair(),
1930 id, 2051 id,
1931 static_cast<SpdySettingsFlags>(flags), 2052 static_cast<SpdySettingsFlags>(flags),
1932 value); 2053 value);
1933 received_settings_ = true; 2054 received_settings_ = true;
1934 2055
1935 // Log the setting. 2056 // Log the setting.
1936 net_log_.AddEvent( 2057 net_log_.AddEvent(
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
1984 } 2105 }
1985 2106
1986 void SpdySession::OnSynStream(SpdyStreamId stream_id, 2107 void SpdySession::OnSynStream(SpdyStreamId stream_id,
1987 SpdyStreamId associated_stream_id, 2108 SpdyStreamId associated_stream_id,
1988 SpdyPriority priority, 2109 SpdyPriority priority,
1989 bool fin, 2110 bool fin,
1990 bool unidirectional, 2111 bool unidirectional,
1991 const SpdyHeaderBlock& headers) { 2112 const SpdyHeaderBlock& headers) {
1992 CHECK(in_io_loop_); 2113 CHECK(in_io_loop_);
1993 2114
2115 if (availability_state_ == STATE_CLOSED)
2116 return;
2117
1994 base::Time response_time = base::Time::Now(); 2118 base::Time response_time = base::Time::Now();
1995 base::TimeTicks recv_first_byte_time = time_func_(); 2119 base::TimeTicks recv_first_byte_time = time_func_();
1996 2120
1997 if (net_log_.IsLogging()) { 2121 if (net_log_.IsLogging()) {
1998 net_log_.AddEvent( 2122 net_log_.AddEvent(
1999 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM, 2123 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2000 base::Bind(&NetLogSpdySynStreamReceivedCallback, 2124 base::Bind(&NetLogSpdySynStreamReceivedCallback,
2001 &headers, fin, unidirectional, priority, 2125 &headers, fin, unidirectional, priority,
2002 stream_id, associated_stream_id)); 2126 stream_id, associated_stream_id));
2003 } 2127 }
(...skipping 165 matching lines...) Expand 10 before | Expand all | Expand 10 after
2169 2293
2170 next_unclaimed_push_stream_sweep_time_ = time_func_() + 2294 next_unclaimed_push_stream_sweep_time_ = time_func_() +
2171 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2295 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2172 } 2296 }
2173 2297
2174 void SpdySession::OnSynReply(SpdyStreamId stream_id, 2298 void SpdySession::OnSynReply(SpdyStreamId stream_id,
2175 bool fin, 2299 bool fin,
2176 const SpdyHeaderBlock& headers) { 2300 const SpdyHeaderBlock& headers) {
2177 CHECK(in_io_loop_); 2301 CHECK(in_io_loop_);
2178 2302
2303 if (availability_state_ == STATE_CLOSED)
2304 return;
2305
2179 base::Time response_time = base::Time::Now(); 2306 base::Time response_time = base::Time::Now();
2180 base::TimeTicks recv_first_byte_time = time_func_(); 2307 base::TimeTicks recv_first_byte_time = time_func_();
2181 2308
2182 if (net_log().IsLogging()) { 2309 if (net_log().IsLogging()) {
2183 net_log().AddEvent( 2310 net_log().AddEvent(
2184 NetLog::TYPE_SPDY_SESSION_SYN_REPLY, 2311 NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2185 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback, 2312 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2186 &headers, fin, stream_id)); 2313 &headers, fin, stream_id));
2187 } 2314 }
2188 2315
(...skipping 27 matching lines...) Expand all
2216 2343
2217 ignore_result(OnInitialResponseHeadersReceived( 2344 ignore_result(OnInitialResponseHeadersReceived(
2218 headers, response_time, recv_first_byte_time, stream)); 2345 headers, response_time, recv_first_byte_time, stream));
2219 } 2346 }
2220 2347
2221 void SpdySession::OnHeaders(SpdyStreamId stream_id, 2348 void SpdySession::OnHeaders(SpdyStreamId stream_id,
2222 bool fin, 2349 bool fin,
2223 const SpdyHeaderBlock& headers) { 2350 const SpdyHeaderBlock& headers) {
2224 CHECK(in_io_loop_); 2351 CHECK(in_io_loop_);
2225 2352
2353 if (availability_state_ == STATE_CLOSED)
2354 return;
2355
2226 if (net_log().IsLogging()) { 2356 if (net_log().IsLogging()) {
2227 net_log().AddEvent( 2357 net_log().AddEvent(
2228 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS, 2358 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2229 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback, 2359 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2230 &headers, fin, stream_id)); 2360 &headers, fin, stream_id));
2231 } 2361 }
2232 2362
2233 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2363 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2234 if (it == active_streams_.end()) { 2364 if (it == active_streams_.end()) {
2235 // NOTE: it may just be that the stream was cancelled. 2365 // NOTE: it may just be that the stream was cancelled.
(...skipping 27 matching lines...) Expand all
2263 DCHECK_NE(rv, ERR_IO_PENDING); 2393 DCHECK_NE(rv, ERR_IO_PENDING);
2264 DCHECK(active_streams_.find(stream_id) == active_streams_.end()); 2394 DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2265 } 2395 }
2266 } 2396 }
2267 } 2397 }
2268 2398
2269 void SpdySession::OnRstStream(SpdyStreamId stream_id, 2399 void SpdySession::OnRstStream(SpdyStreamId stream_id,
2270 SpdyRstStreamStatus status) { 2400 SpdyRstStreamStatus status) {
2271 CHECK(in_io_loop_); 2401 CHECK(in_io_loop_);
2272 2402
2403 if (availability_state_ == STATE_CLOSED)
2404 return;
2405
2273 std::string description; 2406 std::string description;
2274 net_log().AddEvent( 2407 net_log().AddEvent(
2275 NetLog::TYPE_SPDY_SESSION_RST_STREAM, 2408 NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2276 base::Bind(&NetLogSpdyRstCallback, 2409 base::Bind(&NetLogSpdyRstCallback,
2277 stream_id, status, &description)); 2410 stream_id, status, &description));
2278 2411
2279 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2412 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2280 if (it == active_streams_.end()) { 2413 if (it == active_streams_.end()) {
2281 // NOTE: it may just be that the stream was cancelled. 2414 // NOTE: it may just be that the stream was cancelled.
2282 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 2415 LOG(WARNING) << "Received RST for invalid stream" << stream_id;
(...skipping 15 matching lines...) Expand all
2298 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 2431 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2299 // For now, it doesn't matter much - it is a protocol error. 2432 // For now, it doesn't matter much - it is a protocol error.
2300 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR); 2433 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2301 } 2434 }
2302 } 2435 }
2303 2436
2304 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, 2437 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2305 SpdyGoAwayStatus status) { 2438 SpdyGoAwayStatus status) {
2306 CHECK(in_io_loop_); 2439 CHECK(in_io_loop_);
2307 2440
2441 if (availability_state_ == STATE_CLOSED)
2442 return;
2443
2308 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY, 2444 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2309 base::Bind(&NetLogSpdyGoAwayCallback, 2445 base::Bind(&NetLogSpdyGoAwayCallback,
2310 last_accepted_stream_id, 2446 last_accepted_stream_id,
2311 active_streams_.size(), 2447 active_streams_.size(),
2312 unclaimed_pushed_streams_.size(), 2448 unclaimed_pushed_streams_.size(),
2313 status)); 2449 status));
2314 MakeUnavailable(); 2450 MakeUnavailable();
2315 StartGoingAway(last_accepted_stream_id, ERR_ABORTED); 2451 StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2316 // This is to handle the case when we already don't have any active 2452 // This is to handle the case when we already don't have any active
2317 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have 2453 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2318 // active streams and so the last one being closed will finish the 2454 // active streams and so the last one being closed will finish the
2319 // going away process (see DeleteStream()). 2455 // going away process (see DeleteStream()).
2320 MaybeFinishGoingAway(); 2456 MaybeFinishGoingAway();
2321 } 2457 }
2322 2458
2323 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) { 2459 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2324 CHECK(in_io_loop_); 2460 CHECK(in_io_loop_);
2325 2461
2462 if (availability_state_ == STATE_CLOSED)
2463 return;
2464
2326 net_log_.AddEvent( 2465 net_log_.AddEvent(
2327 NetLog::TYPE_SPDY_SESSION_PING, 2466 NetLog::TYPE_SPDY_SESSION_PING,
2328 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received")); 2467 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2329 2468
2330 // Send response to a PING from server. 2469 // Send response to a PING from server.
2331 if ((protocol_ >= kProtoSPDY4 && !is_ack) || 2470 if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2332 (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) { 2471 (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2333 WritePingFrame(unique_id, true); 2472 WritePingFrame(unique_id, true);
2334 return; 2473 return;
2335 } 2474 }
2336 2475
2337 --pings_in_flight_; 2476 --pings_in_flight_;
2338 if (pings_in_flight_ < 0) { 2477 if (pings_in_flight_ < 0) {
2339 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING); 2478 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2340 DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0."); 2479 CloseSessionResult result =
2480 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2481 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2341 pings_in_flight_ = 0; 2482 pings_in_flight_ = 0;
2342 return; 2483 return;
2343 } 2484 }
2344 2485
2345 if (pings_in_flight_ > 0) 2486 if (pings_in_flight_ > 0)
2346 return; 2487 return;
2347 2488
2348 // We will record RTT in histogram when there are no more client sent 2489 // We will record RTT in histogram when there are no more client sent
2349 // pings_in_flight_. 2490 // pings_in_flight_.
2350 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_); 2491 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2351 } 2492 }
2352 2493
2353 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, 2494 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2354 uint32 delta_window_size) { 2495 uint32 delta_window_size) {
2355 CHECK(in_io_loop_); 2496 CHECK(in_io_loop_);
2356 2497
2498 if (availability_state_ == STATE_CLOSED)
2499 return;
2500
2357 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max)); 2501 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2358 net_log_.AddEvent( 2502 net_log_.AddEvent(
2359 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, 2503 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2360 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2504 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2361 stream_id, delta_window_size)); 2505 stream_id, delta_window_size));
2362 2506
2363 if (stream_id == kSessionFlowControlStreamId) { 2507 if (stream_id == kSessionFlowControlStreamId) {
2364 // WINDOW_UPDATE for the session. 2508 // WINDOW_UPDATE for the session.
2365 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) { 2509 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2366 LOG(WARNING) << "Received WINDOW_UPDATE for session when " 2510 LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2367 << "session flow control is not turned on"; 2511 << "session flow control is not turned on";
2368 // TODO(akalin): Record an error and close the session. 2512 // TODO(akalin): Record an error and close the session.
2369 return; 2513 return;
2370 } 2514 }
2371 2515
2372 if (delta_window_size < 1u) { 2516 if (delta_window_size < 1u) {
2373 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2517 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2374 DoDrainSession( 2518 CloseSessionResult result = DoCloseSession(
2375 ERR_SPDY_PROTOCOL_ERROR, 2519 ERR_SPDY_PROTOCOL_ERROR,
2376 "Received WINDOW_UPDATE with an invalid delta_window_size " + 2520 "Received WINDOW_UPDATE with an invalid delta_window_size " +
2377 base::UintToString(delta_window_size)); 2521 base::UintToString(delta_window_size));
2522 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2378 return; 2523 return;
2379 } 2524 }
2380 2525
2381 IncreaseSendWindowSize(static_cast<int32>(delta_window_size)); 2526 IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2382 } else { 2527 } else {
2383 // WINDOW_UPDATE for a stream. 2528 // WINDOW_UPDATE for a stream.
2384 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2529 if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2385 // TODO(akalin): Record an error and close the session. 2530 // TODO(akalin): Record an error and close the session.
2386 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id 2531 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2387 << " when flow control is not turned on"; 2532 << " when flow control is not turned on";
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
2425 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2570 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2426 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2571 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2427 CHECK(it != active_streams_.end()); 2572 CHECK(it != active_streams_.end());
2428 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2573 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2429 SendWindowUpdateFrame( 2574 SendWindowUpdateFrame(
2430 stream_id, delta_window_size, it->second.stream->priority()); 2575 stream_id, delta_window_size, it->second.stream->priority());
2431 } 2576 }
2432 2577
2433 void SpdySession::SendInitialData() { 2578 void SpdySession::SendInitialData() {
2434 DCHECK(enable_sending_initial_data_); 2579 DCHECK(enable_sending_initial_data_);
2580 DCHECK_NE(availability_state_, STATE_CLOSED);
2435 2581
2436 if (send_connection_header_prefix_) { 2582 if (send_connection_header_prefix_) {
2437 DCHECK_EQ(protocol_, kProtoSPDY4); 2583 DCHECK_EQ(protocol_, kProtoSPDY4);
2438 scoped_ptr<SpdyFrame> connection_header_prefix_frame( 2584 scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2439 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix), 2585 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2440 kHttp2ConnectionHeaderPrefixSize, 2586 kHttp2ConnectionHeaderPrefixSize,
2441 false /* take_ownership */)); 2587 false /* take_ownership */));
2442 // Count the prefix as part of the subsequent SETTINGS frame. 2588 // Count the prefix as part of the subsequent SETTINGS frame.
2443 EnqueueSessionWrite(HIGHEST, SETTINGS, 2589 EnqueueSessionWrite(HIGHEST, SETTINGS,
2444 connection_header_prefix_frame.Pass()); 2590 connection_header_prefix_frame.Pass());
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
2490 const SpdySettingsIds new_id = it->first; 2636 const SpdySettingsIds new_id = it->first;
2491 const uint32 new_val = it->second.second; 2637 const uint32 new_val = it->second.second;
2492 HandleSetting(new_id, new_val); 2638 HandleSetting(new_id, new_val);
2493 } 2639 }
2494 2640
2495 SendSettings(server_settings_map); 2641 SendSettings(server_settings_map);
2496 } 2642 }
2497 2643
2498 2644
2499 void SpdySession::SendSettings(const SettingsMap& settings) { 2645 void SpdySession::SendSettings(const SettingsMap& settings) {
2646 DCHECK_NE(availability_state_, STATE_CLOSED);
2647
2500 net_log_.AddEvent( 2648 net_log_.AddEvent(
2501 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 2649 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2502 base::Bind(&NetLogSpdySendSettingsCallback, &settings)); 2650 base::Bind(&NetLogSpdySendSettingsCallback, &settings));
2503 2651
2504 // Create the SETTINGS frame and send it. 2652 // Create the SETTINGS frame and send it.
2505 DCHECK(buffered_spdy_framer_.get()); 2653 DCHECK(buffered_spdy_framer_.get());
2506 scoped_ptr<SpdyFrame> settings_frame( 2654 scoped_ptr<SpdyFrame> settings_frame(
2507 buffered_spdy_framer_->CreateSettings(settings)); 2655 buffered_spdy_framer_->CreateSettings(settings));
2508 sent_settings_ = true; 2656 sent_settings_ = true;
2509 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass()); 2657 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
2618 2766
2619 check_ping_status_pending_ = true; 2767 check_ping_status_pending_ = true;
2620 base::MessageLoop::current()->PostDelayedTask( 2768 base::MessageLoop::current()->PostDelayedTask(
2621 FROM_HERE, 2769 FROM_HERE,
2622 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2770 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2623 time_func_()), hung_interval_); 2771 time_func_()), hung_interval_);
2624 } 2772 }
2625 2773
2626 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) { 2774 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2627 CHECK(!in_io_loop_); 2775 CHECK(!in_io_loop_);
2776 DCHECK_NE(availability_state_, STATE_CLOSED);
2628 2777
2629 // Check if we got a response back for all PINGs we had sent. 2778 // Check if we got a response back for all PINGs we had sent.
2630 if (pings_in_flight_ == 0) { 2779 if (pings_in_flight_ == 0) {
2631 check_ping_status_pending_ = false; 2780 check_ping_status_pending_ = false;
2632 return; 2781 return;
2633 } 2782 }
2634 2783
2635 DCHECK(check_ping_status_pending_); 2784 DCHECK(check_ping_status_pending_);
2636 2785
2637 base::TimeTicks now = time_func_(); 2786 base::TimeTicks now = time_func_();
2638 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_); 2787 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2639 2788
2640 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) { 2789 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2641 // Track all failed PING messages in a separate bucket. 2790 // Track all failed PING messages in a separate bucket.
2642 RecordPingRTTHistogram(base::TimeDelta::Max()); 2791 RecordPingRTTHistogram(base::TimeDelta::Max());
2643 DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping."); 2792 CloseSessionResult result =
2793 DoCloseSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2794 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
2644 return; 2795 return;
2645 } 2796 }
2646 2797
2647 // Check the status of connection after a delay. 2798 // Check the status of connection after a delay.
2648 base::MessageLoop::current()->PostDelayedTask( 2799 base::MessageLoop::current()->PostDelayedTask(
2649 FROM_HERE, 2800 FROM_HERE,
2650 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2801 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2651 now), 2802 now),
2652 delay); 2803 delay);
2653 } 2804 }
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
2766 return ssl_socket; 2917 return ssl_socket;
2767 } 2918 }
2768 2919
2769 void SpdySession::OnWriteBufferConsumed( 2920 void SpdySession::OnWriteBufferConsumed(
2770 size_t frame_payload_size, 2921 size_t frame_payload_size,
2771 size_t consume_size, 2922 size_t consume_size,
2772 SpdyBuffer::ConsumeSource consume_source) { 2923 SpdyBuffer::ConsumeSource consume_source) {
2773 // We can be called with |in_io_loop_| set if a write SpdyBuffer is 2924 // We can be called with |in_io_loop_| set if a write SpdyBuffer is
2774 // deleted (e.g., a stream is closed due to incoming data). 2925 // deleted (e.g., a stream is closed due to incoming data).
2775 2926
2927 if (availability_state_ == STATE_CLOSED)
2928 return;
2929
2776 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2930 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2777 2931
2778 if (consume_source == SpdyBuffer::DISCARD) { 2932 if (consume_source == SpdyBuffer::DISCARD) {
2779 // If we're discarding a frame or part of it, increase the send 2933 // If we're discarding a frame or part of it, increase the send
2780 // window by the number of discarded bytes. (Although if we're 2934 // window by the number of discarded bytes. (Although if we're
2781 // discarding part of a frame, it's probably because of a write 2935 // discarding part of a frame, it's probably because of a write
2782 // error and we'll be tearing down the session soon.) 2936 // error and we'll be tearing down the session soon.)
2783 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); 2937 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
2784 DCHECK_GT(remaining_payload_bytes, 0u); 2938 DCHECK_GT(remaining_payload_bytes, 0u);
2785 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); 2939 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
2786 } 2940 }
2787 // For consumed bytes, the send window is increased when we receive 2941 // For consumed bytes, the send window is increased when we receive
2788 // a WINDOW_UPDATE frame. 2942 // a WINDOW_UPDATE frame.
2789 } 2943 }
2790 2944
2791 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { 2945 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
2792 // We can be called with |in_io_loop_| set if a SpdyBuffer is 2946 // We can be called with |in_io_loop_| set if a SpdyBuffer is
2793 // deleted (e.g., a stream is closed due to incoming data). 2947 // deleted (e.g., a stream is closed due to incoming data).
2794 2948
2949 DCHECK_NE(availability_state_, STATE_CLOSED);
2795 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2950 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2796 DCHECK_GE(delta_window_size, 1); 2951 DCHECK_GE(delta_window_size, 1);
2797 2952
2798 // Check for overflow. 2953 // Check for overflow.
2799 int32 max_delta_window_size = kint32max - session_send_window_size_; 2954 int32 max_delta_window_size = kint32max - session_send_window_size_;
2800 if (delta_window_size > max_delta_window_size) { 2955 if (delta_window_size > max_delta_window_size) {
2801 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2956 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2802 DoDrainSession( 2957 CloseSessionResult result = DoCloseSession(
2803 ERR_SPDY_PROTOCOL_ERROR, 2958 ERR_SPDY_PROTOCOL_ERROR,
2804 "Received WINDOW_UPDATE [delta: " + 2959 "Received WINDOW_UPDATE [delta: " +
2805 base::IntToString(delta_window_size) + 2960 base::IntToString(delta_window_size) +
2806 "] for session overflows session_send_window_size_ [current: " + 2961 "] for session overflows session_send_window_size_ [current: " +
2807 base::IntToString(session_send_window_size_) + "]"); 2962 base::IntToString(session_send_window_size_) + "]");
2963 DCHECK_NE(result, SESSION_ALREADY_CLOSED);
2808 return; 2964 return;
2809 } 2965 }
2810 2966
2811 session_send_window_size_ += delta_window_size; 2967 session_send_window_size_ += delta_window_size;
2812 2968
2813 net_log_.AddEvent( 2969 net_log_.AddEvent(
2814 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 2970 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
2815 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2971 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2816 delta_window_size, session_send_window_size_)); 2972 delta_window_size, session_send_window_size_));
2817 2973
2818 DCHECK(!IsSendStalled()); 2974 DCHECK(!IsSendStalled());
2819 ResumeSendStalledStreams(); 2975 ResumeSendStalledStreams();
2820 } 2976 }
2821 2977
2822 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { 2978 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
2979 DCHECK_NE(availability_state_, STATE_CLOSED);
2823 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2980 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2824 2981
2825 // We only call this method when sending a frame. Therefore, 2982 // We only call this method when sending a frame. Therefore,
2826 // |delta_window_size| should be within the valid frame size range. 2983 // |delta_window_size| should be within the valid frame size range.
2827 DCHECK_GE(delta_window_size, 1); 2984 DCHECK_GE(delta_window_size, 1);
2828 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); 2985 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
2829 2986
2830 // |send_window_size_| should have been at least |delta_window_size| for 2987 // |send_window_size_| should have been at least |delta_window_size| for
2831 // this call to happen. 2988 // this call to happen.
2832 DCHECK_GE(session_send_window_size_, delta_window_size); 2989 DCHECK_GE(session_send_window_size_, delta_window_size);
2833 2990
2834 session_send_window_size_ -= delta_window_size; 2991 session_send_window_size_ -= delta_window_size;
2835 2992
2836 net_log_.AddEvent( 2993 net_log_.AddEvent(
2837 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 2994 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
2838 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2995 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2839 -delta_window_size, session_send_window_size_)); 2996 -delta_window_size, session_send_window_size_));
2840 } 2997 }
2841 2998
2842 void SpdySession::OnReadBufferConsumed( 2999 void SpdySession::OnReadBufferConsumed(
2843 size_t consume_size, 3000 size_t consume_size,
2844 SpdyBuffer::ConsumeSource consume_source) { 3001 SpdyBuffer::ConsumeSource consume_source) {
2845 // We can be called with |in_io_loop_| set if a read SpdyBuffer is 3002 // We can be called with |in_io_loop_| set if a read SpdyBuffer is
2846 // deleted (e.g., discarded by a SpdyReadQueue). 3003 // deleted (e.g., discarded by a SpdyReadQueue).
2847 3004
3005 if (availability_state_ == STATE_CLOSED)
3006 return;
3007
2848 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3008 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2849 DCHECK_GE(consume_size, 1u); 3009 DCHECK_GE(consume_size, 1u);
2850 DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); 3010 DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
2851 3011
2852 IncreaseRecvWindowSize(static_cast<int32>(consume_size)); 3012 IncreaseRecvWindowSize(static_cast<int32>(consume_size));
2853 } 3013 }
2854 3014
2855 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { 3015 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
3016 DCHECK_NE(availability_state_, STATE_CLOSED);
2856 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3017 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2857 DCHECK_GE(session_unacked_recv_window_bytes_, 0); 3018 DCHECK_GE(session_unacked_recv_window_bytes_, 0);
2858 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); 3019 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
2859 DCHECK_GE(delta_window_size, 1); 3020 DCHECK_GE(delta_window_size, 1);
2860 // Check for overflow. 3021 // Check for overflow.
2861 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_); 3022 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
2862 3023
2863 session_recv_window_size_ += delta_window_size; 3024 session_recv_window_size_ += delta_window_size;
2864 net_log_.AddEvent( 3025 net_log_.AddEvent(
2865 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 3026 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
(...skipping 12 matching lines...) Expand all
2878 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { 3039 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
2879 CHECK(in_io_loop_); 3040 CHECK(in_io_loop_);
2880 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3041 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2881 DCHECK_GE(delta_window_size, 1); 3042 DCHECK_GE(delta_window_size, 1);
2882 3043
2883 // Since we never decrease the initial receive window size, 3044 // Since we never decrease the initial receive window size,
2884 // |delta_window_size| should never cause |recv_window_size_| to go 3045 // |delta_window_size| should never cause |recv_window_size_| to go
2885 // negative. If we do, the receive window isn't being respected. 3046 // negative. If we do, the receive window isn't being respected.
2886 if (delta_window_size > session_recv_window_size_) { 3047 if (delta_window_size > session_recv_window_size_) {
2887 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION); 3048 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
2888 DoDrainSession( 3049 CloseSessionResult result = DoCloseSession(
2889 ERR_SPDY_PROTOCOL_ERROR, 3050 ERR_SPDY_PROTOCOL_ERROR,
2890 "delta_window_size is " + base::IntToString(delta_window_size) + 3051 "delta_window_size is " + base::IntToString(delta_window_size) +
2891 " in DecreaseRecvWindowSize, which is larger than the receive " + 3052 " in DecreaseRecvWindowSize, which is larger than the receive " +
2892 "window size of " + base::IntToString(session_recv_window_size_)); 3053 "window size of " + base::IntToString(session_recv_window_size_));
3054 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2893 return; 3055 return;
2894 } 3056 }
2895 3057
2896 session_recv_window_size_ -= delta_window_size; 3058 session_recv_window_size_ -= delta_window_size;
2897 net_log_.AddEvent( 3059 net_log_.AddEvent(
2898 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 3060 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
2899 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 3061 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2900 -delta_window_size, session_recv_window_size_)); 3062 -delta_window_size, session_recv_window_size_));
2901 } 3063 }
2902 3064
2903 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) { 3065 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
2904 DCHECK(stream.send_stalled_by_flow_control()); 3066 DCHECK(stream.send_stalled_by_flow_control());
2905 RequestPriority priority = stream.priority(); 3067 RequestPriority priority = stream.priority();
2906 CHECK_GE(priority, MINIMUM_PRIORITY); 3068 CHECK_GE(priority, MINIMUM_PRIORITY);
2907 CHECK_LE(priority, MAXIMUM_PRIORITY); 3069 CHECK_LE(priority, MAXIMUM_PRIORITY);
2908 stream_send_unstall_queue_[priority].push_back(stream.stream_id()); 3070 stream_send_unstall_queue_[priority].push_back(stream.stream_id());
2909 } 3071 }
2910 3072
2911 void SpdySession::ResumeSendStalledStreams() { 3073 void SpdySession::ResumeSendStalledStreams() {
2912 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3074 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2913 3075
2914 // We don't have to worry about new streams being queued, since 3076 // We don't have to worry about new streams being queued, since
2915 // doing so would cause IsSendStalled() to return true. But we do 3077 // doing so would cause IsSendStalled() to return true. But we do
2916 // have to worry about streams being closed, as well as ourselves 3078 // have to worry about streams being closed, as well as ourselves
2917 // being closed. 3079 // being closed.
2918 3080
2919 while (!IsSendStalled()) { 3081 while (availability_state_ != STATE_CLOSED && !IsSendStalled()) {
2920 size_t old_size = 0; 3082 size_t old_size = 0;
2921 #if DCHECK_IS_ON 3083 #if DCHECK_IS_ON
2922 old_size = GetTotalSize(stream_send_unstall_queue_); 3084 old_size = GetTotalSize(stream_send_unstall_queue_);
2923 #endif 3085 #endif
2924 3086
2925 SpdyStreamId stream_id = PopStreamToPossiblyResume(); 3087 SpdyStreamId stream_id = PopStreamToPossiblyResume();
2926 if (stream_id == 0) 3088 if (stream_id == 0)
2927 break; 3089 break;
2928 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 3090 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2929 // The stream may actually still be send-stalled after this (due 3091 // The stream may actually still be send-stalled after this (due
(...skipping 14 matching lines...) Expand all
2944 if (!queue->empty()) { 3106 if (!queue->empty()) {
2945 SpdyStreamId stream_id = queue->front(); 3107 SpdyStreamId stream_id = queue->front();
2946 queue->pop_front(); 3108 queue->pop_front();
2947 return stream_id; 3109 return stream_id;
2948 } 3110 }
2949 } 3111 }
2950 return 0; 3112 return 0;
2951 } 3113 }
2952 3114
2953 } // namespace net 3115 } // namespace net
OLDNEW
« no previous file with comments | « trunk/src/net/spdy/spdy_session.h ('k') | trunk/src/net/spdy/spdy_session_pool.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698