Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(98)

Side by Side Diff: net/spdy/spdy_session.cc

Issue 305823003: Re-land: Defer SpdySession destruction to support closing writes (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Regression tests and fix for crbug.com/379469 Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/spdy/spdy_session.h ('k') | net/spdy/spdy_session_pool.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_session.h" 5 #include "net/spdy/spdy_session.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <map> 8 #include <map>
9 9
10 #include "base/basictypes.h" 10 #include "base/basictypes.h"
(...skipping 509 matching lines...) Expand 10 before | Expand all | Expand 10 after
520 net_log_.BeginEvent( 520 net_log_.BeginEvent(
521 NetLog::TYPE_SPDY_SESSION, 521 NetLog::TYPE_SPDY_SESSION,
522 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair())); 522 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
523 next_unclaimed_push_stream_sweep_time_ = time_func_() + 523 next_unclaimed_push_stream_sweep_time_ = time_func_() +
524 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 524 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
525 // TODO(mbelshe): consider randomization of the stream_hi_water_mark. 525 // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
526 } 526 }
527 527
528 SpdySession::~SpdySession() { 528 SpdySession::~SpdySession() {
529 CHECK(!in_io_loop_); 529 CHECK(!in_io_loop_);
530 DCHECK(!pool_); 530 DcheckDraining();
531 DcheckClosed();
532 531
533 // TODO(akalin): Check connection->is_initialized() instead. This 532 // TODO(akalin): Check connection->is_initialized() instead. This
534 // requires re-working CreateFakeSpdySession(), though. 533 // requires re-working CreateFakeSpdySession(), though.
535 DCHECK(connection_->socket()); 534 DCHECK(connection_->socket());
536 // With SPDY we can't recycle sockets. 535 // With SPDY we can't recycle sockets.
537 connection_->socket()->Disconnect(); 536 connection_->socket()->Disconnect();
538 537
539 RecordHistograms(); 538 RecordHistograms();
540 539
541 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION); 540 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
595 #if defined(SPDY_PROXY_AUTH_ORIGIN) 594 #if defined(SPDY_PROXY_AUTH_ORIGIN)
596 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy", 595 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
597 host_port_pair().Equals(HostPortPair::FromURL( 596 host_port_pair().Equals(HostPortPair::FromURL(
598 GURL(SPDY_PROXY_AUTH_ORIGIN)))); 597 GURL(SPDY_PROXY_AUTH_ORIGIN))));
599 #endif 598 #endif
600 599
601 net_log_.AddEvent( 600 net_log_.AddEvent(
602 NetLog::TYPE_SPDY_SESSION_INITIALIZED, 601 NetLog::TYPE_SPDY_SESSION_INITIALIZED,
603 connection_->socket()->NetLog().source().ToEventParametersCallback()); 602 connection_->socket()->NetLog().source().ToEventParametersCallback());
604 603
605 DCHECK_NE(availability_state_, STATE_CLOSED); 604 DCHECK_EQ(availability_state_, STATE_AVAILABLE);
606 connection_->AddHigherLayeredPool(this); 605 connection_->AddHigherLayeredPool(this);
607 if (enable_sending_initial_data_) 606 if (enable_sending_initial_data_)
608 SendInitialData(); 607 SendInitialData();
609 pool_ = pool; 608 pool_ = pool;
610 609
611 // Bootstrap the read loop. 610 // Bootstrap the read loop.
612 base::MessageLoop::current()->PostTask( 611 base::MessageLoop::current()->PostTask(
613 FROM_HERE, 612 FROM_HERE,
614 base::Bind(&SpdySession::PumpReadLoop, 613 base::Bind(&SpdySession::PumpReadLoop,
615 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); 614 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
616 } 615 }
617 616
618 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { 617 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
619 if (!verify_domain_authentication_) 618 if (!verify_domain_authentication_)
620 return true; 619 return true;
621 620
622 if (availability_state_ == STATE_CLOSED) 621 if (availability_state_ == STATE_DRAINING)
623 return false; 622 return false;
624 623
625 SSLInfo ssl_info; 624 SSLInfo ssl_info;
626 bool was_npn_negotiated; 625 bool was_npn_negotiated;
627 NextProto protocol_negotiated = kProtoUnknown; 626 NextProto protocol_negotiated = kProtoUnknown;
628 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated)) 627 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
629 return true; // This is not a secure session, so all domains are okay. 628 return true; // This is not a secure session, so all domains are okay.
630 629
631 bool unused = false; 630 bool unused = false;
632 return 631 return
633 !ssl_info.client_cert_sent && 632 !ssl_info.client_cert_sent &&
634 (!ssl_info.channel_id_sent || 633 (!ssl_info.channel_id_sent ||
635 (ServerBoundCertService::GetDomainForHost(domain) == 634 (ServerBoundCertService::GetDomainForHost(domain) ==
636 ServerBoundCertService::GetDomainForHost(host_port_pair().host()))) && 635 ServerBoundCertService::GetDomainForHost(host_port_pair().host()))) &&
637 ssl_info.cert->VerifyNameMatch(domain, &unused); 636 ssl_info.cert->VerifyNameMatch(domain, &unused);
638 } 637 }
639 638
640 int SpdySession::GetPushStream( 639 int SpdySession::GetPushStream(
641 const GURL& url, 640 const GURL& url,
642 base::WeakPtr<SpdyStream>* stream, 641 base::WeakPtr<SpdyStream>* stream,
643 const BoundNetLog& stream_net_log) { 642 const BoundNetLog& stream_net_log) {
644 CHECK(!in_io_loop_); 643 CHECK(!in_io_loop_);
645 644
646 stream->reset(); 645 stream->reset();
647 646
648 // TODO(akalin): Add unit test exercising this code path. 647 if (availability_state_ == STATE_DRAINING)
649 if (availability_state_ == STATE_CLOSED)
650 return ERR_CONNECTION_CLOSED; 648 return ERR_CONNECTION_CLOSED;
651 649
652 Error err = TryAccessStream(url); 650 Error err = TryAccessStream(url);
653 if (err != OK) 651 if (err != OK)
654 return err; 652 return err;
655 653
656 *stream = GetActivePushStream(url); 654 *stream = GetActivePushStream(url);
657 if (*stream) { 655 if (*stream) {
658 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_); 656 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
659 streams_pushed_and_claimed_count_++; 657 streams_pushed_and_claimed_count_++;
660 } 658 }
661 return OK; 659 return OK;
662 } 660 }
663 661
664 // {,Try}CreateStream() and TryAccessStream() can be called with 662 // {,Try}CreateStream() and TryAccessStream() can be called with
665 // |in_io_loop_| set if a stream is being created in response to 663 // |in_io_loop_| set if a stream is being created in response to
666 // another being closed due to received data. 664 // another being closed due to received data.
667 665
668 Error SpdySession::TryAccessStream(const GURL& url) { 666 Error SpdySession::TryAccessStream(const GURL& url) {
669 DCHECK_NE(availability_state_, STATE_CLOSED);
670
671 if (is_secure_ && certificate_error_code_ != OK && 667 if (is_secure_ && certificate_error_code_ != OK &&
672 (url.SchemeIs("https") || url.SchemeIs("wss"))) { 668 (url.SchemeIs("https") || url.SchemeIs("wss"))) {
673 RecordProtocolErrorHistogram( 669 RecordProtocolErrorHistogram(
674 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION); 670 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
675 CloseSessionResult result = DoCloseSession( 671 DoDrainSession(
676 static_cast<Error>(certificate_error_code_), 672 static_cast<Error>(certificate_error_code_),
677 "Tried to get SPDY stream for secure content over an unauthenticated " 673 "Tried to get SPDY stream for secure content over an unauthenticated "
678 "session."); 674 "session.");
679 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
680 return ERR_SPDY_PROTOCOL_ERROR; 675 return ERR_SPDY_PROTOCOL_ERROR;
681 } 676 }
682 return OK; 677 return OK;
683 } 678 }
684 679
685 int SpdySession::TryCreateStream( 680 int SpdySession::TryCreateStream(
686 const base::WeakPtr<SpdyStreamRequest>& request, 681 const base::WeakPtr<SpdyStreamRequest>& request,
687 base::WeakPtr<SpdyStream>* stream) { 682 base::WeakPtr<SpdyStream>* stream) {
688 DCHECK(request); 683 DCHECK(request);
689 684
690 if (availability_state_ == STATE_GOING_AWAY) 685 if (availability_state_ == STATE_GOING_AWAY)
691 return ERR_FAILED; 686 return ERR_FAILED;
692 687
693 // TODO(akalin): Add unit test exercising this code path. 688 if (availability_state_ == STATE_DRAINING)
694 if (availability_state_ == STATE_CLOSED)
695 return ERR_CONNECTION_CLOSED; 689 return ERR_CONNECTION_CLOSED;
696 690
697 Error err = TryAccessStream(request->url()); 691 Error err = TryAccessStream(request->url());
698 if (err != OK) 692 if (err != OK)
699 return err; 693 return err;
700 694
701 if (!max_concurrent_streams_ || 695 if (!max_concurrent_streams_ ||
702 (active_streams_.size() + created_streams_.size() < 696 (active_streams_.size() + created_streams_.size() <
703 max_concurrent_streams_)) { 697 max_concurrent_streams_)) {
704 return CreateStream(*request, stream); 698 return CreateStream(*request, stream);
705 } 699 }
706 700
707 stalled_streams_++; 701 stalled_streams_++;
708 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS); 702 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
709 RequestPriority priority = request->priority(); 703 RequestPriority priority = request->priority();
710 CHECK_GE(priority, MINIMUM_PRIORITY); 704 CHECK_GE(priority, MINIMUM_PRIORITY);
711 CHECK_LE(priority, MAXIMUM_PRIORITY); 705 CHECK_LE(priority, MAXIMUM_PRIORITY);
712 pending_create_stream_queues_[priority].push_back(request); 706 pending_create_stream_queues_[priority].push_back(request);
713 return ERR_IO_PENDING; 707 return ERR_IO_PENDING;
714 } 708 }
715 709
716 int SpdySession::CreateStream(const SpdyStreamRequest& request, 710 int SpdySession::CreateStream(const SpdyStreamRequest& request,
717 base::WeakPtr<SpdyStream>* stream) { 711 base::WeakPtr<SpdyStream>* stream) {
718 DCHECK_GE(request.priority(), MINIMUM_PRIORITY); 712 DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
719 DCHECK_LE(request.priority(), MAXIMUM_PRIORITY); 713 DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
720 714
721 if (availability_state_ == STATE_GOING_AWAY) 715 if (availability_state_ == STATE_GOING_AWAY)
722 return ERR_FAILED; 716 return ERR_FAILED;
723 717
724 // TODO(akalin): Add unit test exercising this code path. 718 if (availability_state_ == STATE_DRAINING)
725 if (availability_state_ == STATE_CLOSED)
726 return ERR_CONNECTION_CLOSED; 719 return ERR_CONNECTION_CLOSED;
727 720
728 Error err = TryAccessStream(request.url()); 721 Error err = TryAccessStream(request.url());
729 if (err != OK) { 722 if (err != OK) {
730 // This should have been caught in TryCreateStream(). 723 // This should have been caught in TryCreateStream().
731 NOTREACHED(); 724 NOTREACHED();
732 return err; 725 return err;
733 } 726 }
734 727
735 DCHECK(connection_->socket()); 728 DCHECK(connection_->socket());
736 DCHECK(connection_->socket()->IsConnected()); 729 DCHECK(connection_->socket()->IsConnected());
737 if (connection_->socket()) { 730 if (connection_->socket()) {
738 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected", 731 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
739 connection_->socket()->IsConnected()); 732 connection_->socket()->IsConnected());
740 if (!connection_->socket()->IsConnected()) { 733 if (!connection_->socket()->IsConnected()) {
741 CloseSessionResult result = DoCloseSession( 734 DoDrainSession(
742 ERR_CONNECTION_CLOSED, 735 ERR_CONNECTION_CLOSED,
743 "Tried to create SPDY stream for a closed socket connection."); 736 "Tried to create SPDY stream for a closed socket connection.");
744 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
745 return ERR_CONNECTION_CLOSED; 737 return ERR_CONNECTION_CLOSED;
746 } 738 }
747 } 739 }
748 740
749 scoped_ptr<SpdyStream> new_stream( 741 scoped_ptr<SpdyStream> new_stream(
750 new SpdyStream(request.type(), GetWeakPtr(), request.url(), 742 new SpdyStream(request.type(), GetWeakPtr(), request.url(),
751 request.priority(), 743 request.priority(),
752 stream_initial_send_window_size_, 744 stream_initial_send_window_size_,
753 stream_initial_recv_window_size_, 745 stream_initial_recv_window_size_,
754 request.net_log())); 746 request.net_log()));
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
875 867
876 return true; 868 return true;
877 } 869 }
878 870
879 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() { 871 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
880 return weak_factory_.GetWeakPtr(); 872 return weak_factory_.GetWeakPtr();
881 } 873 }
882 874
883 bool SpdySession::CloseOneIdleConnection() { 875 bool SpdySession::CloseOneIdleConnection() {
884 CHECK(!in_io_loop_); 876 CHECK(!in_io_loop_);
885 DCHECK_NE(availability_state_, STATE_CLOSED);
886 DCHECK(pool_); 877 DCHECK(pool_);
887 if (!active_streams_.empty()) 878 if (active_streams_.empty()) {
888 return false; 879 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
889 CloseSessionResult result =
890 DoCloseSession(ERR_CONNECTION_CLOSED, "Closing one idle connection.");
891 if (result != SESSION_CLOSED_AND_REMOVED) {
892 NOTREACHED();
893 return false;
894 } 880 }
895 return true; 881 // Return false as the socket wasn't immediately closed.
882 return false;
896 } 883 }
897 884
898 void SpdySession::EnqueueStreamWrite( 885 void SpdySession::EnqueueStreamWrite(
899 const base::WeakPtr<SpdyStream>& stream, 886 const base::WeakPtr<SpdyStream>& stream,
900 SpdyFrameType frame_type, 887 SpdyFrameType frame_type,
901 scoped_ptr<SpdyBufferProducer> producer) { 888 scoped_ptr<SpdyBufferProducer> producer) {
902 DCHECK(frame_type == HEADERS || 889 DCHECK(frame_type == HEADERS ||
903 frame_type == DATA || 890 frame_type == DATA ||
904 frame_type == CREDENTIAL || 891 frame_type == CREDENTIAL ||
905 frame_type == SYN_STREAM); 892 frame_type == SYN_STREAM);
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
938 stream_id)); 925 stream_id));
939 } 926 }
940 927
941 return syn_frame.Pass(); 928 return syn_frame.Pass();
942 } 929 }
943 930
944 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, 931 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
945 IOBuffer* data, 932 IOBuffer* data,
946 int len, 933 int len,
947 SpdyDataFlags flags) { 934 SpdyDataFlags flags) {
948 if (availability_state_ == STATE_CLOSED) { 935 if (availability_state_ == STATE_DRAINING) {
949 NOTREACHED();
950 return scoped_ptr<SpdyBuffer>(); 936 return scoped_ptr<SpdyBuffer>();
951 } 937 }
952 938
953 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 939 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
954 CHECK(it != active_streams_.end()); 940 CHECK(it != active_streams_.end());
955 SpdyStream* stream = it->second.stream; 941 SpdyStream* stream = it->second.stream;
956 CHECK_EQ(stream->stream_id(), stream_id); 942 CHECK_EQ(stream->stream_id(), stream_id);
957 943
958 if (len < 0) { 944 if (len < 0) {
959 NOTREACHED(); 945 NOTREACHED();
(...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after
1128 // TODO(akalin): When SpdyStream was ref-counted (and 1114 // TODO(akalin): When SpdyStream was ref-counted (and
1129 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this 1115 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1130 // was only done when status was not OK. This meant that pushed 1116 // was only done when status was not OK. This meant that pushed
1131 // streams can still be claimed after they're closed. This is 1117 // streams can still be claimed after they're closed. This is
1132 // probably something that we still want to support, although server 1118 // probably something that we still want to support, although server
1133 // push is hardly used. Write tests for this and fix this. (See 1119 // push is hardly used. Write tests for this and fix this. (See
1134 // http://crbug.com/261712 .) 1120 // http://crbug.com/261712 .)
1135 if (owned_stream->type() == SPDY_PUSH_STREAM) 1121 if (owned_stream->type() == SPDY_PUSH_STREAM)
1136 unclaimed_pushed_streams_.erase(owned_stream->url()); 1122 unclaimed_pushed_streams_.erase(owned_stream->url());
1137 1123
1138 base::WeakPtr<SpdySession> weak_this = GetWeakPtr();
1139
1140 DeleteStream(owned_stream.Pass(), status); 1124 DeleteStream(owned_stream.Pass(), status);
1141 1125 MaybeFinishGoingAway();
1142 if (!weak_this)
1143 return;
1144
1145 if (availability_state_ == STATE_CLOSED)
1146 return;
1147 1126
1148 // If there are no active streams and the socket pool is stalled, close the 1127 // If there are no active streams and the socket pool is stalled, close the
1149 // session to free up a socket slot. 1128 // session to free up a socket slot.
1150 if (active_streams_.empty() && connection_->IsPoolStalled()) { 1129 if (active_streams_.empty() && connection_->IsPoolStalled()) {
1151 CloseSessionResult result = 1130 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1152 DoCloseSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1153 CHECK_NE(result, SESSION_ALREADY_CLOSED);
1154 } 1131 }
1155 } 1132 }
1156 1133
1157 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it, 1134 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1158 int status) { 1135 int status) {
1159 scoped_ptr<SpdyStream> owned_stream(*it); 1136 scoped_ptr<SpdyStream> owned_stream(*it);
1160 created_streams_.erase(it); 1137 created_streams_.erase(it);
1161 DeleteStream(owned_stream.Pass(), status); 1138 DeleteStream(owned_stream.Pass(), status);
1162 } 1139 }
1163 1140
(...skipping 24 matching lines...) Expand all
1188 DCHECK(buffered_spdy_framer_.get()); 1165 DCHECK(buffered_spdy_framer_.get());
1189 scoped_ptr<SpdyFrame> rst_frame( 1166 scoped_ptr<SpdyFrame> rst_frame(
1190 buffered_spdy_framer_->CreateRstStream(stream_id, status)); 1167 buffered_spdy_framer_->CreateRstStream(stream_id, status));
1191 1168
1192 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass()); 1169 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1193 RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status)); 1170 RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
1194 } 1171 }
1195 1172
1196 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) { 1173 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1197 CHECK(!in_io_loop_); 1174 CHECK(!in_io_loop_);
1198 CHECK_NE(availability_state_, STATE_CLOSED); 1175 if (availability_state_ == STATE_DRAINING) {
1199 CHECK_EQ(read_state_, expected_read_state);
1200
1201 result = DoReadLoop(expected_read_state, result);
1202
1203 if (availability_state_ == STATE_CLOSED) {
1204 CHECK_EQ(result, error_on_close_);
1205 CHECK_LT(error_on_close_, ERR_IO_PENDING);
1206 RemoveFromPool();
1207 return; 1176 return;
1208 } 1177 }
1209 1178 ignore_result(DoReadLoop(expected_read_state, result));
1210 CHECK(result == OK || result == ERR_IO_PENDING);
1211 } 1179 }
1212 1180
1213 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { 1181 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1214 CHECK(!in_io_loop_); 1182 CHECK(!in_io_loop_);
1215 CHECK_NE(availability_state_, STATE_CLOSED);
1216 CHECK_EQ(read_state_, expected_read_state); 1183 CHECK_EQ(read_state_, expected_read_state);
1217 1184
1218 in_io_loop_ = true; 1185 in_io_loop_ = true;
1219 1186
1220 int bytes_read_without_yielding = 0; 1187 int bytes_read_without_yielding = 0;
1221 1188
1222 // Loop until the session is closed, the read becomes blocked, or 1189 // Loop until the session is draining, the read becomes blocked, or
1223 // the read limit is exceeded. 1190 // the read limit is exceeded.
1224 while (true) { 1191 while (true) {
1225 switch (read_state_) { 1192 switch (read_state_) {
1226 case READ_STATE_DO_READ: 1193 case READ_STATE_DO_READ:
1227 CHECK_EQ(result, OK); 1194 CHECK_EQ(result, OK);
1228 result = DoRead(); 1195 result = DoRead();
1229 break; 1196 break;
1230 case READ_STATE_DO_READ_COMPLETE: 1197 case READ_STATE_DO_READ_COMPLETE:
1231 if (result > 0) 1198 if (result > 0)
1232 bytes_read_without_yielding += result; 1199 bytes_read_without_yielding += result;
1233 result = DoReadComplete(result); 1200 result = DoReadComplete(result);
1234 break; 1201 break;
1235 default: 1202 default:
1236 NOTREACHED() << "read_state_: " << read_state_; 1203 NOTREACHED() << "read_state_: " << read_state_;
1237 break; 1204 break;
1238 } 1205 }
1239 1206
1240 if (availability_state_ == STATE_CLOSED) { 1207 if (availability_state_ == STATE_DRAINING)
1241 CHECK_EQ(result, error_on_close_);
1242 CHECK_LT(result, ERR_IO_PENDING);
1243 break; 1208 break;
1244 }
1245 1209
1246 if (result == ERR_IO_PENDING) 1210 if (result == ERR_IO_PENDING)
1247 break; 1211 break;
1248 1212
1249 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) { 1213 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1250 read_state_ = READ_STATE_DO_READ; 1214 read_state_ = READ_STATE_DO_READ;
1251 base::MessageLoop::current()->PostTask( 1215 base::MessageLoop::current()->PostTask(
1252 FROM_HERE, 1216 FROM_HERE,
1253 base::Bind(&SpdySession::PumpReadLoop, 1217 base::Bind(&SpdySession::PumpReadLoop,
1254 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); 1218 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1255 result = ERR_IO_PENDING; 1219 result = ERR_IO_PENDING;
1256 break; 1220 break;
1257 } 1221 }
1258 } 1222 }
1259 1223
1260 CHECK(in_io_loop_); 1224 CHECK(in_io_loop_);
1261 in_io_loop_ = false; 1225 in_io_loop_ = false;
1262 1226
1263 return result; 1227 return result;
1264 } 1228 }
1265 1229
1266 int SpdySession::DoRead() { 1230 int SpdySession::DoRead() {
1267 CHECK(in_io_loop_); 1231 CHECK(in_io_loop_);
1268 CHECK_NE(availability_state_, STATE_CLOSED);
1269 1232
1270 CHECK(connection_); 1233 CHECK(connection_);
1271 CHECK(connection_->socket()); 1234 CHECK(connection_->socket());
1272 read_state_ = READ_STATE_DO_READ_COMPLETE; 1235 read_state_ = READ_STATE_DO_READ_COMPLETE;
1273 return connection_->socket()->Read( 1236 return connection_->socket()->Read(
1274 read_buffer_.get(), 1237 read_buffer_.get(),
1275 kReadBufferSize, 1238 kReadBufferSize,
1276 base::Bind(&SpdySession::PumpReadLoop, 1239 base::Bind(&SpdySession::PumpReadLoop,
1277 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE)); 1240 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
1278 } 1241 }
1279 1242
1280 int SpdySession::DoReadComplete(int result) { 1243 int SpdySession::DoReadComplete(int result) {
1281 CHECK(in_io_loop_); 1244 CHECK(in_io_loop_);
1282 DCHECK_NE(availability_state_, STATE_CLOSED);
1283 1245
1284 // Parse a frame. For now this code requires that the frame fit into our 1246 // Parse a frame. For now this code requires that the frame fit into our
1285 // buffer (kReadBufferSize). 1247 // buffer (kReadBufferSize).
1286 // TODO(mbelshe): support arbitrarily large frames! 1248 // TODO(mbelshe): support arbitrarily large frames!
1287 1249
1288 if (result == 0) { 1250 if (result == 0) {
1289 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", 1251 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1290 total_bytes_received_, 1, 100000000, 50); 1252 total_bytes_received_, 1, 100000000, 50);
1291 CloseSessionResult close_session_result = 1253 DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed");
1292 DoCloseSession(ERR_CONNECTION_CLOSED, "Connection closed"); 1254
1293 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1294 DCHECK_EQ(availability_state_, STATE_CLOSED);
1295 DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED);
1296 return ERR_CONNECTION_CLOSED; 1255 return ERR_CONNECTION_CLOSED;
1297 } 1256 }
1298 1257
1299 if (result < 0) { 1258 if (result < 0) {
1300 CloseSessionResult close_session_result = 1259 DoDrainSession(static_cast<Error>(result), "result is < 0.");
1301 DoCloseSession(static_cast<Error>(result), "result is < 0.");
1302 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1303 DCHECK_EQ(availability_state_, STATE_CLOSED);
1304 DCHECK_EQ(error_on_close_, result);
1305 return result; 1260 return result;
1306 } 1261 }
1307 CHECK_LE(result, kReadBufferSize); 1262 CHECK_LE(result, kReadBufferSize);
1308 total_bytes_received_ += result; 1263 total_bytes_received_ += result;
1309 1264
1310 last_activity_time_ = time_func_(); 1265 last_activity_time_ = time_func_();
1311 1266
1312 DCHECK(buffered_spdy_framer_.get()); 1267 DCHECK(buffered_spdy_framer_.get());
1313 char* data = read_buffer_->data(); 1268 char* data = read_buffer_->data();
1314 while (result > 0) { 1269 while (result > 0) {
1315 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result); 1270 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1316 result -= bytes_processed; 1271 result -= bytes_processed;
1317 data += bytes_processed; 1272 data += bytes_processed;
1318 1273
1319 if (availability_state_ == STATE_CLOSED) { 1274 if (availability_state_ == STATE_DRAINING) {
1320 DCHECK_LT(error_on_close_, ERR_IO_PENDING); 1275 return ERR_CONNECTION_CLOSED;
1321 return error_on_close_;
1322 } 1276 }
1323 1277
1324 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR); 1278 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1325 } 1279 }
1326 1280
1327 read_state_ = READ_STATE_DO_READ; 1281 read_state_ = READ_STATE_DO_READ;
1328 return OK; 1282 return OK;
1329 } 1283 }
1330 1284
1331 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) { 1285 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1332 CHECK(!in_io_loop_); 1286 CHECK(!in_io_loop_);
1333 DCHECK_NE(availability_state_, STATE_CLOSED);
1334 DCHECK_EQ(write_state_, expected_write_state); 1287 DCHECK_EQ(write_state_, expected_write_state);
1335 1288
1336 result = DoWriteLoop(expected_write_state, result); 1289 DoWriteLoop(expected_write_state, result);
1337 1290
1338 if (availability_state_ == STATE_CLOSED) { 1291 if (availability_state_ == STATE_DRAINING && !in_flight_write_ &&
1339 DCHECK_EQ(result, error_on_close_); 1292 write_queue_.IsEmpty()) {
1340 DCHECK_LT(error_on_close_, ERR_IO_PENDING); 1293 pool_->RemoveUnavailableSession(GetWeakPtr()); // Destroys |this|.
1341 RemoveFromPool();
1342 return; 1294 return;
1343 } 1295 }
1344
1345 DCHECK(result == OK || result == ERR_IO_PENDING);
1346 } 1296 }
1347 1297
1348 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { 1298 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1349 CHECK(!in_io_loop_); 1299 CHECK(!in_io_loop_);
1350 DCHECK_NE(availability_state_, STATE_CLOSED);
1351 DCHECK_NE(write_state_, WRITE_STATE_IDLE); 1300 DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1352 DCHECK_EQ(write_state_, expected_write_state); 1301 DCHECK_EQ(write_state_, expected_write_state);
1353 1302
1354 in_io_loop_ = true; 1303 in_io_loop_ = true;
1355 1304
1356 // Loop until the session is closed or the write becomes blocked. 1305 // Loop until the session is closed or the write becomes blocked.
1357 while (true) { 1306 while (true) {
1358 switch (write_state_) { 1307 switch (write_state_) {
1359 case WRITE_STATE_DO_WRITE: 1308 case WRITE_STATE_DO_WRITE:
1360 DCHECK_EQ(result, OK); 1309 DCHECK_EQ(result, OK);
1361 result = DoWrite(); 1310 result = DoWrite();
1362 break; 1311 break;
1363 case WRITE_STATE_DO_WRITE_COMPLETE: 1312 case WRITE_STATE_DO_WRITE_COMPLETE:
1364 result = DoWriteComplete(result); 1313 result = DoWriteComplete(result);
1365 break; 1314 break;
1366 case WRITE_STATE_IDLE: 1315 case WRITE_STATE_IDLE:
1367 default: 1316 default:
1368 NOTREACHED() << "write_state_: " << write_state_; 1317 NOTREACHED() << "write_state_: " << write_state_;
1369 break; 1318 break;
1370 } 1319 }
1371 1320
1372 if (availability_state_ == STATE_CLOSED) {
1373 DCHECK_EQ(result, error_on_close_);
1374 DCHECK_LT(result, ERR_IO_PENDING);
1375 break;
1376 }
1377
1378 if (write_state_ == WRITE_STATE_IDLE) { 1321 if (write_state_ == WRITE_STATE_IDLE) {
1379 DCHECK_EQ(result, ERR_IO_PENDING); 1322 DCHECK_EQ(result, ERR_IO_PENDING);
1380 break; 1323 break;
1381 } 1324 }
1382 1325
1383 if (result == ERR_IO_PENDING) 1326 if (result == ERR_IO_PENDING)
1384 break; 1327 break;
1385 } 1328 }
1386 1329
1387 CHECK(in_io_loop_); 1330 CHECK(in_io_loop_);
1388 in_io_loop_ = false; 1331 in_io_loop_ = false;
1389 1332
1390 return result; 1333 return result;
1391 } 1334 }
1392 1335
1393 int SpdySession::DoWrite() { 1336 int SpdySession::DoWrite() {
1394 CHECK(in_io_loop_); 1337 CHECK(in_io_loop_);
1395 DCHECK_NE(availability_state_, STATE_CLOSED);
1396 1338
1397 DCHECK(buffered_spdy_framer_); 1339 DCHECK(buffered_spdy_framer_);
1398 if (in_flight_write_) { 1340 if (in_flight_write_) {
1399 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1341 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1400 } else { 1342 } else {
1401 // Grab the next frame to send. 1343 // Grab the next frame to send.
1402 SpdyFrameType frame_type = DATA; 1344 SpdyFrameType frame_type = DATA;
1403 scoped_ptr<SpdyBufferProducer> producer; 1345 scoped_ptr<SpdyBufferProducer> producer;
1404 base::WeakPtr<SpdyStream> stream; 1346 base::WeakPtr<SpdyStream> stream;
1405 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) { 1347 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
1449 in_flight_write_->GetIOBufferForRemainingData(); 1391 in_flight_write_->GetIOBufferForRemainingData();
1450 return connection_->socket()->Write( 1392 return connection_->socket()->Write(
1451 write_io_buffer.get(), 1393 write_io_buffer.get(),
1452 in_flight_write_->GetRemainingSize(), 1394 in_flight_write_->GetRemainingSize(),
1453 base::Bind(&SpdySession::PumpWriteLoop, 1395 base::Bind(&SpdySession::PumpWriteLoop,
1454 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE)); 1396 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
1455 } 1397 }
1456 1398
1457 int SpdySession::DoWriteComplete(int result) { 1399 int SpdySession::DoWriteComplete(int result) {
1458 CHECK(in_io_loop_); 1400 CHECK(in_io_loop_);
1459 DCHECK_NE(availability_state_, STATE_CLOSED);
1460 DCHECK_NE(result, ERR_IO_PENDING); 1401 DCHECK_NE(result, ERR_IO_PENDING);
1461 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1402 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1462 1403
1463 last_activity_time_ = time_func_(); 1404 last_activity_time_ = time_func_();
1464 1405
1465 if (result < 0) { 1406 if (result < 0) {
1466 DCHECK_NE(result, ERR_IO_PENDING); 1407 DCHECK_NE(result, ERR_IO_PENDING);
1467 in_flight_write_.reset(); 1408 in_flight_write_.reset();
1468 in_flight_write_frame_type_ = DATA; 1409 in_flight_write_frame_type_ = DATA;
1469 in_flight_write_frame_size_ = 0; 1410 in_flight_write_frame_size_ = 0;
1470 in_flight_write_stream_.reset(); 1411 in_flight_write_stream_.reset();
1471 CloseSessionResult close_session_result = 1412 write_state_ = WRITE_STATE_DO_WRITE;
1472 DoCloseSession(static_cast<Error>(result), "Write error"); 1413 DoDrainSession(static_cast<Error>(result), "Write error");
1473 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED); 1414 return OK;
1474 DCHECK_EQ(availability_state_, STATE_CLOSED);
1475 DCHECK_EQ(error_on_close_, result);
1476 return result;
1477 } 1415 }
1478 1416
1479 // It should not be possible to have written more bytes than our 1417 // It should not be possible to have written more bytes than our
1480 // in_flight_write_. 1418 // in_flight_write_.
1481 DCHECK_LE(static_cast<size_t>(result), 1419 DCHECK_LE(static_cast<size_t>(result),
1482 in_flight_write_->GetRemainingSize()); 1420 in_flight_write_->GetRemainingSize());
1483 1421
1484 if (result > 0) { 1422 if (result > 0) {
1485 in_flight_write_->Consume(static_cast<size_t>(result)); 1423 in_flight_write_->Consume(static_cast<size_t>(result));
1486 1424
(...skipping 23 matching lines...) Expand all
1510 void SpdySession::DcheckGoingAway() const { 1448 void SpdySession::DcheckGoingAway() const {
1511 #if DCHECK_IS_ON 1449 #if DCHECK_IS_ON
1512 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1450 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1513 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) { 1451 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1514 DCHECK(pending_create_stream_queues_[i].empty()); 1452 DCHECK(pending_create_stream_queues_[i].empty());
1515 } 1453 }
1516 DCHECK(created_streams_.empty()); 1454 DCHECK(created_streams_.empty());
1517 #endif 1455 #endif
1518 } 1456 }
1519 1457
1520 void SpdySession::DcheckClosed() const { 1458 void SpdySession::DcheckDraining() const {
1521 DcheckGoingAway(); 1459 DcheckGoingAway();
1522 DCHECK_EQ(availability_state_, STATE_CLOSED); 1460 DCHECK_EQ(availability_state_, STATE_DRAINING);
1523 DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1524 DCHECK(active_streams_.empty()); 1461 DCHECK(active_streams_.empty());
1525 DCHECK(unclaimed_pushed_streams_.empty()); 1462 DCHECK(unclaimed_pushed_streams_.empty());
1526 DCHECK(write_queue_.IsEmpty());
1527 } 1463 }
1528 1464
1529 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id, 1465 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
1530 Error status) { 1466 Error status) {
1531 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1467 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1532 1468
1533 // The loops below are carefully written to avoid reentrancy problems. 1469 // The loops below are carefully written to avoid reentrancy problems.
1534 1470
1535 while (true) { 1471 while (true) {
1536 size_t old_size = GetTotalSize(pending_create_stream_queues_); 1472 size_t old_size = GetTotalSize(pending_create_stream_queues_);
(...skipping 29 matching lines...) Expand all
1566 // away. 1502 // away.
1567 DCHECK_GT(old_size, created_streams_.size()); 1503 DCHECK_GT(old_size, created_streams_.size());
1568 } 1504 }
1569 1505
1570 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id); 1506 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
1571 1507
1572 DcheckGoingAway(); 1508 DcheckGoingAway();
1573 } 1509 }
1574 1510
1575 void SpdySession::MaybeFinishGoingAway() { 1511 void SpdySession::MaybeFinishGoingAway() {
1576 DcheckGoingAway(); 1512 if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) {
1577 if (active_streams_.empty() && availability_state_ != STATE_CLOSED) { 1513 DoDrainSession(OK, "Finished going away");
1578 CloseSessionResult result =
1579 DoCloseSession(ERR_CONNECTION_CLOSED, "Finished going away");
1580 CHECK_NE(result, SESSION_ALREADY_CLOSED);
1581 } 1514 }
1582 } 1515 }
1583 1516
1584 SpdySession::CloseSessionResult SpdySession::DoCloseSession( 1517 void SpdySession::DoDrainSession(Error err, const std::string& description) {
1585 Error err, 1518 if (availability_state_ == STATE_DRAINING) {
1586 const std::string& description) { 1519 return;
1587 CHECK_LT(err, ERR_IO_PENDING); 1520 }
1521 MakeUnavailable();
1588 1522
1589 if (availability_state_ == STATE_CLOSED) 1523 // TODO(jgraettinger): If draining with an |err|, enqueue a GOAWAY frame here.
1590 return SESSION_ALREADY_CLOSED; 1524
1525 availability_state_ = STATE_DRAINING;
1526 error_on_close_ = err;
1591 1527
1592 net_log_.AddEvent( 1528 net_log_.AddEvent(
1593 NetLog::TYPE_SPDY_SESSION_CLOSE, 1529 NetLog::TYPE_SPDY_SESSION_CLOSE,
1594 base::Bind(&NetLogSpdySessionCloseCallback, err, &description)); 1530 base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
1595 1531
1596 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err); 1532 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1597 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors", 1533 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1598 total_bytes_received_, 1, 100000000, 50); 1534 total_bytes_received_, 1, 100000000, 50);
1599 1535
1600 CHECK(pool_);
1601 if (availability_state_ != STATE_GOING_AWAY)
1602 pool_->MakeSessionUnavailable(GetWeakPtr());
1603
1604 availability_state_ = STATE_CLOSED;
1605 error_on_close_ = err;
1606
1607 StartGoingAway(0, err); 1536 StartGoingAway(0, err);
1608 write_queue_.Clear(); 1537 DcheckDraining();
1609 1538 MaybePostWriteLoop();
1610 DcheckClosed();
1611
1612 if (in_io_loop_)
1613 return SESSION_CLOSED_BUT_NOT_REMOVED;
1614
1615 RemoveFromPool();
1616 return SESSION_CLOSED_AND_REMOVED;
1617 }
1618
1619 void SpdySession::RemoveFromPool() {
1620 DcheckClosed();
1621 CHECK(pool_);
1622
1623 SpdySessionPool* pool = pool_;
1624 pool_ = NULL;
1625 pool->RemoveUnavailableSession(GetWeakPtr());
1626 } 1539 }
1627 1540
1628 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) { 1541 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1629 DCHECK(stream); 1542 DCHECK(stream);
1630 std::string description = base::StringPrintf( 1543 std::string description = base::StringPrintf(
1631 "ABANDONED (stream_id=%d): ", stream->stream_id()) + 1544 "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1632 stream->url().spec(); 1545 stream->url().spec();
1633 stream->LogStreamError(status, description); 1546 stream->LogStreamError(status, description);
1634 // We don't increment the streams abandoned counter here. If the 1547 // We don't increment the streams abandoned counter here. If the
1635 // stream isn't active (i.e., it hasn't written anything to the wire 1548 // stream isn't active (i.e., it hasn't written anything to the wire
(...skipping 18 matching lines...) Expand all
1654 1567
1655 SpdyStreamId SpdySession::GetNewStreamId() { 1568 SpdyStreamId SpdySession::GetNewStreamId() {
1656 CHECK_LE(stream_hi_water_mark_, kLastStreamId); 1569 CHECK_LE(stream_hi_water_mark_, kLastStreamId);
1657 SpdyStreamId id = stream_hi_water_mark_; 1570 SpdyStreamId id = stream_hi_water_mark_;
1658 stream_hi_water_mark_ += 2; 1571 stream_hi_water_mark_ += 2;
1659 return id; 1572 return id;
1660 } 1573 }
1661 1574
1662 void SpdySession::CloseSessionOnError(Error err, 1575 void SpdySession::CloseSessionOnError(Error err,
1663 const std::string& description) { 1576 const std::string& description) {
1664 // We may be called from anywhere, so we can't expect a particular 1577 DoDrainSession(err, description);
1665 // return value.
1666 ignore_result(DoCloseSession(err, description));
1667 } 1578 }
1668 1579
1669 void SpdySession::MakeUnavailable() { 1580 void SpdySession::MakeUnavailable() {
1670 if (availability_state_ < STATE_GOING_AWAY) { 1581 if (availability_state_ == STATE_AVAILABLE) {
1671 availability_state_ = STATE_GOING_AWAY; 1582 availability_state_ = STATE_GOING_AWAY;
1672 DCHECK(pool_);
1673 pool_->MakeSessionUnavailable(GetWeakPtr()); 1583 pool_->MakeSessionUnavailable(GetWeakPtr());
1674 } 1584 }
1675 } 1585 }
1676 1586
1677 base::Value* SpdySession::GetInfoAsValue() const { 1587 base::Value* SpdySession::GetInfoAsValue() const {
1678 base::DictionaryValue* dict = new base::DictionaryValue(); 1588 base::DictionaryValue* dict = new base::DictionaryValue();
1679 1589
1680 dict->SetInteger("source_id", net_log_.source().id); 1590 dict->SetInteger("source_id", net_log_.source().id);
1681 1591
1682 dict->SetString("host_port_pair", host_port_pair().ToString()); 1592 dict->SetString("host_port_pair", host_port_pair().ToString());
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
1771 scoped_ptr<SpdyBufferProducer>( 1681 scoped_ptr<SpdyBufferProducer>(
1772 new SimpleBufferProducer( 1682 new SimpleBufferProducer(
1773 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), 1683 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1774 base::WeakPtr<SpdyStream>()); 1684 base::WeakPtr<SpdyStream>());
1775 } 1685 }
1776 1686
1777 void SpdySession::EnqueueWrite(RequestPriority priority, 1687 void SpdySession::EnqueueWrite(RequestPriority priority,
1778 SpdyFrameType frame_type, 1688 SpdyFrameType frame_type,
1779 scoped_ptr<SpdyBufferProducer> producer, 1689 scoped_ptr<SpdyBufferProducer> producer,
1780 const base::WeakPtr<SpdyStream>& stream) { 1690 const base::WeakPtr<SpdyStream>& stream) {
1781 if (availability_state_ == STATE_CLOSED) 1691 if (availability_state_ == STATE_DRAINING)
1782 return; 1692 return;
1783 1693
1784 bool was_idle = write_queue_.IsEmpty();
1785 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); 1694 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1695 MaybePostWriteLoop();
1696 }
1697
1698 void SpdySession::MaybePostWriteLoop() {
1786 if (write_state_ == WRITE_STATE_IDLE) { 1699 if (write_state_ == WRITE_STATE_IDLE) {
1787 DCHECK(was_idle); 1700 CHECK(!in_flight_write_);
1788 DCHECK(!in_flight_write_);
1789 write_state_ = WRITE_STATE_DO_WRITE; 1701 write_state_ = WRITE_STATE_DO_WRITE;
1790 base::MessageLoop::current()->PostTask( 1702 base::MessageLoop::current()->PostTask(
1791 FROM_HERE, 1703 FROM_HERE,
1792 base::Bind(&SpdySession::PumpWriteLoop, 1704 base::Bind(&SpdySession::PumpWriteLoop,
1793 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK)); 1705 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1794 } 1706 }
1795 } 1707 }
1796 1708
1797 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) { 1709 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1798 CHECK_EQ(stream->stream_id(), 0u); 1710 CHECK_EQ(stream->stream_id(), 0u);
(...skipping 23 matching lines...) Expand all
1822 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) { 1734 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1823 if (in_flight_write_stream_.get() == stream.get()) { 1735 if (in_flight_write_stream_.get() == stream.get()) {
1824 // If we're deleting the stream for the in-flight write, we still 1736 // If we're deleting the stream for the in-flight write, we still
1825 // need to let the write complete, so we clear 1737 // need to let the write complete, so we clear
1826 // |in_flight_write_stream_| and let the write finish on its own 1738 // |in_flight_write_stream_| and let the write finish on its own
1827 // without notifying |in_flight_write_stream_|. 1739 // without notifying |in_flight_write_stream_|.
1828 in_flight_write_stream_.reset(); 1740 in_flight_write_stream_.reset();
1829 } 1741 }
1830 1742
1831 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr()); 1743 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1832
1833 // |stream->OnClose()| may end up closing |this|, so detect that.
1834 base::WeakPtr<SpdySession> weak_this = GetWeakPtr();
1835
1836 stream->OnClose(status); 1744 stream->OnClose(status);
1837 1745
1838 if (!weak_this) 1746 if (availability_state_ == STATE_AVAILABLE) {
1839 return; 1747 ProcessPendingStreamRequests();
1840
1841 switch (availability_state_) {
1842 case STATE_AVAILABLE:
1843 ProcessPendingStreamRequests();
1844 break;
1845 case STATE_GOING_AWAY:
1846 DcheckGoingAway();
1847 MaybeFinishGoingAway();
1848 break;
1849 case STATE_CLOSED:
1850 // Do nothing.
1851 break;
1852 } 1748 }
1853 } 1749 }
1854 1750
1855 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) { 1751 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1856 base::StatsCounter used_push_streams("spdy.claimed_push_streams"); 1752 base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1857 1753
1858 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url); 1754 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1859 if (unclaimed_it == unclaimed_pushed_streams_.end()) 1755 if (unclaimed_it == unclaimed_pushed_streams_.end())
1860 return base::WeakPtr<SpdyStream>(); 1756 return base::WeakPtr<SpdyStream>();
1861 1757
(...skipping 23 matching lines...) Expand all
1885 SSLCertRequestInfo* cert_request_info) { 1781 SSLCertRequestInfo* cert_request_info) {
1886 if (!is_secure_) 1782 if (!is_secure_)
1887 return false; 1783 return false;
1888 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info); 1784 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1889 return true; 1785 return true;
1890 } 1786 }
1891 1787
1892 void SpdySession::OnError(SpdyFramer::SpdyError error_code) { 1788 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1893 CHECK(in_io_loop_); 1789 CHECK(in_io_loop_);
1894 1790
1895 if (availability_state_ == STATE_CLOSED)
1896 return;
1897
1898 RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code)); 1791 RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1899 std::string description = base::StringPrintf( 1792 std::string description = base::StringPrintf(
1900 "SPDY_ERROR error_code: %d.", error_code); 1793 "SPDY_ERROR error_code: %d.", error_code);
1901 CloseSessionResult result = 1794 DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, description);
1902 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, description);
1903 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
1904 } 1795 }
1905 1796
1906 void SpdySession::OnStreamError(SpdyStreamId stream_id, 1797 void SpdySession::OnStreamError(SpdyStreamId stream_id,
1907 const std::string& description) { 1798 const std::string& description) {
1908 CHECK(in_io_loop_); 1799 CHECK(in_io_loop_);
1909 1800
1910 if (availability_state_ == STATE_CLOSED)
1911 return;
1912
1913 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1801 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1914 if (it == active_streams_.end()) { 1802 if (it == active_streams_.end()) {
1915 // We still want to send a frame to reset the stream even if we 1803 // We still want to send a frame to reset the stream even if we
1916 // don't know anything about it. 1804 // don't know anything about it.
1917 EnqueueResetStreamFrame( 1805 EnqueueResetStreamFrame(
1918 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description); 1806 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
1919 return; 1807 return;
1920 } 1808 }
1921 1809
1922 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description); 1810 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
1923 } 1811 }
1924 1812
1925 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id, 1813 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
1926 size_t length, 1814 size_t length,
1927 bool fin) { 1815 bool fin) {
1928 CHECK(in_io_loop_); 1816 CHECK(in_io_loop_);
1929 1817
1930 if (availability_state_ == STATE_CLOSED)
1931 return;
1932
1933 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1818 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1934 1819
1935 // By the time data comes in, the stream may already be inactive. 1820 // By the time data comes in, the stream may already be inactive.
1936 if (it == active_streams_.end()) 1821 if (it == active_streams_.end())
1937 return; 1822 return;
1938 1823
1939 SpdyStream* stream = it->second.stream; 1824 SpdyStream* stream = it->second.stream;
1940 CHECK_EQ(stream->stream_id(), stream_id); 1825 CHECK_EQ(stream->stream_id(), stream_id);
1941 1826
1942 DCHECK(buffered_spdy_framer_); 1827 DCHECK(buffered_spdy_framer_);
1943 size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize(); 1828 size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
1944 stream->IncrementRawReceivedBytes(header_len); 1829 stream->IncrementRawReceivedBytes(header_len);
1945 } 1830 }
1946 1831
1947 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, 1832 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
1948 const char* data, 1833 const char* data,
1949 size_t len, 1834 size_t len,
1950 bool fin) { 1835 bool fin) {
1951 CHECK(in_io_loop_); 1836 CHECK(in_io_loop_);
1952 1837
1953 if (availability_state_ == STATE_CLOSED)
1954 return;
1955
1956 if (data == NULL && len != 0) { 1838 if (data == NULL && len != 0) {
1957 // This is notification of consumed data padding. 1839 // This is notification of consumed data padding.
1958 // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames. 1840 // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
1959 // See crbug.com/353012. 1841 // See crbug.com/353012.
1960 return; 1842 return;
1961 } 1843 }
1962 1844
1963 DCHECK_LT(len, 1u << 24); 1845 DCHECK_LT(len, 1u << 24);
1964 if (net_log().IsLogging()) { 1846 if (net_log().IsLogging()) {
1965 net_log().AddEvent( 1847 net_log().AddEvent(
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
2005 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 1887 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2006 return; 1888 return;
2007 } 1889 }
2008 1890
2009 stream->OnDataReceived(buffer.Pass()); 1891 stream->OnDataReceived(buffer.Pass());
2010 } 1892 }
2011 1893
2012 void SpdySession::OnSettings(bool clear_persisted) { 1894 void SpdySession::OnSettings(bool clear_persisted) {
2013 CHECK(in_io_loop_); 1895 CHECK(in_io_loop_);
2014 1896
2015 if (availability_state_ == STATE_CLOSED)
2016 return;
2017
2018 if (clear_persisted) 1897 if (clear_persisted)
2019 http_server_properties_->ClearSpdySettings(host_port_pair()); 1898 http_server_properties_->ClearSpdySettings(host_port_pair());
2020 1899
2021 if (net_log_.IsLogging()) { 1900 if (net_log_.IsLogging()) {
2022 net_log_.AddEvent( 1901 net_log_.AddEvent(
2023 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS, 1902 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
2024 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(), 1903 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
2025 clear_persisted)); 1904 clear_persisted));
2026 } 1905 }
2027 1906
2028 if (GetProtocolVersion() >= SPDY4) { 1907 if (GetProtocolVersion() >= SPDY4) {
2029 // Send an acknowledgment of the setting. 1908 // Send an acknowledgment of the setting.
2030 SpdySettingsIR settings_ir; 1909 SpdySettingsIR settings_ir;
2031 settings_ir.set_is_ack(true); 1910 settings_ir.set_is_ack(true);
2032 EnqueueSessionWrite( 1911 EnqueueSessionWrite(
2033 HIGHEST, 1912 HIGHEST,
2034 SETTINGS, 1913 SETTINGS,
2035 scoped_ptr<SpdyFrame>( 1914 scoped_ptr<SpdyFrame>(
2036 buffered_spdy_framer_->SerializeFrame(settings_ir))); 1915 buffered_spdy_framer_->SerializeFrame(settings_ir)));
2037 } 1916 }
2038 } 1917 }
2039 1918
2040 void SpdySession::OnSetting(SpdySettingsIds id, 1919 void SpdySession::OnSetting(SpdySettingsIds id,
2041 uint8 flags, 1920 uint8 flags,
2042 uint32 value) { 1921 uint32 value) {
2043 CHECK(in_io_loop_); 1922 CHECK(in_io_loop_);
2044 1923
2045 if (availability_state_ == STATE_CLOSED)
2046 return;
2047
2048 HandleSetting(id, value); 1924 HandleSetting(id, value);
2049 http_server_properties_->SetSpdySetting( 1925 http_server_properties_->SetSpdySetting(
2050 host_port_pair(), 1926 host_port_pair(),
2051 id, 1927 id,
2052 static_cast<SpdySettingsFlags>(flags), 1928 static_cast<SpdySettingsFlags>(flags),
2053 value); 1929 value);
2054 received_settings_ = true; 1930 received_settings_ = true;
2055 1931
2056 // Log the setting. 1932 // Log the setting.
2057 net_log_.AddEvent( 1933 net_log_.AddEvent(
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
2105 } 1981 }
2106 1982
2107 void SpdySession::OnSynStream(SpdyStreamId stream_id, 1983 void SpdySession::OnSynStream(SpdyStreamId stream_id,
2108 SpdyStreamId associated_stream_id, 1984 SpdyStreamId associated_stream_id,
2109 SpdyPriority priority, 1985 SpdyPriority priority,
2110 bool fin, 1986 bool fin,
2111 bool unidirectional, 1987 bool unidirectional,
2112 const SpdyHeaderBlock& headers) { 1988 const SpdyHeaderBlock& headers) {
2113 CHECK(in_io_loop_); 1989 CHECK(in_io_loop_);
2114 1990
2115 if (availability_state_ == STATE_CLOSED)
2116 return;
2117
2118 base::Time response_time = base::Time::Now(); 1991 base::Time response_time = base::Time::Now();
2119 base::TimeTicks recv_first_byte_time = time_func_(); 1992 base::TimeTicks recv_first_byte_time = time_func_();
2120 1993
2121 if (net_log_.IsLogging()) { 1994 if (net_log_.IsLogging()) {
2122 net_log_.AddEvent( 1995 net_log_.AddEvent(
2123 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM, 1996 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2124 base::Bind(&NetLogSpdySynStreamReceivedCallback, 1997 base::Bind(&NetLogSpdySynStreamReceivedCallback,
2125 &headers, fin, unidirectional, priority, 1998 &headers, fin, unidirectional, priority,
2126 stream_id, associated_stream_id)); 1999 stream_id, associated_stream_id));
2127 } 2000 }
(...skipping 165 matching lines...) Expand 10 before | Expand all | Expand 10 after
2293 2166
2294 next_unclaimed_push_stream_sweep_time_ = time_func_() + 2167 next_unclaimed_push_stream_sweep_time_ = time_func_() +
2295 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2168 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2296 } 2169 }
2297 2170
2298 void SpdySession::OnSynReply(SpdyStreamId stream_id, 2171 void SpdySession::OnSynReply(SpdyStreamId stream_id,
2299 bool fin, 2172 bool fin,
2300 const SpdyHeaderBlock& headers) { 2173 const SpdyHeaderBlock& headers) {
2301 CHECK(in_io_loop_); 2174 CHECK(in_io_loop_);
2302 2175
2303 if (availability_state_ == STATE_CLOSED)
2304 return;
2305
2306 base::Time response_time = base::Time::Now(); 2176 base::Time response_time = base::Time::Now();
2307 base::TimeTicks recv_first_byte_time = time_func_(); 2177 base::TimeTicks recv_first_byte_time = time_func_();
2308 2178
2309 if (net_log().IsLogging()) { 2179 if (net_log().IsLogging()) {
2310 net_log().AddEvent( 2180 net_log().AddEvent(
2311 NetLog::TYPE_SPDY_SESSION_SYN_REPLY, 2181 NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2312 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback, 2182 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2313 &headers, fin, stream_id)); 2183 &headers, fin, stream_id));
2314 } 2184 }
2315 2185
(...skipping 27 matching lines...) Expand all
2343 2213
2344 ignore_result(OnInitialResponseHeadersReceived( 2214 ignore_result(OnInitialResponseHeadersReceived(
2345 headers, response_time, recv_first_byte_time, stream)); 2215 headers, response_time, recv_first_byte_time, stream));
2346 } 2216 }
2347 2217
2348 void SpdySession::OnHeaders(SpdyStreamId stream_id, 2218 void SpdySession::OnHeaders(SpdyStreamId stream_id,
2349 bool fin, 2219 bool fin,
2350 const SpdyHeaderBlock& headers) { 2220 const SpdyHeaderBlock& headers) {
2351 CHECK(in_io_loop_); 2221 CHECK(in_io_loop_);
2352 2222
2353 if (availability_state_ == STATE_CLOSED)
2354 return;
2355
2356 if (net_log().IsLogging()) { 2223 if (net_log().IsLogging()) {
2357 net_log().AddEvent( 2224 net_log().AddEvent(
2358 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS, 2225 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2359 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback, 2226 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2360 &headers, fin, stream_id)); 2227 &headers, fin, stream_id));
2361 } 2228 }
2362 2229
2363 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2230 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2364 if (it == active_streams_.end()) { 2231 if (it == active_streams_.end()) {
2365 // NOTE: it may just be that the stream was cancelled. 2232 // NOTE: it may just be that the stream was cancelled.
(...skipping 27 matching lines...) Expand all
2393 DCHECK_NE(rv, ERR_IO_PENDING); 2260 DCHECK_NE(rv, ERR_IO_PENDING);
2394 DCHECK(active_streams_.find(stream_id) == active_streams_.end()); 2261 DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2395 } 2262 }
2396 } 2263 }
2397 } 2264 }
2398 2265
2399 void SpdySession::OnRstStream(SpdyStreamId stream_id, 2266 void SpdySession::OnRstStream(SpdyStreamId stream_id,
2400 SpdyRstStreamStatus status) { 2267 SpdyRstStreamStatus status) {
2401 CHECK(in_io_loop_); 2268 CHECK(in_io_loop_);
2402 2269
2403 if (availability_state_ == STATE_CLOSED)
2404 return;
2405
2406 std::string description; 2270 std::string description;
2407 net_log().AddEvent( 2271 net_log().AddEvent(
2408 NetLog::TYPE_SPDY_SESSION_RST_STREAM, 2272 NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2409 base::Bind(&NetLogSpdyRstCallback, 2273 base::Bind(&NetLogSpdyRstCallback,
2410 stream_id, status, &description)); 2274 stream_id, status, &description));
2411 2275
2412 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2276 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2413 if (it == active_streams_.end()) { 2277 if (it == active_streams_.end()) {
2414 // NOTE: it may just be that the stream was cancelled. 2278 // NOTE: it may just be that the stream was cancelled.
2415 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 2279 LOG(WARNING) << "Received RST for invalid stream" << stream_id;
(...skipping 15 matching lines...) Expand all
2431 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 2295 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2432 // For now, it doesn't matter much - it is a protocol error. 2296 // For now, it doesn't matter much - it is a protocol error.
2433 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR); 2297 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2434 } 2298 }
2435 } 2299 }
2436 2300
2437 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, 2301 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2438 SpdyGoAwayStatus status) { 2302 SpdyGoAwayStatus status) {
2439 CHECK(in_io_loop_); 2303 CHECK(in_io_loop_);
2440 2304
2441 if (availability_state_ == STATE_CLOSED)
2442 return;
2443
2444 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY, 2305 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2445 base::Bind(&NetLogSpdyGoAwayCallback, 2306 base::Bind(&NetLogSpdyGoAwayCallback,
2446 last_accepted_stream_id, 2307 last_accepted_stream_id,
2447 active_streams_.size(), 2308 active_streams_.size(),
2448 unclaimed_pushed_streams_.size(), 2309 unclaimed_pushed_streams_.size(),
2449 status)); 2310 status));
2450 MakeUnavailable(); 2311 MakeUnavailable();
2451 StartGoingAway(last_accepted_stream_id, ERR_ABORTED); 2312 StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2452 // This is to handle the case when we already don't have any active 2313 // This is to handle the case when we already don't have any active
2453 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have 2314 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2454 // active streams and so the last one being closed will finish the 2315 // active streams and so the last one being closed will finish the
2455 // going away process (see DeleteStream()). 2316 // going away process (see DeleteStream()).
2456 MaybeFinishGoingAway(); 2317 MaybeFinishGoingAway();
2457 } 2318 }
2458 2319
2459 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) { 2320 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2460 CHECK(in_io_loop_); 2321 CHECK(in_io_loop_);
2461 2322
2462 if (availability_state_ == STATE_CLOSED)
2463 return;
2464
2465 net_log_.AddEvent( 2323 net_log_.AddEvent(
2466 NetLog::TYPE_SPDY_SESSION_PING, 2324 NetLog::TYPE_SPDY_SESSION_PING,
2467 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received")); 2325 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2468 2326
2469 // Send response to a PING from server. 2327 // Send response to a PING from server.
2470 if ((protocol_ >= kProtoSPDY4 && !is_ack) || 2328 if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2471 (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) { 2329 (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2472 WritePingFrame(unique_id, true); 2330 WritePingFrame(unique_id, true);
2473 return; 2331 return;
2474 } 2332 }
2475 2333
2476 --pings_in_flight_; 2334 --pings_in_flight_;
2477 if (pings_in_flight_ < 0) { 2335 if (pings_in_flight_ < 0) {
2478 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING); 2336 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2479 CloseSessionResult result = 2337 DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2480 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2481 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2482 pings_in_flight_ = 0; 2338 pings_in_flight_ = 0;
2483 return; 2339 return;
2484 } 2340 }
2485 2341
2486 if (pings_in_flight_ > 0) 2342 if (pings_in_flight_ > 0)
2487 return; 2343 return;
2488 2344
2489 // We will record RTT in histogram when there are no more client sent 2345 // We will record RTT in histogram when there are no more client sent
2490 // pings_in_flight_. 2346 // pings_in_flight_.
2491 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_); 2347 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2492 } 2348 }
2493 2349
2494 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, 2350 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2495 uint32 delta_window_size) { 2351 uint32 delta_window_size) {
2496 CHECK(in_io_loop_); 2352 CHECK(in_io_loop_);
2497 2353
2498 if (availability_state_ == STATE_CLOSED)
2499 return;
2500
2501 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max)); 2354 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2502 net_log_.AddEvent( 2355 net_log_.AddEvent(
2503 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, 2356 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2504 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2357 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2505 stream_id, delta_window_size)); 2358 stream_id, delta_window_size));
2506 2359
2507 if (stream_id == kSessionFlowControlStreamId) { 2360 if (stream_id == kSessionFlowControlStreamId) {
2508 // WINDOW_UPDATE for the session. 2361 // WINDOW_UPDATE for the session.
2509 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) { 2362 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2510 LOG(WARNING) << "Received WINDOW_UPDATE for session when " 2363 LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2511 << "session flow control is not turned on"; 2364 << "session flow control is not turned on";
2512 // TODO(akalin): Record an error and close the session. 2365 // TODO(akalin): Record an error and close the session.
2513 return; 2366 return;
2514 } 2367 }
2515 2368
2516 if (delta_window_size < 1u) { 2369 if (delta_window_size < 1u) {
2517 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2370 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2518 CloseSessionResult result = DoCloseSession( 2371 DoDrainSession(
2519 ERR_SPDY_PROTOCOL_ERROR, 2372 ERR_SPDY_PROTOCOL_ERROR,
2520 "Received WINDOW_UPDATE with an invalid delta_window_size " + 2373 "Received WINDOW_UPDATE with an invalid delta_window_size " +
2521 base::UintToString(delta_window_size)); 2374 base::UintToString(delta_window_size));
2522 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2523 return; 2375 return;
2524 } 2376 }
2525 2377
2526 IncreaseSendWindowSize(static_cast<int32>(delta_window_size)); 2378 IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2527 } else { 2379 } else {
2528 // WINDOW_UPDATE for a stream. 2380 // WINDOW_UPDATE for a stream.
2529 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2381 if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2530 // TODO(akalin): Record an error and close the session. 2382 // TODO(akalin): Record an error and close the session.
2531 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id 2383 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2532 << " when flow control is not turned on"; 2384 << " when flow control is not turned on";
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
2569 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2421 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2570 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2422 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2571 CHECK(it != active_streams_.end()); 2423 CHECK(it != active_streams_.end());
2572 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2424 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2573 SendWindowUpdateFrame( 2425 SendWindowUpdateFrame(
2574 stream_id, delta_window_size, it->second.stream->priority()); 2426 stream_id, delta_window_size, it->second.stream->priority());
2575 } 2427 }
2576 2428
2577 void SpdySession::SendInitialData() { 2429 void SpdySession::SendInitialData() {
2578 DCHECK(enable_sending_initial_data_); 2430 DCHECK(enable_sending_initial_data_);
2579 DCHECK_NE(availability_state_, STATE_CLOSED);
2580 2431
2581 if (send_connection_header_prefix_) { 2432 if (send_connection_header_prefix_) {
2582 DCHECK_EQ(protocol_, kProtoSPDY4); 2433 DCHECK_EQ(protocol_, kProtoSPDY4);
2583 scoped_ptr<SpdyFrame> connection_header_prefix_frame( 2434 scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2584 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix), 2435 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2585 kHttp2ConnectionHeaderPrefixSize, 2436 kHttp2ConnectionHeaderPrefixSize,
2586 false /* take_ownership */)); 2437 false /* take_ownership */));
2587 // Count the prefix as part of the subsequent SETTINGS frame. 2438 // Count the prefix as part of the subsequent SETTINGS frame.
2588 EnqueueSessionWrite(HIGHEST, SETTINGS, 2439 EnqueueSessionWrite(HIGHEST, SETTINGS,
2589 connection_header_prefix_frame.Pass()); 2440 connection_header_prefix_frame.Pass());
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
2635 const SpdySettingsIds new_id = it->first; 2486 const SpdySettingsIds new_id = it->first;
2636 const uint32 new_val = it->second.second; 2487 const uint32 new_val = it->second.second;
2637 HandleSetting(new_id, new_val); 2488 HandleSetting(new_id, new_val);
2638 } 2489 }
2639 2490
2640 SendSettings(server_settings_map); 2491 SendSettings(server_settings_map);
2641 } 2492 }
2642 2493
2643 2494
2644 void SpdySession::SendSettings(const SettingsMap& settings) { 2495 void SpdySession::SendSettings(const SettingsMap& settings) {
2645 DCHECK_NE(availability_state_, STATE_CLOSED);
2646
2647 net_log_.AddEvent( 2496 net_log_.AddEvent(
2648 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 2497 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2649 base::Bind(&NetLogSpdySendSettingsCallback, &settings)); 2498 base::Bind(&NetLogSpdySendSettingsCallback, &settings));
2650 2499
2651 // Create the SETTINGS frame and send it. 2500 // Create the SETTINGS frame and send it.
2652 DCHECK(buffered_spdy_framer_.get()); 2501 DCHECK(buffered_spdy_framer_.get());
2653 scoped_ptr<SpdyFrame> settings_frame( 2502 scoped_ptr<SpdyFrame> settings_frame(
2654 buffered_spdy_framer_->CreateSettings(settings)); 2503 buffered_spdy_framer_->CreateSettings(settings));
2655 sent_settings_ = true; 2504 sent_settings_ = true;
2656 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass()); 2505 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
2765 2614
2766 check_ping_status_pending_ = true; 2615 check_ping_status_pending_ = true;
2767 base::MessageLoop::current()->PostDelayedTask( 2616 base::MessageLoop::current()->PostDelayedTask(
2768 FROM_HERE, 2617 FROM_HERE,
2769 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2618 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2770 time_func_()), hung_interval_); 2619 time_func_()), hung_interval_);
2771 } 2620 }
2772 2621
2773 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) { 2622 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2774 CHECK(!in_io_loop_); 2623 CHECK(!in_io_loop_);
2775 DCHECK_NE(availability_state_, STATE_CLOSED);
2776 2624
2777 // Check if we got a response back for all PINGs we had sent. 2625 // Check if we got a response back for all PINGs we had sent.
2778 if (pings_in_flight_ == 0) { 2626 if (pings_in_flight_ == 0) {
2779 check_ping_status_pending_ = false; 2627 check_ping_status_pending_ = false;
2780 return; 2628 return;
2781 } 2629 }
2782 2630
2783 DCHECK(check_ping_status_pending_); 2631 DCHECK(check_ping_status_pending_);
2784 2632
2785 base::TimeTicks now = time_func_(); 2633 base::TimeTicks now = time_func_();
2786 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_); 2634 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2787 2635
2788 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) { 2636 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2789 // Track all failed PING messages in a separate bucket. 2637 // Track all failed PING messages in a separate bucket.
2790 RecordPingRTTHistogram(base::TimeDelta::Max()); 2638 RecordPingRTTHistogram(base::TimeDelta::Max());
2791 CloseSessionResult result = 2639 DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2792 DoCloseSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2793 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
2794 return; 2640 return;
2795 } 2641 }
2796 2642
2797 // Check the status of connection after a delay. 2643 // Check the status of connection after a delay.
2798 base::MessageLoop::current()->PostDelayedTask( 2644 base::MessageLoop::current()->PostDelayedTask(
2799 FROM_HERE, 2645 FROM_HERE,
2800 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2646 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2801 now), 2647 now),
2802 delay); 2648 delay);
2803 } 2649 }
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
2916 return ssl_socket; 2762 return ssl_socket;
2917 } 2763 }
2918 2764
2919 void SpdySession::OnWriteBufferConsumed( 2765 void SpdySession::OnWriteBufferConsumed(
2920 size_t frame_payload_size, 2766 size_t frame_payload_size,
2921 size_t consume_size, 2767 size_t consume_size,
2922 SpdyBuffer::ConsumeSource consume_source) { 2768 SpdyBuffer::ConsumeSource consume_source) {
2923 // We can be called with |in_io_loop_| set if a write SpdyBuffer is 2769 // We can be called with |in_io_loop_| set if a write SpdyBuffer is
2924 // deleted (e.g., a stream is closed due to incoming data). 2770 // deleted (e.g., a stream is closed due to incoming data).
2925 2771
2926 if (availability_state_ == STATE_CLOSED)
2927 return;
2928
2929 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2772 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2930 2773
2931 if (consume_source == SpdyBuffer::DISCARD) { 2774 if (consume_source == SpdyBuffer::DISCARD) {
2932 // If we're discarding a frame or part of it, increase the send 2775 // If we're discarding a frame or part of it, increase the send
2933 // window by the number of discarded bytes. (Although if we're 2776 // window by the number of discarded bytes. (Although if we're
2934 // discarding part of a frame, it's probably because of a write 2777 // discarding part of a frame, it's probably because of a write
2935 // error and we'll be tearing down the session soon.) 2778 // error and we'll be tearing down the session soon.)
2936 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); 2779 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
2937 DCHECK_GT(remaining_payload_bytes, 0u); 2780 DCHECK_GT(remaining_payload_bytes, 0u);
2938 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); 2781 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
2939 } 2782 }
2940 // For consumed bytes, the send window is increased when we receive 2783 // For consumed bytes, the send window is increased when we receive
2941 // a WINDOW_UPDATE frame. 2784 // a WINDOW_UPDATE frame.
2942 } 2785 }
2943 2786
2944 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { 2787 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
2945 // We can be called with |in_io_loop_| set if a SpdyBuffer is 2788 // We can be called with |in_io_loop_| set if a SpdyBuffer is
2946 // deleted (e.g., a stream is closed due to incoming data). 2789 // deleted (e.g., a stream is closed due to incoming data).
2947 2790
2948 DCHECK_NE(availability_state_, STATE_CLOSED);
2949 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2791 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2950 DCHECK_GE(delta_window_size, 1); 2792 DCHECK_GE(delta_window_size, 1);
2951 2793
2952 // Check for overflow. 2794 // Check for overflow.
2953 int32 max_delta_window_size = kint32max - session_send_window_size_; 2795 int32 max_delta_window_size = kint32max - session_send_window_size_;
2954 if (delta_window_size > max_delta_window_size) { 2796 if (delta_window_size > max_delta_window_size) {
2955 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2797 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2956 CloseSessionResult result = DoCloseSession( 2798 DoDrainSession(
2957 ERR_SPDY_PROTOCOL_ERROR, 2799 ERR_SPDY_PROTOCOL_ERROR,
2958 "Received WINDOW_UPDATE [delta: " + 2800 "Received WINDOW_UPDATE [delta: " +
2959 base::IntToString(delta_window_size) + 2801 base::IntToString(delta_window_size) +
2960 "] for session overflows session_send_window_size_ [current: " + 2802 "] for session overflows session_send_window_size_ [current: " +
2961 base::IntToString(session_send_window_size_) + "]"); 2803 base::IntToString(session_send_window_size_) + "]");
2962 DCHECK_NE(result, SESSION_ALREADY_CLOSED);
2963 return; 2804 return;
2964 } 2805 }
2965 2806
2966 session_send_window_size_ += delta_window_size; 2807 session_send_window_size_ += delta_window_size;
2967 2808
2968 net_log_.AddEvent( 2809 net_log_.AddEvent(
2969 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 2810 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
2970 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2811 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2971 delta_window_size, session_send_window_size_)); 2812 delta_window_size, session_send_window_size_));
2972 2813
2973 DCHECK(!IsSendStalled()); 2814 DCHECK(!IsSendStalled());
2974 ResumeSendStalledStreams(); 2815 ResumeSendStalledStreams();
2975 } 2816 }
2976 2817
2977 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { 2818 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
2978 DCHECK_NE(availability_state_, STATE_CLOSED);
2979 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2819 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2980 2820
2981 // We only call this method when sending a frame. Therefore, 2821 // We only call this method when sending a frame. Therefore,
2982 // |delta_window_size| should be within the valid frame size range. 2822 // |delta_window_size| should be within the valid frame size range.
2983 DCHECK_GE(delta_window_size, 1); 2823 DCHECK_GE(delta_window_size, 1);
2984 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); 2824 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
2985 2825
2986 // |send_window_size_| should have been at least |delta_window_size| for 2826 // |send_window_size_| should have been at least |delta_window_size| for
2987 // this call to happen. 2827 // this call to happen.
2988 DCHECK_GE(session_send_window_size_, delta_window_size); 2828 DCHECK_GE(session_send_window_size_, delta_window_size);
2989 2829
2990 session_send_window_size_ -= delta_window_size; 2830 session_send_window_size_ -= delta_window_size;
2991 2831
2992 net_log_.AddEvent( 2832 net_log_.AddEvent(
2993 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 2833 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
2994 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2834 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2995 -delta_window_size, session_send_window_size_)); 2835 -delta_window_size, session_send_window_size_));
2996 } 2836 }
2997 2837
2998 void SpdySession::OnReadBufferConsumed( 2838 void SpdySession::OnReadBufferConsumed(
2999 size_t consume_size, 2839 size_t consume_size,
3000 SpdyBuffer::ConsumeSource consume_source) { 2840 SpdyBuffer::ConsumeSource consume_source) {
3001 // We can be called with |in_io_loop_| set if a read SpdyBuffer is 2841 // We can be called with |in_io_loop_| set if a read SpdyBuffer is
3002 // deleted (e.g., discarded by a SpdyReadQueue). 2842 // deleted (e.g., discarded by a SpdyReadQueue).
3003 2843
3004 if (availability_state_ == STATE_CLOSED)
3005 return;
3006
3007 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2844 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3008 DCHECK_GE(consume_size, 1u); 2845 DCHECK_GE(consume_size, 1u);
3009 DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); 2846 DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
3010 2847
3011 IncreaseRecvWindowSize(static_cast<int32>(consume_size)); 2848 IncreaseRecvWindowSize(static_cast<int32>(consume_size));
3012 } 2849 }
3013 2850
3014 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { 2851 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
3015 DCHECK_NE(availability_state_, STATE_CLOSED);
3016 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2852 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3017 DCHECK_GE(session_unacked_recv_window_bytes_, 0); 2853 DCHECK_GE(session_unacked_recv_window_bytes_, 0);
3018 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); 2854 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
3019 DCHECK_GE(delta_window_size, 1); 2855 DCHECK_GE(delta_window_size, 1);
3020 // Check for overflow. 2856 // Check for overflow.
3021 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_); 2857 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
3022 2858
3023 session_recv_window_size_ += delta_window_size; 2859 session_recv_window_size_ += delta_window_size;
3024 net_log_.AddEvent( 2860 net_log_.AddEvent(
3025 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 2861 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
(...skipping 12 matching lines...) Expand all
3038 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { 2874 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
3039 CHECK(in_io_loop_); 2875 CHECK(in_io_loop_);
3040 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2876 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3041 DCHECK_GE(delta_window_size, 1); 2877 DCHECK_GE(delta_window_size, 1);
3042 2878
3043 // Since we never decrease the initial receive window size, 2879 // Since we never decrease the initial receive window size,
3044 // |delta_window_size| should never cause |recv_window_size_| to go 2880 // |delta_window_size| should never cause |recv_window_size_| to go
3045 // negative. If we do, the receive window isn't being respected. 2881 // negative. If we do, the receive window isn't being respected.
3046 if (delta_window_size > session_recv_window_size_) { 2882 if (delta_window_size > session_recv_window_size_) {
3047 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION); 2883 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
3048 CloseSessionResult result = DoCloseSession( 2884 DoDrainSession(
3049 ERR_SPDY_PROTOCOL_ERROR, 2885 ERR_SPDY_PROTOCOL_ERROR,
3050 "delta_window_size is " + base::IntToString(delta_window_size) + 2886 "delta_window_size is " + base::IntToString(delta_window_size) +
3051 " in DecreaseRecvWindowSize, which is larger than the receive " + 2887 " in DecreaseRecvWindowSize, which is larger than the receive " +
3052 "window size of " + base::IntToString(session_recv_window_size_)); 2888 "window size of " + base::IntToString(session_recv_window_size_));
3053 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
3054 return; 2889 return;
3055 } 2890 }
3056 2891
3057 session_recv_window_size_ -= delta_window_size; 2892 session_recv_window_size_ -= delta_window_size;
3058 net_log_.AddEvent( 2893 net_log_.AddEvent(
3059 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 2894 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3060 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2895 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3061 -delta_window_size, session_recv_window_size_)); 2896 -delta_window_size, session_recv_window_size_));
3062 } 2897 }
3063 2898
3064 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) { 2899 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
3065 DCHECK(stream.send_stalled_by_flow_control()); 2900 DCHECK(stream.send_stalled_by_flow_control());
3066 RequestPriority priority = stream.priority(); 2901 RequestPriority priority = stream.priority();
3067 CHECK_GE(priority, MINIMUM_PRIORITY); 2902 CHECK_GE(priority, MINIMUM_PRIORITY);
3068 CHECK_LE(priority, MAXIMUM_PRIORITY); 2903 CHECK_LE(priority, MAXIMUM_PRIORITY);
3069 stream_send_unstall_queue_[priority].push_back(stream.stream_id()); 2904 stream_send_unstall_queue_[priority].push_back(stream.stream_id());
3070 } 2905 }
3071 2906
3072 void SpdySession::ResumeSendStalledStreams() { 2907 void SpdySession::ResumeSendStalledStreams() {
3073 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2908 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3074 2909
3075 // We don't have to worry about new streams being queued, since 2910 // We don't have to worry about new streams being queued, since
3076 // doing so would cause IsSendStalled() to return true. But we do 2911 // doing so would cause IsSendStalled() to return true. But we do
3077 // have to worry about streams being closed, as well as ourselves 2912 // have to worry about streams being closed, as well as ourselves
3078 // being closed. 2913 // being closed.
3079 2914
3080 while (availability_state_ != STATE_CLOSED && !IsSendStalled()) { 2915 while (!IsSendStalled()) {
3081 size_t old_size = 0; 2916 size_t old_size = 0;
3082 #if DCHECK_IS_ON 2917 #if DCHECK_IS_ON
3083 old_size = GetTotalSize(stream_send_unstall_queue_); 2918 old_size = GetTotalSize(stream_send_unstall_queue_);
3084 #endif 2919 #endif
3085 2920
3086 SpdyStreamId stream_id = PopStreamToPossiblyResume(); 2921 SpdyStreamId stream_id = PopStreamToPossiblyResume();
3087 if (stream_id == 0) 2922 if (stream_id == 0)
3088 break; 2923 break;
3089 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2924 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
3090 // The stream may actually still be send-stalled after this (due 2925 // The stream may actually still be send-stalled after this (due
(...skipping 14 matching lines...) Expand all
3105 if (!queue->empty()) { 2940 if (!queue->empty()) {
3106 SpdyStreamId stream_id = queue->front(); 2941 SpdyStreamId stream_id = queue->front();
3107 queue->pop_front(); 2942 queue->pop_front();
3108 return stream_id; 2943 return stream_id;
3109 } 2944 }
3110 } 2945 }
3111 return 0; 2946 return 0;
3112 } 2947 }
3113 2948
3114 } // namespace net 2949 } // namespace net
OLDNEW
« no previous file with comments | « net/spdy/spdy_session.h ('k') | net/spdy/spdy_session_pool.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698