Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(705)

Side by Side Diff: net/spdy/spdy_session.cc

Issue 10448083: Fix out of order SYN_STEAM frames. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Cleanup Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_session.h" 5 #include "net/spdy/spdy_session.h"
6 6
7 #include <map> 7 #include <map>
8 8
9 #include "base/basictypes.h" 9 #include "base/basictypes.h"
10 #include "base/compiler_specific.h" 10 #include "base/compiler_specific.h"
(...skipping 455 matching lines...) Expand 10 before | Expand all | Expand 10 after
466 466
467 SSLInfo ssl_info; 467 SSLInfo ssl_info;
468 bool was_npn_negotiated; 468 bool was_npn_negotiated;
469 NextProto protocol_negotiated = kProtoUnknown; 469 NextProto protocol_negotiated = kProtoUnknown;
470 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated)) 470 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
471 return true; // This is not a secure session, so all domains are okay. 471 return true; // This is not a secure session, so all domains are okay.
472 472
473 return !ssl_info.client_cert_sent && ssl_info.cert->VerifyNameMatch(domain); 473 return !ssl_info.client_cert_sent && ssl_info.cert->VerifyNameMatch(domain);
474 } 474 }
475 475
476 void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream) {
477 write_queue_.push(stream);
478 WriteSocketLater();
479 }
480
476 int SpdySession::GetPushStream( 481 int SpdySession::GetPushStream(
477 const GURL& url, 482 const GURL& url,
478 scoped_refptr<SpdyStream>* stream, 483 scoped_refptr<SpdyStream>* stream,
479 const BoundNetLog& stream_net_log) { 484 const BoundNetLog& stream_net_log) {
480 CHECK_NE(state_, CLOSED); 485 CHECK_NE(state_, CLOSED);
481 486
482 *stream = NULL; 487 *stream = NULL;
483 488
484 // Don't allow access to secure push streams over an unauthenticated, but 489 // Don't allow access to secure push streams over an unauthenticated, but
485 // encrypted SSL socket. 490 // encrypted SSL socket.
(...skipping 18 matching lines...) Expand all
504 return 0; 509 return 0;
505 } 510 }
506 511
507 int SpdySession::CreateStream( 512 int SpdySession::CreateStream(
508 const GURL& url, 513 const GURL& url,
509 RequestPriority priority, 514 RequestPriority priority,
510 scoped_refptr<SpdyStream>* spdy_stream, 515 scoped_refptr<SpdyStream>* spdy_stream,
511 const BoundNetLog& stream_net_log, 516 const BoundNetLog& stream_net_log,
512 const CompletionCallback& callback) { 517 const CompletionCallback& callback) {
513 if (!max_concurrent_streams_ || 518 if (!max_concurrent_streams_ ||
514 active_streams_.size() < max_concurrent_streams_) { 519 (active_streams_.size() + created_streams_.size()
520 < max_concurrent_streams_)) {
515 return CreateStreamImpl(url, priority, spdy_stream, stream_net_log); 521 return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
516 } 522 }
517 523
518 stalled_streams_++; 524 stalled_streams_++;
519 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS, NULL); 525 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS, NULL);
520 create_stream_queues_[priority].push( 526 create_stream_queues_[priority].push(
521 PendingCreateStream(url, priority, spdy_stream, 527 PendingCreateStream(url, priority, spdy_stream,
522 stream_net_log, callback)); 528 stream_net_log, callback));
523 return ERR_IO_PENDING; 529 return ERR_IO_PENDING;
524 } 530 }
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
594 CloseSessionOnError( 600 CloseSessionOnError(
595 static_cast<net::Error>(certificate_error_code_), 601 static_cast<net::Error>(certificate_error_code_),
596 true, 602 true,
597 "Tried to create SPDY stream for secure content over an " 603 "Tried to create SPDY stream for secure content over an "
598 "unauthenticated session."); 604 "unauthenticated session.");
599 return ERR_SPDY_PROTOCOL_ERROR; 605 return ERR_SPDY_PROTOCOL_ERROR;
600 } 606 }
601 607
602 const std::string& path = url.PathForRequest(); 608 const std::string& path = url.PathForRequest();
603 609
604 const SpdyStreamId stream_id = GetNewStreamId();
605
606 *spdy_stream = new SpdyStream(this, 610 *spdy_stream = new SpdyStream(this,
607 stream_id,
608 false, 611 false,
609 stream_net_log); 612 stream_net_log);
610 const scoped_refptr<SpdyStream>& stream = *spdy_stream; 613 const scoped_refptr<SpdyStream>& stream = *spdy_stream;
611 614
612 stream->set_priority(priority); 615 stream->set_priority(priority);
613 stream->set_path(path); 616 stream->set_path(path);
614 stream->set_send_window_size(initial_send_window_size_); 617 stream->set_send_window_size(initial_send_window_size_);
615 stream->set_recv_window_size(initial_recv_window_size_); 618 stream->set_recv_window_size(initial_recv_window_size_);
616 ActivateStream(stream); 619 created_streams_.insert(stream);
617 620
618 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", 621 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
619 static_cast<int>(priority), 0, 10, 11); 622 static_cast<int>(priority), 0, 10, 11);
620 623
621 // TODO(mbelshe): Optimize memory allocations 624 // TODO(mbelshe): Optimize memory allocations
622 625
623 DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
624 return OK; 626 return OK;
625 } 627 }
626 628
627 bool SpdySession::NeedsCredentials() const { 629 bool SpdySession::NeedsCredentials() const {
628 if (!is_secure_) 630 if (!is_secure_)
629 return false; 631 return false;
630 SSLClientSocket* ssl_socket = GetSSLClientSocket(); 632 SSLClientSocket* ssl_socket = GetSSLClientSocket();
631 if (ssl_socket->GetNegotiatedProtocol() < kProtoSPDY3) 633 if (ssl_socket->GetNegotiatedProtocol() < kProtoSPDY3)
632 return false; 634 return false;
633 return ssl_socket->WasDomainBoundCertSent(); 635 return ssl_socket->WasDomainBoundCertSent();
634 } 636 }
635 637
636 void SpdySession::AddPooledAlias(const HostPortProxyPair& alias) { 638 void SpdySession::AddPooledAlias(const HostPortProxyPair& alias) {
637 pooled_aliases_.insert(alias); 639 pooled_aliases_.insert(alias);
638 } 640 }
639 641
640 int SpdySession::GetProtocolVersion() const { 642 int SpdySession::GetProtocolVersion() const {
641 DCHECK(buffered_spdy_framer_.get()); 643 DCHECK(buffered_spdy_framer_.get());
642 return buffered_spdy_framer_->protocol_version(); 644 return buffered_spdy_framer_->protocol_version();
643 } 645 }
644 646
645 int SpdySession::WriteSynStream( 647 SpdySynStreamControlFrame* SpdySession::CreateSynStream(
646 SpdyStreamId stream_id, 648 SpdyStreamId stream_id,
647 RequestPriority priority, 649 RequestPriority priority,
648 uint8 credential_slot, 650 uint8 credential_slot,
649 SpdyControlFlags flags, 651 SpdyControlFlags flags,
650 const linked_ptr<SpdyHeaderBlock>& headers) { 652 const linked_ptr<SpdyHeaderBlock>& headers) {
651 // Find our stream 653 DCHECK(IsStreamActive(stream_id));
652 if (!IsStreamActive(stream_id))
653 return ERR_INVALID_SPDY_STREAM;
654 const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id]; 654 const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
655 CHECK_EQ(stream->stream_id(), stream_id); 655 CHECK_EQ(stream->stream_id(), stream_id);
656 656
657 SendPrefacePingIfNoneInFlight(); 657 SendPrefacePingIfNoneInFlight();
658 658
659 DCHECK(buffered_spdy_framer_.get()); 659 DCHECK(buffered_spdy_framer_.get());
660 scoped_ptr<SpdySynStreamControlFrame> syn_frame( 660 scoped_ptr<SpdySynStreamControlFrame> syn_frame(
661 buffered_spdy_framer_->CreateSynStream( 661 buffered_spdy_framer_->CreateSynStream(
662 stream_id, 0, 662 stream_id, 0,
663 ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()), 663 ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()),
664 credential_slot, flags, false, headers.get())); 664 credential_slot, flags, true, headers.get()));
665 // We enqueue all SYN_STREAM frames at the same priority to ensure
666 // that we do not send them out-of-order.
667 // http://crbug.com/111708
668 QueueFrame(syn_frame.get(), HIGHEST, stream);
669 665
670 base::StatsCounter spdy_requests("spdy.requests"); 666 base::StatsCounter spdy_requests("spdy.requests");
671 spdy_requests.Increment(); 667 spdy_requests.Increment();
672 streams_initiated_count_++; 668 streams_initiated_count_++;
673 669
674 if (net_log().IsLoggingAllEvents()) { 670 if (net_log().IsLoggingAllEvents()) {
675 net_log().AddEvent( 671 net_log().AddEvent(
676 NetLog::TYPE_SPDY_SESSION_SYN_STREAM, 672 NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
677 make_scoped_refptr( 673 make_scoped_refptr(
678 new NetLogSpdySynParameter(headers, flags, stream_id, 0))); 674 new NetLogSpdySynParameter(headers, flags, stream_id, 0)));
679 } 675 }
680 676
681 return ERR_IO_PENDING; 677 return syn_frame.release();
682 } 678 }
683 679
684 int SpdySession::WriteCredentialFrame(const std::string& origin, 680 SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame(
685 SSLClientCertType type, 681 const std::string& origin,
686 const std::string& key, 682 SSLClientCertType type,
687 const std::string& cert, 683 const std::string& key,
688 RequestPriority priority) { 684 const std::string& cert,
685 RequestPriority priority) {
689 DCHECK(is_secure_); 686 DCHECK(is_secure_);
690 unsigned char secret[32]; // 32 bytes from the spec 687 unsigned char secret[32]; // 32 bytes from the spec
691 GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof", 688 GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof",
692 true, origin, 689 true, origin,
693 secret, arraysize(secret)); 690 secret, arraysize(secret));
694 691
695 // Convert the key string into a vector<unit8> 692 // Convert the key string into a vector<unit8>
696 std::vector<uint8> key_data; 693 std::vector<uint8> key_data;
697 for (size_t i = 0; i < key.length(); i++) { 694 for (size_t i = 0; i < key.length(); i++) {
698 key_data.push_back(key[i]); 695 key_data.push_back(key[i]);
(...skipping 21 matching lines...) Expand all
720 SpdyCredential credential; 717 SpdyCredential credential;
721 GURL origin_url(origin); 718 GURL origin_url(origin);
722 credential.slot = 719 credential.slot =
723 credential_state_.SetHasCredential(origin_url); 720 credential_state_.SetHasCredential(origin_url);
724 credential.certs.push_back(cert); 721 credential.certs.push_back(cert);
725 credential.proof.assign(proof.begin(), proof.end()); 722 credential.proof.assign(proof.begin(), proof.end());
726 723
727 DCHECK(buffered_spdy_framer_.get()); 724 DCHECK(buffered_spdy_framer_.get());
728 scoped_ptr<SpdyCredentialControlFrame> credential_frame( 725 scoped_ptr<SpdyCredentialControlFrame> credential_frame(
729 buffered_spdy_framer_->CreateCredentialFrame(credential)); 726 buffered_spdy_framer_->CreateCredentialFrame(credential));
730 // We enqueue all SYN_STREAM frames at the same priority to ensure
731 // that we do not send them out-of-order, which means that we need
732 // to enqueue all CREDENTIAL frames at this priority to ensure that
733 // they are sent *before* the SYN_STREAM that references them.
734 // http://crbug.com/111708
735 QueueFrame(credential_frame.get(), HIGHEST, NULL);
736 727
737 if (net_log().IsLoggingAllEvents()) { 728 if (net_log().IsLoggingAllEvents()) {
738 net_log().AddEvent( 729 net_log().AddEvent(
739 NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL, 730 NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
740 make_scoped_refptr( 731 make_scoped_refptr(
741 new NetLogSpdyCredentialParameter(credential.slot, 732 new NetLogSpdyCredentialParameter(credential.slot,
742 origin))); 733 origin)));
743 } 734 }
744 return ERR_IO_PENDING; 735 return credential_frame.release();
745 } 736 }
746 737
747 int SpdySession::WriteStreamData(SpdyStreamId stream_id, 738 SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
748 net::IOBuffer* data, int len, 739 net::IOBuffer* data, int len,
749 SpdyDataFlags flags) { 740 SpdyDataFlags flags) {
750 // Find our stream 741 // Find our stream
751 CHECK(IsStreamActive(stream_id)); 742 CHECK(IsStreamActive(stream_id));
752 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 743 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
753 CHECK_EQ(stream->stream_id(), stream_id); 744 CHECK_EQ(stream->stream_id(), stream_id);
754 745
755 if (len > kMaxSpdyFrameChunkSize) { 746 if (len > kMaxSpdyFrameChunkSize) {
756 len = kMaxSpdyFrameChunkSize; 747 len = kMaxSpdyFrameChunkSize;
757 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 748 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
758 } 749 }
759 750
760 // Obey send window size of the stream if flow control is enabled. 751 // Obey send window size of the stream if flow control is enabled.
761 if (flow_control_) { 752 if (flow_control_) {
762 if (stream->send_window_size() <= 0) { 753 if (stream->send_window_size() <= 0) {
763 // Because we queue frames onto the session, it is possible that 754 // Because we queue frames onto the session, it is possible that
764 // a stream was not flow controlled at the time it attempted the 755 // a stream was not flow controlled at the time it attempted the
765 // write, but when we go to fulfill the write, it is now flow 756 // write, but when we go to fulfill the write, it is now flow
766 // controlled. This is why we need the session to mark the stream 757 // controlled. This is why we need the session to mark the stream
767 // as stalled - because only the session knows for sure when the 758 // as stalled - because only the session knows for sure when the
768 // stall occurs. 759 // stall occurs.
769 stream->set_stalled_by_flow_control(true); 760 stream->set_stalled_by_flow_control(true);
770 net_log().AddEvent( 761 net_log().AddEvent(
771 NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW, 762 NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
772 make_scoped_refptr( 763 make_scoped_refptr(
773 new NetLogIntegerParameter("stream_id", stream_id))); 764 new NetLogIntegerParameter("stream_id", stream_id)));
774 return ERR_IO_PENDING; 765 return NULL;
775 } 766 }
776 int new_len = std::min(len, stream->send_window_size()); 767 int new_len = std::min(len, stream->send_window_size());
777 if (new_len < len) { 768 if (new_len < len) {
778 len = new_len; 769 len = new_len;
779 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 770 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
780 } 771 }
781 stream->DecreaseSendWindowSize(len); 772 stream->DecreaseSendWindowSize(len);
782 } 773 }
783 774
784 if (net_log().IsLoggingAllEvents()) { 775 if (net_log().IsLoggingAllEvents()) {
785 net_log().AddEvent( 776 net_log().AddEvent(
786 NetLog::TYPE_SPDY_SESSION_SEND_DATA, 777 NetLog::TYPE_SPDY_SESSION_SEND_DATA,
787 make_scoped_refptr(new NetLogSpdyDataParameter(stream_id, len, flags))); 778 make_scoped_refptr(new NetLogSpdyDataParameter(stream_id, len, flags)));
788 } 779 }
789 780
790 // Send PrefacePing for DATA_FRAMEs with nonzero payload size. 781 // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
791 if (len > 0) 782 if (len > 0)
792 SendPrefacePingIfNoneInFlight(); 783 SendPrefacePingIfNoneInFlight();
793 784
794 // TODO(mbelshe): reduce memory copies here. 785 // TODO(mbelshe): reduce memory copies here.
795 DCHECK(buffered_spdy_framer_.get()); 786 DCHECK(buffered_spdy_framer_.get());
796 scoped_ptr<SpdyDataFrame> frame( 787 scoped_ptr<SpdyDataFrame> frame(
797 buffered_spdy_framer_->CreateDataFrame( 788 buffered_spdy_framer_->CreateDataFrame(
798 stream_id, data->data(), len, flags)); 789 stream_id, data->data(), len, flags));
799 QueueFrame(frame.get(), stream->priority(), stream);
800 790
801 return ERR_IO_PENDING; 791 return frame.release();
802 } 792 }
803 793
804 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { 794 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
795 DCHECK_NE(0u, stream_id);
805 // TODO(mbelshe): We should send a RST_STREAM control frame here 796 // TODO(mbelshe): We should send a RST_STREAM control frame here
806 // so that the server can cancel a large send. 797 // so that the server can cancel a large send.
807 798
808 DeleteStream(stream_id, status); 799 DeleteStream(stream_id, status);
809 } 800 }
810 801
802 void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) {
803 DCHECK_EQ(0u, stream->stream_id());
804 created_streams_.erase(scoped_refptr<SpdyStream>(stream));
805 }
806
811 void SpdySession::ResetStream(SpdyStreamId stream_id, 807 void SpdySession::ResetStream(SpdyStreamId stream_id,
812 SpdyStatusCodes status, 808 SpdyStatusCodes status,
813 const std::string& description) { 809 const std::string& description) {
814 net_log().AddEvent( 810 net_log().AddEvent(
815 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM, 811 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
816 make_scoped_refptr(new NetLogSpdyRstParameter(stream_id, status, 812 make_scoped_refptr(new NetLogSpdyRstParameter(stream_id, status,
817 description))); 813 description)));
818 814
819 DCHECK(buffered_spdy_framer_.get()); 815 DCHECK(buffered_spdy_framer_.get());
820 scoped_ptr<SpdyRstStreamControlFrame> rst_frame( 816 scoped_ptr<SpdyRstStreamControlFrame> rst_frame(
821 buffered_spdy_framer_->CreateRstStream(stream_id, status)); 817 buffered_spdy_framer_->CreateRstStream(stream_id, status));
822 818
823 // Default to lowest priority unless we know otherwise. 819 // Default to lowest priority unless we know otherwise.
824 RequestPriority priority = net::IDLE; 820 RequestPriority priority = net::IDLE;
825 if(IsStreamActive(stream_id)) { 821 if(IsStreamActive(stream_id)) {
826 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 822 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
827 priority = stream->priority(); 823 priority = stream->priority();
828 } 824 }
829 QueueFrame(rst_frame.get(), priority, NULL); 825 QueueFrame(rst_frame.release(), priority);
830 RecordProtocolErrorHistogram( 826 RecordProtocolErrorHistogram(
831 static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); 827 static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
832 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); 828 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
833 } 829 }
834 830
835 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { 831 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
836 return ContainsKey(active_streams_, stream_id); 832 return ContainsKey(active_streams_, stream_id);
837 } 833 }
838 834
839 LoadState SpdySession::GetLoadState() const { 835 LoadState SpdySession::GetLoadState() const {
(...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after
1007 // closed, just return. 1003 // closed, just return.
1008 if (state_ < CONNECTED || state_ == CLOSED) 1004 if (state_ < CONNECTED || state_ == CLOSED)
1009 return; 1005 return;
1010 1006
1011 if (write_pending_) // Another write is in progress still. 1007 if (write_pending_) // Another write is in progress still.
1012 return; 1008 return;
1013 1009
1014 // Loop sending frames until we've sent everything or until the write 1010 // Loop sending frames until we've sent everything or until the write
1015 // returns error (or ERR_IO_PENDING). 1011 // returns error (or ERR_IO_PENDING).
1016 DCHECK(buffered_spdy_framer_.get()); 1012 DCHECK(buffered_spdy_framer_.get());
1017 while (in_flight_write_.buffer() || !queue_.empty()) { 1013 while (in_flight_write_.buffer() || !write_queue_.empty()) {
1018 if (!in_flight_write_.buffer()) { 1014 if (!in_flight_write_.buffer()) {
1019 // Grab the next SpdyFrame to send. 1015 // Grab the next SpdyFrame to send.
1020 SpdyIOBuffer next_buffer = queue_.top(); 1016 SpdyFrameProducer* producer = write_queue_.top();
1021 queue_.pop(); 1017 write_queue_.pop();
1018 SpdyStream* stream = producer->GetSpdyStream();
1019 if (stream != NULL && stream->stream_id() == 0) {
1020 stream->set_stream_id(GetNewStreamId());
1021 created_streams_.erase(scoped_refptr<SpdyStream>(stream));
1022 ActivateStream(stream);
1023 DCHECK_EQ(active_streams_[stream->stream_id()].get(), stream);
1024 }
1025 SpdyFrame* frame = producer->ProduceNextFrame();
1026 if (write_queue_.size() > 0)
1027 // It is possible that a stream had data to write, but a
1028 // WINDOW_UPDATE frame has been received which made that
1029 // stream no longer writable.
1030 // TODO(rch): consider handling that case by removing the
1031 // stream from the writable queue?
1032 if (frame == NULL)
1033 continue;
1034 size_t size = frame->length() + SpdyFrame::kHeaderSize;
1035 DCHECK_GT(size, 0u);
1022 1036
1023 // We've deferred compression until just before we write it to the socket, 1037 // TODO(mbelshe): We have too much copying of data here.
1024 // which is now. At this time, we don't compress our data frames. 1038 IOBufferWithSize* buffer = new IOBufferWithSize(size);
1025 SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false); 1039 memcpy(buffer->data(), frame->data(), size);
1026 size_t size;
1027 if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) {
1028 DCHECK(uncompressed_frame.is_control_frame());
1029 scoped_ptr<SpdyFrame> compressed_frame(
1030 buffered_spdy_framer_->CompressControlFrame(
1031 reinterpret_cast<const SpdyControlFrame&>(uncompressed_frame)));
1032 if (!compressed_frame.get()) {
1033 RecordProtocolErrorHistogram(
1034 PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE);
1035 CloseSessionOnError(
1036 net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure.");
1037 return;
1038 }
1039 1040
1040 size = compressed_frame->length() + SpdyFrame::kHeaderSize; 1041 in_flight_write_ = SpdyIOBuffer(buffer, size, producer->GetPriority(),
1041 1042 stream);
1042 DCHECK_GT(size, 0u);
1043
1044 // TODO(mbelshe): We have too much copying of data here.
1045 IOBufferWithSize* buffer = new IOBufferWithSize(size);
1046 memcpy(buffer->data(), compressed_frame->data(), size);
1047
1048 // Attempt to send the frame.
1049 in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST,
1050 next_buffer.stream());
1051 } else {
1052 size = uncompressed_frame.length() + SpdyFrame::kHeaderSize;
1053 in_flight_write_ = next_buffer;
1054 }
1055 } else { 1043 } else {
1056 DCHECK(in_flight_write_.buffer()->BytesRemaining()); 1044 DCHECK(in_flight_write_.buffer()->BytesRemaining());
1057 } 1045 }
1058 1046
1059 write_pending_ = true; 1047 write_pending_ = true;
1060 int rv = connection_->socket()->Write( 1048 int rv = connection_->socket()->Write(
1061 in_flight_write_.buffer(), 1049 in_flight_write_.buffer(),
1062 in_flight_write_.buffer()->BytesRemaining(), 1050 in_flight_write_.buffer()->BytesRemaining(),
1063 base::Bind(&SpdySession::OnWriteComplete, base::Unretained(this))); 1051 base::Bind(&SpdySession::OnWriteComplete, base::Unretained(this)));
1064 if (rv == net::ERR_IO_PENDING) 1052 if (rv == net::ERR_IO_PENDING)
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
1098 while (!active_streams_.empty()) { 1086 while (!active_streams_.empty()) {
1099 ActiveStreamMap::iterator it = active_streams_.begin(); 1087 ActiveStreamMap::iterator it = active_streams_.begin();
1100 const scoped_refptr<SpdyStream>& stream = it->second; 1088 const scoped_refptr<SpdyStream>& stream = it->second;
1101 DCHECK(stream); 1089 DCHECK(stream);
1102 std::string description = base::StringPrintf( 1090 std::string description = base::StringPrintf(
1103 "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); 1091 "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
1104 stream->LogStreamError(status, description); 1092 stream->LogStreamError(status, description);
1105 DeleteStream(stream->stream_id(), status); 1093 DeleteStream(stream->stream_id(), status);
1106 } 1094 }
1107 1095
1096 while (!created_streams_.empty()) {
1097 CreatedStreamSet::iterator it = created_streams_.begin();
1098 const scoped_refptr<SpdyStream>& stream = *it;
1099 std::string description = base::StringPrintf(
1100 "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
1101 stream->LogStreamError(status, description);
1102 if (stream)
1103 stream->OnClose(status);
1104 created_streams_.erase(it);
1105 }
1106
1108 // We also need to drain the queue. 1107 // We also need to drain the queue.
1109 while (queue_.size()) 1108 while (!write_queue_.empty())
1110 queue_.pop(); 1109 write_queue_.pop();
1111 } 1110 }
1112 1111
1113 int SpdySession::GetNewStreamId() { 1112 int SpdySession::GetNewStreamId() {
1114 int id = stream_hi_water_mark_; 1113 int id = stream_hi_water_mark_;
1115 stream_hi_water_mark_ += 2; 1114 stream_hi_water_mark_ += 2;
1116 if (stream_hi_water_mark_ > 0x7fff) 1115 if (stream_hi_water_mark_ > 0x7fff)
1117 stream_hi_water_mark_ = 1; 1116 stream_hi_water_mark_ = 1;
1118 return id; 1117 return id;
1119 } 1118 }
1120 1119
1121 void SpdySession::QueueFrame(SpdyFrame* frame,
1122 RequestPriority priority,
1123 SpdyStream* stream) {
1124 int length = SpdyFrame::kHeaderSize + frame->length();
1125 IOBuffer* buffer = new IOBuffer(length);
1126 memcpy(buffer->data(), frame->data(), length);
1127 queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
1128
1129 WriteSocketLater();
1130 }
1131
1132 void SpdySession::CloseSessionOnError(net::Error err, 1120 void SpdySession::CloseSessionOnError(net::Error err,
1133 bool remove_from_pool, 1121 bool remove_from_pool,
1134 const std::string& description) { 1122 const std::string& description) {
1135 // Closing all streams can have a side-effect of dropping the last reference 1123 // Closing all streams can have a side-effect of dropping the last reference
1136 // to |this|. Hold a reference through this function. 1124 // to |this|. Hold a reference through this function.
1137 scoped_refptr<SpdySession> self(this); 1125 scoped_refptr<SpdySession> self(this);
1138 1126
1139 DCHECK_LT(err, OK); 1127 DCHECK_LT(err, OK);
1140 net_log_.AddEvent( 1128 net_log_.AddEvent(
1141 NetLog::TYPE_SPDY_SESSION_CLOSE, 1129 NetLog::TYPE_SPDY_SESSION_CLOSE,
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
1212 return connection_->socket()->GetPeerAddress(address); 1200 return connection_->socket()->GetPeerAddress(address);
1213 } 1201 }
1214 1202
1215 int SpdySession::GetLocalAddress(IPEndPoint* address) const { 1203 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1216 if (!connection_->socket()) 1204 if (!connection_->socket())
1217 return ERR_SOCKET_NOT_CONNECTED; 1205 return ERR_SOCKET_NOT_CONNECTED;
1218 1206
1219 return connection_->socket()->GetLocalAddress(address); 1207 return connection_->socket()->GetLocalAddress(address);
1220 } 1208 }
1221 1209
1210 class SimpleSpdyFrameProducer : public SpdyFrameProducer {
1211 public:
1212 SimpleSpdyFrameProducer(SpdyFrame* frame,
1213 RequestPriority priority)
1214 : frame_(frame),
1215 priority_(priority) {
1216 }
1217 virtual RequestPriority GetPriority() const OVERRIDE {
1218 return priority_;
1219 }
1220 virtual SpdyFrame* ProduceNextFrame() OVERRIDE {
1221 return frame_;
1222 }
1223 virtual SpdyStream* GetSpdyStream() const OVERRIDE {
1224 return NULL;
1225 }
1226 private:
1227 SpdyFrame* frame_;
1228 RequestPriority priority_;
1229 };
1230
1231 void SpdySession::QueueFrame(SpdyFrame* frame,
1232 RequestPriority priority) {
1233 SimpleSpdyFrameProducer* producer
1234 = new SimpleSpdyFrameProducer(frame, priority);
1235 write_queue_.push(producer);
1236 WriteSocketLater();
1237 }
1238
1222 void SpdySession::ActivateStream(SpdyStream* stream) { 1239 void SpdySession::ActivateStream(SpdyStream* stream) {
1223 const SpdyStreamId id = stream->stream_id(); 1240 const SpdyStreamId id = stream->stream_id();
1224 DCHECK(!IsStreamActive(id)); 1241 DCHECK(!IsStreamActive(id));
1225 1242
1226 active_streams_[id] = stream; 1243 active_streams_[id] = stream;
1227 } 1244 }
1228 1245
1229 void SpdySession::DeleteStream(SpdyStreamId id, int status) { 1246 void SpdySession::DeleteStream(SpdyStreamId id, int status) {
1230 // For push streams, if they are being deleted normally, we leave 1247 // For push streams, if they are being deleted normally, we leave
1231 // the stream in the unclaimed_pushed_streams_ list. However, if 1248 // the stream in the unclaimed_pushed_streams_ list. However, if
1232 // the stream is errored out, clean it up entirely. 1249 // the stream is errored out, clean it up entirely.
1233 if (status != OK) { 1250 if (status != OK) {
1234 PushedStreamMap::iterator it; 1251 PushedStreamMap::iterator it;
1235 for (it = unclaimed_pushed_streams_.begin(); 1252 for (it = unclaimed_pushed_streams_.begin();
1236 it != unclaimed_pushed_streams_.end(); ++it) { 1253 it != unclaimed_pushed_streams_.end(); ++it) {
1237 scoped_refptr<SpdyStream> curr = it->second; 1254 scoped_refptr<SpdyStream> curr = it->second;
1238 if (id == curr->stream_id()) { 1255 if (id == curr->stream_id()) {
1239 unclaimed_pushed_streams_.erase(it); 1256 unclaimed_pushed_streams_.erase(it);
1240 break; 1257 break;
1241 } 1258 }
1242 } 1259 }
1243 } 1260 }
1244 1261
1245 // The stream might have been deleted. 1262 // The stream might have been deleted.
1246 ActiveStreamMap::iterator it2 = active_streams_.find(id); 1263 ActiveStreamMap::iterator it2 = active_streams_.find(id);
1247 if (it2 == active_streams_.end()) 1264 if (it2 == active_streams_.end())
1248 return; 1265 return;
1249 1266
1267 // Possibly remove from the write queue
1268 WriteQueue old = write_queue_;
1269 write_queue_ = WriteQueue();
1270 while (!old.empty()) {
1271 SpdyFrameProducer* producer = old.top();
1272 if (producer->GetSpdyStream() == NULL ||
1273 producer->GetSpdyStream()->stream_id() != id)
1274 write_queue_.push(producer);
1275 old.pop();
1276 }
1277
1250 // If this is an active stream, call the callback. 1278 // If this is an active stream, call the callback.
1251 const scoped_refptr<SpdyStream> stream(it2->second); 1279 const scoped_refptr<SpdyStream> stream(it2->second);
1252 active_streams_.erase(it2); 1280 active_streams_.erase(it2);
1253 if (stream) 1281 if (stream)
1254 stream->OnClose(status); 1282 stream->OnClose(status);
1255 ProcessPendingCreateStreams(); 1283 ProcessPendingCreateStreams();
1256 } 1284 }
1257 1285
1258 void SpdySession::RemoveFromPool() { 1286 void SpdySession::RemoveFromPool() {
1259 if (spdy_session_pool_) { 1287 if (spdy_session_pool_) {
(...skipping 196 matching lines...) Expand 10 before | Expand all | Expand 10 after
1456 } 1484 }
1457 1485
1458 // There should not be an existing pushed stream with the same path. 1486 // There should not be an existing pushed stream with the same path.
1459 PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url); 1487 PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url);
1460 if (it != unclaimed_pushed_streams_.end()) { 1488 if (it != unclaimed_pushed_streams_.end()) {
1461 ResetStream(stream_id, PROTOCOL_ERROR, 1489 ResetStream(stream_id, PROTOCOL_ERROR,
1462 "Received duplicate pushed stream with url: " + url); 1490 "Received duplicate pushed stream with url: " + url);
1463 return; 1491 return;
1464 } 1492 }
1465 1493
1466 scoped_refptr<SpdyStream> stream( 1494 scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_));
1467 new SpdyStream(this, stream_id, true, net_log_)); 1495 stream->set_stream_id(stream_id);
1468 1496
1469 stream->set_path(gurl.PathForRequest()); 1497 stream->set_path(gurl.PathForRequest());
1470 stream->set_send_window_size(initial_send_window_size_); 1498 stream->set_send_window_size(initial_send_window_size_);
1471 stream->set_recv_window_size(initial_recv_window_size_); 1499 stream->set_recv_window_size(initial_recv_window_size_);
1472 1500
1473 unclaimed_pushed_streams_[url] = stream; 1501 unclaimed_pushed_streams_[url] = stream;
1474 1502
1475 ActivateStream(stream); 1503 ActivateStream(stream);
1476 stream->set_response_received(); 1504 stream->set_response_received();
1477 1505
1478 // Parse the headers. 1506 // Parse the headers.
1479 if (!Respond(*headers, stream)) 1507 if (!Respond(*headers, stream))
1480 return; 1508 return;
1481 1509
1482 base::StatsCounter push_requests("spdy.pushed_streams"); 1510 base::StatsCounter push_requests("spdy.pushed_streams");
1483 push_requests.Increment(); 1511 push_requests.Increment();
1484 } 1512 }
1485 1513
1486 void SpdySession::OnSynReply(const SpdySynReplyControlFrame& frame, 1514 void SpdySession::OnSynReply(const SpdySynReplyControlFrame& frame,
1487 const linked_ptr<SpdyHeaderBlock>& headers) { 1515 const linked_ptr<SpdyHeaderBlock>& headers) {
1488 SpdyStreamId stream_id = frame.stream_id(); 1516 SpdyStreamId stream_id = frame.stream_id();
1489
1490 if (net_log().IsLoggingAllEvents()) { 1517 if (net_log().IsLoggingAllEvents()) {
1491 net_log().AddEvent( 1518 net_log().AddEvent(
1492 NetLog::TYPE_SPDY_SESSION_SYN_REPLY, 1519 NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
1493 make_scoped_refptr(new NetLogSpdySynParameter( 1520 make_scoped_refptr(new NetLogSpdySynParameter(
1494 headers, static_cast<SpdyControlFlags>(frame.flags()), 1521 headers, static_cast<SpdyControlFlags>(frame.flags()),
1495 stream_id, 0))); 1522 stream_id, 0)));
1496 } 1523 }
1497 1524
1498 if (!IsStreamActive(stream_id)) { 1525 if (!IsStreamActive(stream_id)) {
1499 // NOTE: it may just be that the stream was cancelled. 1526 // NOTE: it may just be that the stream was cancelled.
(...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after
1664 CHECK_EQ(stream->stream_id(), stream_id); 1691 CHECK_EQ(stream->stream_id(), stream_id);
1665 1692
1666 net_log_.AddEvent( 1693 net_log_.AddEvent(
1667 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE, 1694 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE,
1668 make_scoped_refptr(new NetLogSpdyWindowUpdateParameter( 1695 make_scoped_refptr(new NetLogSpdyWindowUpdateParameter(
1669 stream_id, delta_window_size))); 1696 stream_id, delta_window_size)));
1670 1697
1671 DCHECK(buffered_spdy_framer_.get()); 1698 DCHECK(buffered_spdy_framer_.get());
1672 scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame( 1699 scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame(
1673 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); 1700 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
1674 QueueFrame(window_update_frame.get(), stream->priority(), NULL); 1701 QueueFrame(window_update_frame.release(), stream->priority());
1675 } 1702 }
1676 1703
1677 // Given a cwnd that we would have sent to the server, modify it based on the 1704 // Given a cwnd that we would have sent to the server, modify it based on the
1678 // field trial policy. 1705 // field trial policy.
1679 uint32 ApplyCwndFieldTrialPolicy(int cwnd) { 1706 uint32 ApplyCwndFieldTrialPolicy(int cwnd) {
1680 base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd"); 1707 base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd");
1681 if (!trial) { 1708 if (!trial) {
1682 LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList"; 1709 LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList";
1683 return cwnd; 1710 return cwnd;
1684 } 1711 }
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
1726 1753
1727 net_log_.AddEvent( 1754 net_log_.AddEvent(
1728 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 1755 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
1729 make_scoped_refptr(new NetLogSpdySettingsParameter(settings_map_new))); 1756 make_scoped_refptr(new NetLogSpdySettingsParameter(settings_map_new)));
1730 1757
1731 // Create the SETTINGS frame and send it. 1758 // Create the SETTINGS frame and send it.
1732 DCHECK(buffered_spdy_framer_.get()); 1759 DCHECK(buffered_spdy_framer_.get());
1733 scoped_ptr<SpdySettingsControlFrame> settings_frame( 1760 scoped_ptr<SpdySettingsControlFrame> settings_frame(
1734 buffered_spdy_framer_->CreateSettings(settings_map_new)); 1761 buffered_spdy_framer_->CreateSettings(settings_map_new));
1735 sent_settings_ = true; 1762 sent_settings_ = true;
1736 QueueFrame(settings_frame.get(), HIGHEST, NULL); 1763 QueueFrame(settings_frame.release(), HIGHEST);
1737 } 1764 }
1738 1765
1739 void SpdySession::HandleSetting(uint32 id, uint32 value) { 1766 void SpdySession::HandleSetting(uint32 id, uint32 value) {
1740 switch (id) { 1767 switch (id) {
1741 case SETTINGS_MAX_CONCURRENT_STREAMS: 1768 case SETTINGS_MAX_CONCURRENT_STREAMS:
1742 max_concurrent_streams_ = std::min(static_cast<size_t>(value), 1769 max_concurrent_streams_ = std::min(static_cast<size_t>(value),
1743 g_max_concurrent_stream_limit); 1770 g_max_concurrent_stream_limit);
1744 ProcessPendingCreateStreams(); 1771 ProcessPendingCreateStreams();
1745 break; 1772 break;
1746 case SETTINGS_INITIAL_WINDOW_SIZE: 1773 case SETTINGS_INITIAL_WINDOW_SIZE:
(...skipping 16 matching lines...) Expand all
1763 } 1790 }
1764 } 1791 }
1765 1792
1766 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { 1793 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
1767 ActiveStreamMap::iterator it; 1794 ActiveStreamMap::iterator it;
1768 for (it = active_streams_.begin(); it != active_streams_.end(); ++it) { 1795 for (it = active_streams_.begin(); it != active_streams_.end(); ++it) {
1769 const scoped_refptr<SpdyStream>& stream = it->second; 1796 const scoped_refptr<SpdyStream>& stream = it->second;
1770 DCHECK(stream); 1797 DCHECK(stream);
1771 stream->AdjustSendWindowSize(delta_window_size); 1798 stream->AdjustSendWindowSize(delta_window_size);
1772 } 1799 }
1800
1801 CreatedStreamSet::iterator i;
1802 for (i = created_streams_.begin(); i != created_streams_.end(); i++) {
1803 const scoped_refptr<SpdyStream>& stream = *i;
1804 stream->AdjustSendWindowSize(delta_window_size);
1805 }
1773 } 1806 }
1774 1807
1775 void SpdySession::SendPrefacePingIfNoneInFlight() { 1808 void SpdySession::SendPrefacePingIfNoneInFlight() {
1776 if (pings_in_flight_ || !g_enable_ping_based_connection_checking) 1809 if (pings_in_flight_ || !g_enable_ping_based_connection_checking)
1777 return; 1810 return;
1778 1811
1779 base::TimeTicks now = base::TimeTicks::Now(); 1812 base::TimeTicks now = base::TimeTicks::Now();
1780 // If there is no activity in the session, then send a preface-PING. 1813 // If there is no activity in the session, then send a preface-PING.
1781 if ((now - last_activity_time_) > connection_at_risk_of_loss_time_) 1814 if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
1782 SendPrefacePing(); 1815 SendPrefacePing();
1783 } 1816 }
1784 1817
1785 void SpdySession::SendPrefacePing() { 1818 void SpdySession::SendPrefacePing() {
1786 WritePingFrame(next_ping_id_); 1819 WritePingFrame(next_ping_id_);
1787 } 1820 }
1788 1821
1789 void SpdySession::WritePingFrame(uint32 unique_id) { 1822 void SpdySession::WritePingFrame(uint32 unique_id) {
1790 DCHECK(buffered_spdy_framer_.get()); 1823 DCHECK(buffered_spdy_framer_.get());
1791 scoped_ptr<SpdyPingControlFrame> ping_frame( 1824 scoped_ptr<SpdyPingControlFrame> ping_frame(
1792 buffered_spdy_framer_->CreatePingFrame(next_ping_id_)); 1825 buffered_spdy_framer_->CreatePingFrame(next_ping_id_));
1793 QueueFrame(ping_frame.get(), HIGHEST, NULL); 1826 QueueFrame(ping_frame.release(), HIGHEST);
1794 1827
1795 if (net_log().IsLoggingAllEvents()) { 1828 if (net_log().IsLoggingAllEvents()) {
1796 net_log().AddEvent( 1829 net_log().AddEvent(
1797 NetLog::TYPE_SPDY_SESSION_PING, 1830 NetLog::TYPE_SPDY_SESSION_PING,
1798 make_scoped_refptr(new NetLogSpdyPingParameter(next_ping_id_, "sent"))); 1831 make_scoped_refptr(new NetLogSpdyPingParameter(next_ping_id_, "sent")));
1799 } 1832 }
1800 if (unique_id % 2 != 0) { 1833 if (unique_id % 2 != 0) {
1801 next_ping_id_ += 2; 1834 next_ping_id_ += 2;
1802 ++pings_in_flight_; 1835 ++pings_in_flight_;
1803 PlanToCheckPingStatus(); 1836 PlanToCheckPingStatus();
(...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after
1946 SSLClientSocket* SpdySession::GetSSLClientSocket() const { 1979 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
1947 if (!is_secure_) 1980 if (!is_secure_)
1948 return NULL; 1981 return NULL;
1949 SSLClientSocket* ssl_socket = 1982 SSLClientSocket* ssl_socket =
1950 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 1983 reinterpret_cast<SSLClientSocket*>(connection_->socket());
1951 DCHECK(ssl_socket); 1984 DCHECK(ssl_socket);
1952 return ssl_socket; 1985 return ssl_socket;
1953 } 1986 }
1954 1987
1955 } // namespace net 1988 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698