Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(317)

Side by Side Diff: net/spdy/spdy_session.cc

Issue 13009012: [SPDY] Refactor SpdySession's write queue (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix gyp error Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_session.h" 5 #include "net/spdy/spdy_session.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <map> 8 #include <map>
9 9
10 #include "base/basictypes.h" 10 #include "base/basictypes.h"
(...skipping 253 matching lines...) Expand 10 before | Expand all | Expand 10 after
264 264
265 void SpdyStreamRequest::Reset() { 265 void SpdyStreamRequest::Reset() {
266 session_ = NULL; 266 session_ = NULL;
267 stream_ = NULL; 267 stream_ = NULL;
268 url_ = GURL(); 268 url_ = GURL();
269 priority_ = MINIMUM_PRIORITY; 269 priority_ = MINIMUM_PRIORITY;
270 net_log_ = BoundNetLog(); 270 net_log_ = BoundNetLog();
271 callback_.Reset(); 271 callback_.Reset();
272 } 272 }
273 273
274 // static
275 void SpdySession::SpdyIOBufferProducer::ActivateStream(
276 SpdySession* spdy_session,
277 SpdyStream* spdy_stream) {
278 spdy_session->ActivateStream(spdy_stream);
279 }
280
281 // static
282 SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
283 SpdyFrame* frame,
284 RequestPriority priority,
285 SpdyStream* stream) {
286 size_t size = frame->size();
287 DCHECK_GT(size, 0u);
288
289 // TODO(mbelshe): We have too much copying of data here.
290 IOBufferWithSize* buffer = new IOBufferWithSize(size);
291 memcpy(buffer->data(), frame->data(), size);
292
293 return new SpdyIOBuffer(buffer, size, priority, stream);
294 }
295
296 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, 274 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
297 SpdySessionPool* spdy_session_pool, 275 SpdySessionPool* spdy_session_pool,
298 HttpServerProperties* http_server_properties, 276 HttpServerProperties* http_server_properties,
299 bool verify_domain_authentication, 277 bool verify_domain_authentication,
300 bool enable_sending_initial_settings, 278 bool enable_sending_initial_settings,
301 bool enable_credential_frames, 279 bool enable_credential_frames,
302 bool enable_compression, 280 bool enable_compression,
303 bool enable_ping_based_connection_checking, 281 bool enable_ping_based_connection_checking,
304 NextProto default_protocol, 282 NextProto default_protocol,
305 size_t stream_initial_recv_window_size, 283 size_t stream_initial_recv_window_size,
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
382 360
383 if (connection_->is_initialized()) { 361 if (connection_->is_initialized()) {
384 // With SPDY we can't recycle sockets. 362 // With SPDY we can't recycle sockets.
385 connection_->socket()->Disconnect(); 363 connection_->socket()->Disconnect();
386 } 364 }
387 365
388 // Streams should all be gone now. 366 // Streams should all be gone now.
389 DCHECK_EQ(0u, num_active_streams()); 367 DCHECK_EQ(0u, num_active_streams());
390 DCHECK_EQ(0u, num_unclaimed_pushed_streams()); 368 DCHECK_EQ(0u, num_unclaimed_pushed_streams());
391 369
392 for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) { 370 for (int i = 0; i < NUM_PRIORITIES; ++i) {
393 DCHECK(pending_create_stream_queues_[i].empty()); 371 DCHECK(pending_create_stream_queues_[i].empty());
394 } 372 }
395 DCHECK(pending_stream_request_completions_.empty()); 373 DCHECK(pending_stream_request_completions_.empty());
396 374
397 RecordHistograms(); 375 RecordHistograms();
398 376
399 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION); 377 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
400 } 378 }
401 379
402 net::Error SpdySession::InitializeWithSocket( 380 net::Error SpdySession::InitializeWithSocket(
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
478 return true; // This is not a secure session, so all domains are okay. 456 return true; // This is not a secure session, so all domains are okay.
479 457
480 return !ssl_info.client_cert_sent && 458 return !ssl_info.client_cert_sent &&
481 (enable_credential_frames_ || !ssl_info.channel_id_sent || 459 (enable_credential_frames_ || !ssl_info.channel_id_sent ||
482 ServerBoundCertService::GetDomainForHost(domain) == 460 ServerBoundCertService::GetDomainForHost(domain) ==
483 ServerBoundCertService::GetDomainForHost( 461 ServerBoundCertService::GetDomainForHost(
484 host_port_proxy_pair_.first.host())) && 462 host_port_proxy_pair_.first.host())) &&
485 ssl_info.cert->VerifyNameMatch(domain); 463 ssl_info.cert->VerifyNameMatch(domain);
486 } 464 }
487 465
488 void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream,
489 SpdyIOBufferProducer* producer) {
490 write_queue_.push(producer);
491 stream_producers_[producer] = stream;
492 WriteSocketLater();
493 }
494
495 int SpdySession::GetPushStream( 466 int SpdySession::GetPushStream(
496 const GURL& url, 467 const GURL& url,
497 scoped_refptr<SpdyStream>* stream, 468 scoped_refptr<SpdyStream>* stream,
498 const BoundNetLog& stream_net_log) { 469 const BoundNetLog& stream_net_log) {
499 CHECK_NE(state_, STATE_CLOSED); 470 CHECK_NE(state_, STATE_CLOSED);
500 471
501 *stream = NULL; 472 *stream = NULL;
502 473
503 // Don't allow access to secure push streams over an unauthenticated, but 474 // Don't allow access to secure push streams over an unauthenticated, but
504 // encrypted SSL socket. 475 // encrypted SSL socket.
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
638 609
639 void SpdySession::AddPooledAlias(const HostPortProxyPair& alias) { 610 void SpdySession::AddPooledAlias(const HostPortProxyPair& alias) {
640 pooled_aliases_.insert(alias); 611 pooled_aliases_.insert(alias);
641 } 612 }
642 613
643 int SpdySession::GetProtocolVersion() const { 614 int SpdySession::GetProtocolVersion() const {
644 DCHECK(buffered_spdy_framer_.get()); 615 DCHECK(buffered_spdy_framer_.get());
645 return buffered_spdy_framer_->protocol_version(); 616 return buffered_spdy_framer_->protocol_version();
646 } 617 }
647 618
648 SpdyFrame* SpdySession::CreateSynStream( 619 void SpdySession::EnqueueStreamWrite(
620 SpdyStream* stream,
621 scoped_ptr<SpdyFrameProducer> producer) {
622 EnqueueWrite(stream->priority(), producer.Pass(), stream);
623 }
624
625 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
649 SpdyStreamId stream_id, 626 SpdyStreamId stream_id,
650 RequestPriority priority, 627 RequestPriority priority,
651 uint8 credential_slot, 628 uint8 credential_slot,
652 SpdyControlFlags flags, 629 SpdyControlFlags flags,
653 const SpdyHeaderBlock& headers) { 630 const SpdyHeaderBlock& headers) {
654 CHECK(IsStreamActive(stream_id)); 631 CHECK(IsStreamActive(stream_id));
655 const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id]; 632 const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
656 CHECK_EQ(stream->stream_id(), stream_id); 633 CHECK_EQ(stream->stream_id(), stream_id);
657 634
658 SendPrefacePingIfNoneInFlight(); 635 SendPrefacePingIfNoneInFlight();
(...skipping 11 matching lines...) Expand all
670 647
671 if (net_log().IsLoggingAllEvents()) { 648 if (net_log().IsLoggingAllEvents()) {
672 net_log().AddEvent( 649 net_log().AddEvent(
673 NetLog::TYPE_SPDY_SESSION_SYN_STREAM, 650 NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
674 base::Bind(&NetLogSpdySynCallback, &headers, 651 base::Bind(&NetLogSpdySynCallback, &headers,
675 (flags & CONTROL_FLAG_FIN) != 0, 652 (flags & CONTROL_FLAG_FIN) != 0,
676 (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0, 653 (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
677 stream_id, 0)); 654 stream_id, 0));
678 } 655 }
679 656
680 return syn_frame.release(); 657 return syn_frame.Pass();
681 } 658 }
682 659
683 SpdyFrame* SpdySession::CreateCredentialFrame( 660 int SpdySession::CreateCredentialFrame(
684 const std::string& origin, 661 const std::string& origin,
685 SSLClientCertType type, 662 SSLClientCertType type,
686 const std::string& key, 663 const std::string& key,
687 const std::string& cert, 664 const std::string& cert,
688 RequestPriority priority) { 665 RequestPriority priority,
666 scoped_ptr<SpdyFrame>* credential_frame) {
689 DCHECK(is_secure_); 667 DCHECK(is_secure_);
690 SSLClientSocket* ssl_socket = GetSSLClientSocket(); 668 SSLClientSocket* ssl_socket = GetSSLClientSocket();
691 DCHECK(ssl_socket); 669 DCHECK(ssl_socket);
692 DCHECK(ssl_socket->WasChannelIDSent()); 670 DCHECK(ssl_socket->WasChannelIDSent());
693 671
694 SpdyCredential credential; 672 SpdyCredential credential;
695 std::string tls_unique; 673 std::string tls_unique;
696 ssl_socket->GetTLSUniqueChannelBinding(&tls_unique); 674 ssl_socket->GetTLSUniqueChannelBinding(&tls_unique);
697 size_t slot = credential_state_.SetHasCredential(GURL(origin)); 675 size_t slot = credential_state_.SetHasCredential(GURL(origin));
698 int rv = SpdyCredentialBuilder::Build(tls_unique, type, key, cert, slot, 676 int rv = SpdyCredentialBuilder::Build(tls_unique, type, key, cert, slot,
699 &credential); 677 &credential);
700 DCHECK_EQ(OK, rv); 678 if (rv != OK) {
701 if (rv != OK) 679 DCHECK_NE(rv, ERR_IO_PENDING);
Ryan Hamilton 2013/03/28 15:51:04 if you want to save a line, you could write this a
akalin 2013/04/06 01:17:48 Done.
702 return NULL; 680 return rv;
681 }
703 682
704 DCHECK(buffered_spdy_framer_.get()); 683 DCHECK(buffered_spdy_framer_.get());
705 scoped_ptr<SpdyFrame> credential_frame( 684 credential_frame->reset(
706 buffered_spdy_framer_->CreateCredentialFrame(credential)); 685 buffered_spdy_framer_->CreateCredentialFrame(credential));
707 686
708 if (net_log().IsLoggingAllEvents()) { 687 if (net_log().IsLoggingAllEvents()) {
709 net_log().AddEvent( 688 net_log().AddEvent(
710 NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL, 689 NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
711 base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin)); 690 base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
712 } 691 }
713 return credential_frame.release(); 692 return OK;
714 } 693 }
715 694
716 SpdyFrame* SpdySession::CreateHeadersFrame( 695 scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame(
717 SpdyStreamId stream_id, 696 SpdyStreamId stream_id,
718 const SpdyHeaderBlock& headers, 697 const SpdyHeaderBlock& headers,
719 SpdyControlFlags flags) { 698 SpdyControlFlags flags) {
720 // Find our stream 699 // Find our stream
721 CHECK(IsStreamActive(stream_id)); 700 CHECK(IsStreamActive(stream_id));
722 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 701 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
723 CHECK_EQ(stream->stream_id(), stream_id); 702 CHECK_EQ(stream->stream_id(), stream_id);
724 703
725 // Create a HEADER frame. 704 // Create a HEADER frame.
726 scoped_ptr<SpdyFrame> frame( 705 scoped_ptr<SpdyFrame> frame(
727 buffered_spdy_framer_->CreateHeaders( 706 buffered_spdy_framer_->CreateHeaders(
728 stream_id, flags, enable_compression_, &headers)); 707 stream_id, flags, enable_compression_, &headers));
729 708
730 if (net_log().IsLoggingAllEvents()) { 709 if (net_log().IsLoggingAllEvents()) {
731 bool fin = flags & CONTROL_FLAG_FIN; 710 bool fin = flags & CONTROL_FLAG_FIN;
732 net_log().AddEvent( 711 net_log().AddEvent(
733 NetLog::TYPE_SPDY_SESSION_SEND_HEADERS, 712 NetLog::TYPE_SPDY_SESSION_SEND_HEADERS,
734 base::Bind(&NetLogSpdySynCallback, 713 base::Bind(&NetLogSpdySynCallback,
735 &headers, fin, /*unidirectional=*/false, 714 &headers, fin, /*unidirectional=*/false,
736 stream_id, 0)); 715 stream_id, 0));
737 } 716 }
738 return frame.release(); 717 return frame.Pass();
739 } 718 }
740 719
741 SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, 720 scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(
742 net::IOBuffer* data, int len, 721 SpdyStreamId stream_id,
Ryan Hamilton 2013/03/28 15:51:04 I think you can move this to the previous line, an
akalin 2013/04/06 01:17:48 Done.
743 SpdyDataFlags flags) { 722 net::IOBuffer* data,
723 int len,
724 SpdyDataFlags flags) {
744 // Find our stream 725 // Find our stream
745 CHECK(IsStreamActive(stream_id)); 726 CHECK(IsStreamActive(stream_id));
746 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 727 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
747 CHECK_EQ(stream->stream_id(), stream_id); 728 CHECK_EQ(stream->stream_id(), stream_id);
748 729
749 if (len < 0) { 730 if (len < 0) {
750 NOTREACHED(); 731 NOTREACHED();
751 return NULL; 732 return scoped_ptr<SpdyFrame>();
752 } 733 }
753 734
754 if (len > kMaxSpdyFrameChunkSize) { 735 if (len > kMaxSpdyFrameChunkSize) {
755 len = kMaxSpdyFrameChunkSize; 736 len = kMaxSpdyFrameChunkSize;
756 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 737 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
757 } 738 }
758 739
759 // Obey send window size of the stream (and session, if applicable) 740 // Obey send window size of the stream (and session, if applicable)
760 // if flow control is enabled. 741 // if flow control is enabled.
761 if (flow_control_state_ >= FLOW_CONTROL_STREAM) { 742 if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
762 int32 effective_window_size = stream->send_window_size(); 743 int32 effective_window_size = stream->send_window_size();
763 if (effective_window_size <= 0) { 744 if (effective_window_size <= 0) {
764 // Because we queue frames onto the session, it is possible that 745 // Because we queue frames onto the session, it is possible that
765 // a stream was not flow controlled at the time it attempted the 746 // a stream was not flow controlled at the time it attempted the
766 // write, but when we go to fulfill the write, it is now flow 747 // write, but when we go to fulfill the write, it is now flow
767 // controlled. This is why we need the session to mark the stream 748 // controlled. This is why we need the session to mark the stream
768 // as stalled - because only the session knows for sure when the 749 // as stalled - because only the session knows for sure when the
769 // stall occurs. 750 // stall occurs.
770 stream->set_send_stalled_by_flow_control(true); 751 stream->set_send_stalled_by_flow_control(true);
771 net_log().AddEvent( 752 net_log().AddEvent(
772 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, 753 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW,
773 NetLog::IntegerCallback("stream_id", stream_id)); 754 NetLog::IntegerCallback("stream_id", stream_id));
774 return NULL; 755 return scoped_ptr<SpdyFrame>();
775 } 756 }
776 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 757 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
777 effective_window_size = 758 effective_window_size =
778 std::min(effective_window_size, session_send_window_size_); 759 std::min(effective_window_size, session_send_window_size_);
779 if (effective_window_size <= 0) { 760 if (effective_window_size <= 0) {
780 DCHECK(IsSendStalled()); 761 DCHECK(IsSendStalled());
781 stream->set_send_stalled_by_flow_control(true); 762 stream->set_send_stalled_by_flow_control(true);
782 QueueSendStalledStream(stream); 763 QueueSendStalledStream(stream);
783 net_log().AddEvent( 764 net_log().AddEvent(
784 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, 765 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW,
785 NetLog::IntegerCallback("stream_id", stream_id)); 766 NetLog::IntegerCallback("stream_id", stream_id));
786 return NULL; 767 return scoped_ptr<SpdyFrame>();
787 } 768 }
788 } 769 }
789 770
790 int new_len = std::min(len, effective_window_size); 771 int new_len = std::min(len, effective_window_size);
791 if (new_len < len) { 772 if (new_len < len) {
792 len = new_len; 773 len = new_len;
793 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 774 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
794 } 775 }
795 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) 776 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION)
796 DecreaseSendWindowSize(static_cast<int32>(len)); 777 DecreaseSendWindowSize(static_cast<int32>(len));
(...skipping 10 matching lines...) Expand all
807 // Send PrefacePing for DATA_FRAMEs with nonzero payload size. 788 // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
808 if (len > 0) 789 if (len > 0)
809 SendPrefacePingIfNoneInFlight(); 790 SendPrefacePingIfNoneInFlight();
810 791
811 // TODO(mbelshe): reduce memory copies here. 792 // TODO(mbelshe): reduce memory copies here.
812 DCHECK(buffered_spdy_framer_.get()); 793 DCHECK(buffered_spdy_framer_.get());
813 scoped_ptr<SpdyFrame> frame( 794 scoped_ptr<SpdyFrame> frame(
814 buffered_spdy_framer_->CreateDataFrame( 795 buffered_spdy_framer_->CreateDataFrame(
815 stream_id, data->data(), static_cast<uint32>(len), flags)); 796 stream_id, data->data(), static_cast<uint32>(len), flags));
816 797
817 return frame.release(); 798 return frame.Pass();
818 } 799 }
819 800
820 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { 801 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
821 DCHECK_NE(0u, stream_id); 802 DCHECK_NE(0u, stream_id);
822 // TODO(mbelshe): We should send a RST_STREAM control frame here 803 // TODO(mbelshe): We should send a RST_STREAM control frame here
823 // so that the server can cancel a large send. 804 // so that the server can cancel a large send.
824 805
825 DeleteStream(stream_id, status); 806 DeleteStream(stream_id, status);
826 } 807 }
827 808
(...skipping 14 matching lines...) Expand all
842 DCHECK(buffered_spdy_framer_.get()); 823 DCHECK(buffered_spdy_framer_.get());
843 scoped_ptr<SpdyFrame> rst_frame( 824 scoped_ptr<SpdyFrame> rst_frame(
844 buffered_spdy_framer_->CreateRstStream(stream_id, status)); 825 buffered_spdy_framer_->CreateRstStream(stream_id, status));
845 826
846 // Default to lowest priority unless we know otherwise. 827 // Default to lowest priority unless we know otherwise.
847 RequestPriority priority = net::IDLE; 828 RequestPriority priority = net::IDLE;
848 if (IsStreamActive(stream_id)) { 829 if (IsStreamActive(stream_id)) {
849 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 830 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
850 priority = stream->priority(); 831 priority = stream->priority();
851 } 832 }
852 QueueFrame(rst_frame.release(), priority); 833 EnqueueSessionWrite(priority, rst_frame.Pass());
853 RecordProtocolErrorHistogram( 834 RecordProtocolErrorHistogram(
854 static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); 835 static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
855 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); 836 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
856 } 837 }
857 838
858 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { 839 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
859 return ContainsKey(active_streams_, stream_id); 840 return ContainsKey(active_streams_, stream_id);
860 } 841 }
861 842
862 LoadState SpdySession::GetLoadState() const { 843 LoadState SpdySession::GetLoadState() const {
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
963 data += bytes_processed; 944 data += bytes_processed;
964 } 945 }
965 946
966 if (IsConnected()) 947 if (IsConnected())
967 state_ = STATE_DO_READ; 948 state_ = STATE_DO_READ;
968 return OK; 949 return OK;
969 } 950 }
970 951
971 void SpdySession::OnWriteComplete(int result) { 952 void SpdySession::OnWriteComplete(int result) {
972 DCHECK(write_pending_); 953 DCHECK(write_pending_);
973 DCHECK(in_flight_write_.size()); 954 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
974 955
975 last_activity_time_ = base::TimeTicks::Now(); 956 last_activity_time_ = base::TimeTicks::Now();
976 write_pending_ = false; 957 write_pending_ = false;
977 958
978 scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); 959 if (result < 0) {
960 in_flight_write_.Release();
961 CloseSessionOnError(static_cast<net::Error>(result), true, "Write error");
962 return;
963 }
979 964
980 if (result >= 0) { 965 // It should not be possible to have written more bytes than our
981 // It should not be possible to have written more bytes than our 966 // in_flight_write_.
982 // in_flight_write_. 967 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
983 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
984 968
985 in_flight_write_.buffer()->DidConsume(result); 969 in_flight_write_.buffer()->DidConsume(result);
986 970
987 // We only notify the stream when we've fully written the pending frame. 971 // We only notify the stream when we've fully written the pending frame.
988 if (!in_flight_write_.buffer()->BytesRemaining()) { 972 if (in_flight_write_.buffer()->BytesRemaining() == 0) {
989 if (stream) { 973 DCHECK_GT(result, 0);
990 // Report the number of bytes written to the caller, but exclude the
991 // frame size overhead. NOTE: if this frame was compressed the
992 // reported bytes written is the compressed size, not the original
993 // size.
994 if (result > 0) {
995 result = in_flight_write_.buffer()->size();
996 DCHECK_GE(result,
997 static_cast<int>(
998 buffered_spdy_framer_->GetControlFrameHeaderSize()));
999 result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
1000 }
1001 974
1002 // It is possible that the stream was cancelled while we were writing 975 scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
1003 // to the socket.
1004 if (!stream->cancelled())
1005 stream->OnWriteComplete(result);
1006 }
1007 976
1008 // Cleanup the write which just completed. 977 // It is possible that the stream was cancelled while we were writing
1009 in_flight_write_.release(); 978 // to the socket.
979 if (stream && !stream->cancelled()) {
980 // Report the number of bytes written to the caller, but exclude the
981 // frame size overhead. NOTE: if this frame was compressed the
982 // reported bytes written is the compressed size, not the original
983 // size.
Ryan Hamilton 2013/03/28 15:51:04 I wonder if the caller actually cares about the nu
akalin 2013/04/06 01:17:48 Yeah, the callers do care about the number of byte
984 result = in_flight_write_.buffer()->size();
985 DCHECK_GE(result,
986 static_cast<int>(
987 buffered_spdy_framer_->GetControlFrameHeaderSize()));
988 result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
989
990 stream->OnWriteComplete(result);
1010 } 991 }
1011 992
1012 // Write more data. We're already in a continuation, so we can 993 // Cleanup the write which just completed.
1013 // go ahead and write it immediately (without going back to the 994 in_flight_write_.Release();
1014 // message loop). 995 }
1015 WriteSocketLater();
1016 } else {
1017 in_flight_write_.release();
1018 996
1019 // The stream is now errored. Close it down. 997 // Write more data. We're already in a continuation, so we can go
1020 CloseSessionOnError( 998 // ahead and write it immediately (without going back to the message
1021 static_cast<net::Error>(result), true, "The stream has errored."); 999 // loop).
1022 } 1000 WriteSocketLater();
1023 } 1001 }
1024 1002
1025 void SpdySession::WriteSocketLater() { 1003 void SpdySession::WriteSocketLater() {
1026 if (delayed_write_pending_) 1004 if (delayed_write_pending_)
1027 return; 1005 return;
1028 1006
1029 if (!IsConnected()) 1007 if (!IsConnected())
1030 return; 1008 return;
1031 1009
1032 delayed_write_pending_ = true; 1010 delayed_write_pending_ = true;
(...skipping 12 matching lines...) Expand all
1045 // closed, just return. 1023 // closed, just return.
1046 if (!IsConnected()) 1024 if (!IsConnected())
1047 return; 1025 return;
1048 1026
1049 if (write_pending_) // Another write is in progress still. 1027 if (write_pending_) // Another write is in progress still.
1050 return; 1028 return;
1051 1029
1052 // Loop sending frames until we've sent everything or until the write 1030 // Loop sending frames until we've sent everything or until the write
1053 // returns error (or ERR_IO_PENDING). 1031 // returns error (or ERR_IO_PENDING).
1054 DCHECK(buffered_spdy_framer_.get()); 1032 DCHECK(buffered_spdy_framer_.get());
1055 while (in_flight_write_.buffer() || !write_queue_.empty()) { 1033 while (true) {
1056 if (!in_flight_write_.buffer()) { 1034 if (in_flight_write_.buffer()) {
1057 // Grab the next SpdyBuffer to send. 1035 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
1058 scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); 1036 } else {
1059 write_queue_.pop(); 1037 // Grab the next frame to send.
1060 scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); 1038 scoped_ptr<SpdyFrameProducer> producer;
1061 stream_producers_.erase(producer.get()); 1039 scoped_refptr<SpdyStream> stream;
1040 if (!write_queue_.Dequeue(&producer, &stream))
1041 break;
1042
1062 // It is possible that a stream had data to write, but a 1043 // It is possible that a stream had data to write, but a
1063 // WINDOW_UPDATE frame has been received which made that 1044 // WINDOW_UPDATE frame has been received which made that
1064 // stream no longer writable. 1045 // stream no longer writable.
1065 // TODO(rch): consider handling that case by removing the 1046 // TODO(rch): consider handling that case by removing the
1066 // stream from the writable queue? 1047 // stream from the writable queue?
1067 if (buffer == NULL) 1048 if (stream.get() && stream->cancelled())
1068 continue; 1049 continue;
1069 1050
1070 in_flight_write_ = *buffer; 1051 if (stream.get() && stream->stream_id() == 0)
1071 } else { 1052 ActivateStream(stream);
1072 DCHECK(in_flight_write_.buffer()->BytesRemaining()); 1053
1054 scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
1055 if (!frame) {
1056 NOTREACHED();
1057 continue;
1058 }
1059 DCHECK_GT(frame->size(), 0u);
1060
1061 // TODO(mbelshe): We have too much copying of data here.
1062 scoped_refptr<IOBufferWithSize> buffer =
1063 new IOBufferWithSize(frame->size());
1064 memcpy(buffer->data(), frame->data(), frame->size());
1065 in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
1073 } 1066 }
1074 1067
1075 write_pending_ = true; 1068 write_pending_ = true;
1076 int rv = connection_->socket()->Write( 1069 int rv = connection_->socket()->Write(
1077 in_flight_write_.buffer(), 1070 in_flight_write_.buffer(),
1078 in_flight_write_.buffer()->BytesRemaining(), 1071 in_flight_write_.buffer()->BytesRemaining(),
1079 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); 1072 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr()));
1080 if (rv == net::ERR_IO_PENDING) 1073 if (rv == net::ERR_IO_PENDING)
1081 break; 1074 break;
1082 1075
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
1120 } 1113 }
1121 1114
1122 while (!created_streams_.empty()) { 1115 while (!created_streams_.empty()) {
1123 CreatedStreamSet::iterator it = created_streams_.begin(); 1116 CreatedStreamSet::iterator it = created_streams_.begin();
1124 const scoped_refptr<SpdyStream> stream = *it; 1117 const scoped_refptr<SpdyStream> stream = *it;
1125 created_streams_.erase(it); 1118 created_streams_.erase(it);
1126 LogAbandonedStream(stream, status); 1119 LogAbandonedStream(stream, status);
1127 stream->OnClose(status); 1120 stream->OnClose(status);
1128 } 1121 }
1129 1122
1130 // We also need to drain the queue. 1123 write_queue_.Clear();
1131 while (!write_queue_.empty()) {
1132 scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
1133 write_queue_.pop();
1134 stream_producers_.erase(producer.get());
1135 }
1136 } 1124 }
1137 1125
1138 void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, 1126 void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
1139 net::Error status) { 1127 net::Error status) {
1140 DCHECK(stream); 1128 DCHECK(stream);
1141 std::string description = base::StringPrintf( 1129 std::string description = base::StringPrintf(
1142 "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); 1130 "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
1143 stream->LogStreamError(status, description); 1131 stream->LogStreamError(status, description);
1144 } 1132 }
1145 1133
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after
1248 int SpdySession::GetLocalAddress(IPEndPoint* address) const { 1236 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1249 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionGetPeerAddressNotConnected", 1237 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionGetPeerAddressNotConnected",
1250 !connection_->socket()); 1238 !connection_->socket());
1251 if (!connection_->socket()) { 1239 if (!connection_->socket()) {
1252 return ERR_SOCKET_NOT_CONNECTED; 1240 return ERR_SOCKET_NOT_CONNECTED;
1253 } 1241 }
1254 1242
1255 return connection_->socket()->GetLocalAddress(address); 1243 return connection_->socket()->GetLocalAddress(address);
1256 } 1244 }
1257 1245
1258 class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { 1246 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1259 public: 1247 scoped_ptr<SpdyFrame> frame) {
1260 SimpleSpdyIOBufferProducer(SpdyFrame* frame, 1248 EnqueueWrite(
1261 RequestPriority priority) 1249 priority,
1262 : frame_(frame), 1250 scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())),
1263 priority_(priority) { 1251 NULL);
1264 } 1252 }
1265 1253
1266 virtual RequestPriority GetPriority() const OVERRIDE { 1254 void SpdySession::EnqueueWrite(RequestPriority priority,
1267 return priority_; 1255 scoped_ptr<SpdyFrameProducer> producer,
1268 } 1256 const scoped_refptr<SpdyStream>& stream) {
1269 1257 write_queue_.Enqueue(priority, producer.Pass(), stream);
1270 virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE {
1271 return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
1272 frame_.get(), priority_, NULL);
1273 }
1274
1275 private:
1276 scoped_ptr<SpdyFrame> frame_;
1277 RequestPriority priority_;
1278 };
1279
1280 void SpdySession::QueueFrame(SpdyFrame* frame,
1281 RequestPriority priority) {
1282 SimpleSpdyIOBufferProducer* producer =
1283 new SimpleSpdyIOBufferProducer(frame, priority);
1284 write_queue_.push(producer);
1285 WriteSocketLater(); 1258 WriteSocketLater();
1286 } 1259 }
1287 1260
1288 void SpdySession::ActivateStream(SpdyStream* stream) { 1261 void SpdySession::ActivateStream(SpdyStream* stream) {
1289 if (stream->stream_id() == 0) { 1262 if (stream->stream_id() == 0) {
1290 stream->set_stream_id(GetNewStreamId()); 1263 stream->set_stream_id(GetNewStreamId());
1291 created_streams_.erase(scoped_refptr<SpdyStream>(stream)); 1264 created_streams_.erase(scoped_refptr<SpdyStream>(stream));
1292 } 1265 }
1293 const SpdyStreamId id = stream->stream_id(); 1266 const SpdyStreamId id = stream->stream_id();
1294 DCHECK(!IsStreamActive(id)); 1267 DCHECK(!IsStreamActive(id));
1295 1268
1296 active_streams_[id] = stream; 1269 active_streams_[id] = stream;
1297 } 1270 }
1298 1271
1299 void SpdySession::DeleteStream(SpdyStreamId id, int status) { 1272 void SpdySession::DeleteStream(SpdyStreamId id, int status) {
1300 // For push streams, if they are being deleted normally, we leave 1273 // For push streams, if they are being deleted normally, we leave
1301 // the stream in the unclaimed_pushed_streams_ list. However, if 1274 // the stream in the unclaimed_pushed_streams_ list. However, if
1302 // the stream is errored out, clean it up entirely. 1275 // the stream is errored out, clean it up entirely.
1303 if (status != OK) { 1276 if (status != OK) {
1304 PushedStreamMap::iterator it; 1277 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
1305 for (it = unclaimed_pushed_streams_.begin();
1306 it != unclaimed_pushed_streams_.end(); ++it) { 1278 it != unclaimed_pushed_streams_.end(); ++it) {
1307 scoped_refptr<SpdyStream> curr = it->second.first; 1279 scoped_refptr<SpdyStream> curr = it->second.first;
1308 if (id == curr->stream_id()) { 1280 if (id == curr->stream_id()) {
1309 unclaimed_pushed_streams_.erase(it); 1281 unclaimed_pushed_streams_.erase(it);
1310 break; 1282 break;
1311 } 1283 }
1312 } 1284 }
1313 } 1285 }
1314 1286
1315 // The stream might have been deleted. 1287 // The stream might have been deleted.
1316 ActiveStreamMap::iterator it2 = active_streams_.find(id); 1288 ActiveStreamMap::iterator it = active_streams_.find(id);
1317 if (it2 == active_streams_.end()) 1289 if (it == active_streams_.end())
1318 return; 1290 return;
1319 1291
1320 // Possibly remove from the write queue. 1292 const scoped_refptr<SpdyStream> stream(it->second);
1321 WriteQueue old = write_queue_; 1293 active_streams_.erase(it);
1322 write_queue_ = WriteQueue(); 1294 DCHECK(stream);
1323 while (!old.empty()) { 1295
1324 scoped_ptr<SpdyIOBufferProducer> producer(old.top()); 1296 write_queue_.RemovePendingWritesForStream(stream);
1325 old.pop();
1326 StreamProducerMap::iterator it = stream_producers_.find(producer.get());
1327 if (it == stream_producers_.end() || it->second->stream_id() != id) {
1328 write_queue_.push(producer.release());
1329 } else {
1330 stream_producers_.erase(producer.get());
1331 producer.reset(NULL);
1332 }
1333 }
1334 1297
1335 // If this is an active stream, call the callback. 1298 // If this is an active stream, call the callback.
1336 const scoped_refptr<SpdyStream> stream(it2->second);
1337 active_streams_.erase(it2);
1338 DCHECK(stream);
1339 stream->OnClose(status); 1299 stream->OnClose(status);
1340 ProcessPendingStreamRequests(); 1300 ProcessPendingStreamRequests();
1341 } 1301 }
1342 1302
1343 void SpdySession::RemoveFromPool() { 1303 void SpdySession::RemoveFromPool() {
1344 if (spdy_session_pool_) { 1304 if (spdy_session_pool_) {
1345 SpdySessionPool* pool = spdy_session_pool_; 1305 SpdySessionPool* pool = spdy_session_pool_;
1346 spdy_session_pool_ = NULL; 1306 spdy_session_pool_ = NULL;
1347 pool->Remove(make_scoped_refptr(this)); 1307 pool->Remove(make_scoped_refptr(this));
1348 } 1308 }
(...skipping 563 matching lines...) Expand 10 before | Expand all | Expand 10 after
1912 void SpdySession::SendSettings(const SettingsMap& settings) { 1872 void SpdySession::SendSettings(const SettingsMap& settings) {
1913 net_log_.AddEvent( 1873 net_log_.AddEvent(
1914 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 1874 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
1915 base::Bind(&NetLogSpdySettingsCallback, &settings)); 1875 base::Bind(&NetLogSpdySettingsCallback, &settings));
1916 1876
1917 // Create the SETTINGS frame and send it. 1877 // Create the SETTINGS frame and send it.
1918 DCHECK(buffered_spdy_framer_.get()); 1878 DCHECK(buffered_spdy_framer_.get());
1919 scoped_ptr<SpdyFrame> settings_frame( 1879 scoped_ptr<SpdyFrame> settings_frame(
1920 buffered_spdy_framer_->CreateSettings(settings)); 1880 buffered_spdy_framer_->CreateSettings(settings));
1921 sent_settings_ = true; 1881 sent_settings_ = true;
1922 QueueFrame(settings_frame.release(), HIGHEST); 1882 EnqueueSessionWrite(HIGHEST, settings_frame.Pass());
1923 } 1883 }
1924 1884
1925 void SpdySession::HandleSetting(uint32 id, uint32 value) { 1885 void SpdySession::HandleSetting(uint32 id, uint32 value) {
1926 switch (id) { 1886 switch (id) {
1927 case SETTINGS_MAX_CONCURRENT_STREAMS: 1887 case SETTINGS_MAX_CONCURRENT_STREAMS:
1928 max_concurrent_streams_ = std::min(static_cast<size_t>(value), 1888 max_concurrent_streams_ = std::min(static_cast<size_t>(value),
1929 kMaxConcurrentStreamLimit); 1889 kMaxConcurrentStreamLimit);
1930 ProcessPendingStreamRequests(); 1890 ProcessPendingStreamRequests();
1931 break; 1891 break;
1932 case SETTINGS_INITIAL_WINDOW_SIZE: { 1892 case SETTINGS_INITIAL_WINDOW_SIZE: {
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
2000 } 1960 }
2001 1961
2002 net_log_.AddEvent( 1962 net_log_.AddEvent(
2003 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME, 1963 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2004 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 1964 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2005 stream_id, delta_window_size)); 1965 stream_id, delta_window_size));
2006 1966
2007 DCHECK(buffered_spdy_framer_.get()); 1967 DCHECK(buffered_spdy_framer_.get());
2008 scoped_ptr<SpdyFrame> window_update_frame( 1968 scoped_ptr<SpdyFrame> window_update_frame(
2009 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); 1969 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2010 QueueFrame(window_update_frame.release(), priority); 1970 EnqueueSessionWrite(priority, window_update_frame.Pass());
2011 } 1971 }
2012 1972
2013 void SpdySession::WritePingFrame(uint32 unique_id) { 1973 void SpdySession::WritePingFrame(uint32 unique_id) {
2014 DCHECK(buffered_spdy_framer_.get()); 1974 DCHECK(buffered_spdy_framer_.get());
2015 scoped_ptr<SpdyFrame> ping_frame( 1975 scoped_ptr<SpdyFrame> ping_frame(
2016 buffered_spdy_framer_->CreatePingFrame(unique_id)); 1976 buffered_spdy_framer_->CreatePingFrame(unique_id));
2017 QueueFrame(ping_frame.release(), HIGHEST); 1977 EnqueueSessionWrite(HIGHEST, ping_frame.Pass());
2018 1978
2019 if (net_log().IsLoggingAllEvents()) { 1979 if (net_log().IsLoggingAllEvents()) {
2020 net_log().AddEvent( 1980 net_log().AddEvent(
2021 NetLog::TYPE_SPDY_SESSION_PING, 1981 NetLog::TYPE_SPDY_SESSION_PING,
2022 base::Bind(&NetLogSpdyPingCallback, unique_id, "sent")); 1982 base::Bind(&NetLogSpdyPingCallback, unique_id, "sent"));
2023 } 1983 }
2024 if (unique_id % 2 != 0) { 1984 if (unique_id % 2 != 0) {
2025 next_ping_id_ += 2; 1985 next_ping_id_ += 2;
2026 ++pings_in_flight_; 1986 ++pings_in_flight_;
2027 PlanToCheckPingStatus(); 1987 PlanToCheckPingStatus();
(...skipping 274 matching lines...) Expand 10 before | Expand all | Expand 10 after
2302 } 2262 }
2303 2263
2304 session_recv_window_size_ -= delta_window_size; 2264 session_recv_window_size_ -= delta_window_size;
2305 net_log_.AddEvent( 2265 net_log_.AddEvent(
2306 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 2266 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
2307 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2267 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2308 -delta_window_size, session_recv_window_size_)); 2268 -delta_window_size, session_recv_window_size_));
2309 } 2269 }
2310 2270
2311 } // namespace net 2271 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698