OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/spdy/spdy_session.h" | 5 #include "net/spdy/spdy_session.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <map> | 8 #include <map> |
9 | 9 |
10 #include "base/basictypes.h" | 10 #include "base/basictypes.h" |
(...skipping 12 matching lines...) Expand all Loading... |
23 #include "base/utf_string_conversions.h" | 23 #include "base/utf_string_conversions.h" |
24 #include "base/values.h" | 24 #include "base/values.h" |
25 #include "crypto/ec_private_key.h" | 25 #include "crypto/ec_private_key.h" |
26 #include "crypto/ec_signature_creator.h" | 26 #include "crypto/ec_signature_creator.h" |
27 #include "net/base/connection_type_histograms.h" | 27 #include "net/base/connection_type_histograms.h" |
28 #include "net/base/net_log.h" | 28 #include "net/base/net_log.h" |
29 #include "net/base/net_util.h" | 29 #include "net/base/net_util.h" |
30 #include "net/cert/asn1_util.h" | 30 #include "net/cert/asn1_util.h" |
31 #include "net/http/http_network_session.h" | 31 #include "net/http/http_network_session.h" |
32 #include "net/http/http_server_properties.h" | 32 #include "net/http/http_server_properties.h" |
| 33 #include "net/spdy/spdy_buffer_producer.h" |
33 #include "net/spdy/spdy_credential_builder.h" | 34 #include "net/spdy/spdy_credential_builder.h" |
34 #include "net/spdy/spdy_frame_builder.h" | 35 #include "net/spdy/spdy_frame_builder.h" |
35 #include "net/spdy/spdy_frame_producer.h" | |
36 #include "net/spdy/spdy_http_utils.h" | 36 #include "net/spdy/spdy_http_utils.h" |
37 #include "net/spdy/spdy_protocol.h" | 37 #include "net/spdy/spdy_protocol.h" |
38 #include "net/spdy/spdy_session_pool.h" | 38 #include "net/spdy/spdy_session_pool.h" |
39 #include "net/spdy/spdy_stream.h" | 39 #include "net/spdy/spdy_stream.h" |
40 #include "net/ssl/server_bound_cert_service.h" | 40 #include "net/ssl/server_bound_cert_service.h" |
41 | 41 |
42 namespace net { | 42 namespace net { |
43 | 43 |
44 namespace { | 44 namespace { |
45 | 45 |
(...skipping 259 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
305 : ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)), | 305 : ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)), |
306 host_port_proxy_pair_(host_port_proxy_pair), | 306 host_port_proxy_pair_(host_port_proxy_pair), |
307 spdy_session_pool_(spdy_session_pool), | 307 spdy_session_pool_(spdy_session_pool), |
308 http_server_properties_(http_server_properties), | 308 http_server_properties_(http_server_properties), |
309 connection_(new ClientSocketHandle), | 309 connection_(new ClientSocketHandle), |
310 read_buffer_(new IOBuffer(kReadBufferSize)), | 310 read_buffer_(new IOBuffer(kReadBufferSize)), |
311 read_pending_(false), | 311 read_pending_(false), |
312 stream_hi_water_mark_(kFirstStreamId), | 312 stream_hi_water_mark_(kFirstStreamId), |
313 write_pending_(false), | 313 write_pending_(false), |
314 in_flight_write_frame_type_(DATA), | 314 in_flight_write_frame_type_(DATA), |
| 315 in_flight_write_frame_size_(0), |
315 delayed_write_pending_(false), | 316 delayed_write_pending_(false), |
316 is_secure_(false), | 317 is_secure_(false), |
317 certificate_error_code_(OK), | 318 certificate_error_code_(OK), |
318 error_(OK), | 319 error_(OK), |
319 state_(IDLE), | 320 state_(IDLE), |
320 max_concurrent_streams_(initial_max_concurrent_streams == 0 ? | 321 max_concurrent_streams_(initial_max_concurrent_streams == 0 ? |
321 kInitialMaxConcurrentStreams : | 322 kInitialMaxConcurrentStreams : |
322 initial_max_concurrent_streams), | 323 initial_max_concurrent_streams), |
323 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ? | 324 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ? |
324 kMaxConcurrentStreamLimit : | 325 kMaxConcurrentStreamLimit : |
(...skipping 303 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
628 } | 629 } |
629 | 630 |
630 int SpdySession::GetProtocolVersion() const { | 631 int SpdySession::GetProtocolVersion() const { |
631 DCHECK(buffered_spdy_framer_.get()); | 632 DCHECK(buffered_spdy_framer_.get()); |
632 return buffered_spdy_framer_->protocol_version(); | 633 return buffered_spdy_framer_->protocol_version(); |
633 } | 634 } |
634 | 635 |
635 void SpdySession::EnqueueStreamWrite( | 636 void SpdySession::EnqueueStreamWrite( |
636 SpdyStream* stream, | 637 SpdyStream* stream, |
637 SpdyFrameType frame_type, | 638 SpdyFrameType frame_type, |
638 scoped_ptr<SpdyFrameProducer> producer) { | 639 scoped_ptr<SpdyBufferProducer> producer) { |
639 DCHECK(frame_type == HEADERS || | 640 DCHECK(frame_type == HEADERS || |
640 frame_type == DATA || | 641 frame_type == DATA || |
641 frame_type == CREDENTIAL || | 642 frame_type == CREDENTIAL || |
642 frame_type == SYN_STREAM); | 643 frame_type == SYN_STREAM); |
643 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream); | 644 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream); |
644 } | 645 } |
645 | 646 |
646 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( | 647 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( |
647 SpdyStreamId stream_id, | 648 SpdyStreamId stream_id, |
648 RequestPriority priority, | 649 RequestPriority priority, |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
730 bool fin = flags & CONTROL_FLAG_FIN; | 731 bool fin = flags & CONTROL_FLAG_FIN; |
731 net_log().AddEvent( | 732 net_log().AddEvent( |
732 NetLog::TYPE_SPDY_SESSION_SEND_HEADERS, | 733 NetLog::TYPE_SPDY_SESSION_SEND_HEADERS, |
733 base::Bind(&NetLogSpdySynCallback, | 734 base::Bind(&NetLogSpdySynCallback, |
734 &headers, fin, /*unidirectional=*/false, | 735 &headers, fin, /*unidirectional=*/false, |
735 stream_id, 0)); | 736 stream_id, 0)); |
736 } | 737 } |
737 return frame.Pass(); | 738 return frame.Pass(); |
738 } | 739 } |
739 | 740 |
740 scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, | 741 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, |
741 net::IOBuffer* data, | 742 net::IOBuffer* data, |
742 int len, | 743 int len, |
743 SpdyDataFlags flags) { | 744 SpdyDataFlags flags) { |
744 // Find our stream | 745 // Find our stream. |
745 CHECK(IsStreamActive(stream_id)); | 746 CHECK(IsStreamActive(stream_id)); |
746 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; | 747 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
747 CHECK_EQ(stream->stream_id(), stream_id); | 748 CHECK_EQ(stream->stream_id(), stream_id); |
748 | 749 |
749 if (len < 0) { | 750 if (len < 0) { |
750 NOTREACHED(); | 751 NOTREACHED(); |
751 return scoped_ptr<SpdyFrame>(); | 752 return scoped_ptr<SpdyBuffer>(); |
752 } | 753 } |
753 | 754 |
754 if (len > kMaxSpdyFrameChunkSize) { | 755 if (len > kMaxSpdyFrameChunkSize) { |
755 len = kMaxSpdyFrameChunkSize; | 756 len = kMaxSpdyFrameChunkSize; |
756 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); | 757 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); |
757 } | 758 } |
758 | 759 |
759 // Obey send window size of the stream (and session, if applicable) | 760 // Obey send window size of the stream (and session, if applicable) |
760 // if flow control is enabled. | 761 // if flow control is enabled. |
761 if (flow_control_state_ >= FLOW_CONTROL_STREAM) { | 762 if (flow_control_state_ >= FLOW_CONTROL_STREAM) { |
762 int32 effective_window_size = stream->send_window_size(); | 763 int32 effective_window_size = stream->send_window_size(); |
763 if (effective_window_size <= 0) { | 764 if (effective_window_size <= 0) { |
764 // Because we queue frames onto the session, it is possible that | 765 // Because we queue frames onto the session, it is possible that |
765 // a stream was not flow controlled at the time it attempted the | 766 // a stream was not flow controlled at the time it attempted the |
766 // write, but when we go to fulfill the write, it is now flow | 767 // write, but when we go to fulfill the write, it is now flow |
767 // controlled. This is why we need the session to mark the stream | 768 // controlled. This is why we need the session to mark the stream |
768 // as stalled - because only the session knows for sure when the | 769 // as stalled - because only the session knows for sure when the |
769 // stall occurs. | 770 // stall occurs. |
770 stream->set_send_stalled_by_flow_control(true); | 771 stream->set_send_stalled_by_flow_control(true); |
771 net_log().AddEvent( | 772 net_log().AddEvent( |
772 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, | 773 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, |
773 NetLog::IntegerCallback("stream_id", stream_id)); | 774 NetLog::IntegerCallback("stream_id", stream_id)); |
774 return scoped_ptr<SpdyFrame>(); | 775 return scoped_ptr<SpdyBuffer>(); |
775 } | 776 } |
776 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { | 777 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { |
777 effective_window_size = | 778 effective_window_size = |
778 std::min(effective_window_size, session_send_window_size_); | 779 std::min(effective_window_size, session_send_window_size_); |
779 if (effective_window_size <= 0) { | 780 if (effective_window_size <= 0) { |
780 DCHECK(IsSendStalled()); | 781 DCHECK(IsSendStalled()); |
781 stream->set_send_stalled_by_flow_control(true); | 782 stream->set_send_stalled_by_flow_control(true); |
782 QueueSendStalledStream(stream); | 783 QueueSendStalledStream(stream); |
783 net_log().AddEvent( | 784 net_log().AddEvent( |
784 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, | 785 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, |
785 NetLog::IntegerCallback("stream_id", stream_id)); | 786 NetLog::IntegerCallback("stream_id", stream_id)); |
786 return scoped_ptr<SpdyFrame>(); | 787 return scoped_ptr<SpdyBuffer>(); |
787 } | 788 } |
788 } | 789 } |
789 | 790 |
790 int new_len = std::min(len, effective_window_size); | 791 int new_len = std::min(len, effective_window_size); |
791 if (new_len < len) { | 792 if (new_len < len) { |
792 len = new_len; | 793 len = new_len; |
793 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); | 794 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); |
794 } | 795 } |
795 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) | 796 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) |
796 DecreaseSendWindowSize(static_cast<int32>(len)); | 797 DecreaseSendWindowSize(static_cast<int32>(len)); |
(...skipping 10 matching lines...) Expand all Loading... |
807 // Send PrefacePing for DATA_FRAMEs with nonzero payload size. | 808 // Send PrefacePing for DATA_FRAMEs with nonzero payload size. |
808 if (len > 0) | 809 if (len > 0) |
809 SendPrefacePingIfNoneInFlight(); | 810 SendPrefacePingIfNoneInFlight(); |
810 | 811 |
811 // TODO(mbelshe): reduce memory copies here. | 812 // TODO(mbelshe): reduce memory copies here. |
812 DCHECK(buffered_spdy_framer_.get()); | 813 DCHECK(buffered_spdy_framer_.get()); |
813 scoped_ptr<SpdyFrame> frame( | 814 scoped_ptr<SpdyFrame> frame( |
814 buffered_spdy_framer_->CreateDataFrame( | 815 buffered_spdy_framer_->CreateDataFrame( |
815 stream_id, data->data(), static_cast<uint32>(len), flags)); | 816 stream_id, data->data(), static_cast<uint32>(len), flags)); |
816 | 817 |
817 return frame.Pass(); | 818 return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())); |
818 } | 819 } |
819 | 820 |
820 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { | 821 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { |
821 DCHECK_NE(0u, stream_id); | 822 DCHECK_NE(0u, stream_id); |
822 // TODO(mbelshe): We should send a RST_STREAM control frame here | 823 // TODO(mbelshe): We should send a RST_STREAM control frame here |
823 // so that the server can cancel a large send. | 824 // so that the server can cancel a large send. |
824 | 825 |
825 DeleteStream(stream_id, status); | 826 DeleteStream(stream_id, status); |
826 } | 827 } |
827 | 828 |
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
915 ReadSocket(); | 916 ReadSocket(); |
916 } | 917 } |
917 | 918 |
918 void SpdySession::OnWriteComplete(int result) { | 919 void SpdySession::OnWriteComplete(int result) { |
919 // Releasing the in-flight write can have a side-effect of dropping | 920 // Releasing the in-flight write can have a side-effect of dropping |
920 // the last reference to |this|. Hold a reference through this | 921 // the last reference to |this|. Hold a reference through this |
921 // function. | 922 // function. |
922 scoped_refptr<SpdySession> self(this); | 923 scoped_refptr<SpdySession> self(this); |
923 | 924 |
924 DCHECK(write_pending_); | 925 DCHECK(write_pending_); |
925 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); | 926 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); |
926 | 927 |
927 last_activity_time_ = base::TimeTicks::Now(); | 928 last_activity_time_ = base::TimeTicks::Now(); |
928 write_pending_ = false; | 929 write_pending_ = false; |
929 | 930 |
930 if (result < 0) { | 931 if (result < 0) { |
931 in_flight_write_.Release(); | 932 in_flight_write_.reset(); |
932 in_flight_write_frame_type_ = DATA; | 933 in_flight_write_frame_type_ = DATA; |
933 CloseSessionOnError(static_cast<net::Error>(result), true, "Write error"); | 934 in_flight_write_frame_size_ = 0; |
| 935 in_flight_write_stream_ = NULL; |
| 936 CloseSessionOnError(static_cast<Error>(result), true, "Write error"); |
934 return; | 937 return; |
935 } | 938 } |
936 | 939 |
937 // It should not be possible to have written more bytes than our | 940 // It should not be possible to have written more bytes than our |
938 // in_flight_write_. | 941 // in_flight_write_. |
939 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); | 942 DCHECK_LE(static_cast<size_t>(result), |
| 943 in_flight_write_->GetRemainingSize()); |
940 | 944 |
941 in_flight_write_.buffer()->DidConsume(result); | 945 if (result > 0) { |
| 946 in_flight_write_->Consume(static_cast<size_t>(result)); |
942 | 947 |
943 // We only notify the stream when we've fully written the pending frame. | 948 // We only notify the stream when we've fully written the pending frame. |
944 if (in_flight_write_.buffer()->BytesRemaining() == 0) { | 949 if (in_flight_write_->GetRemainingSize() == 0) { |
945 DCHECK_GT(result, 0); | 950 // It is possible that the stream was cancelled while we were |
| 951 // writing to the socket. |
| 952 if (in_flight_write_stream_ && !in_flight_write_stream_->cancelled()) { |
| 953 DCHECK_GT(in_flight_write_frame_size_, 0u); |
| 954 in_flight_write_stream_->OnFrameWriteComplete( |
| 955 in_flight_write_frame_type_, |
| 956 in_flight_write_frame_size_); |
| 957 } |
946 | 958 |
947 scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); | 959 // Cleanup the write which just completed. |
948 | 960 in_flight_write_.reset(); |
949 // It is possible that the stream was cancelled while we were writing | 961 in_flight_write_frame_type_ = DATA; |
950 // to the socket. | 962 in_flight_write_frame_size_ = 0; |
951 if (stream && !stream->cancelled()) { | 963 in_flight_write_stream_ = NULL; |
952 DCHECK_GT(in_flight_write_.buffer()->size(), 0); | |
953 stream->OnFrameWriteComplete( | |
954 in_flight_write_frame_type_, | |
955 static_cast<size_t>(in_flight_write_.buffer()->size())); | |
956 } | 964 } |
957 | |
958 // Cleanup the write which just completed. | |
959 in_flight_write_.Release(); | |
960 in_flight_write_frame_type_ = DATA; | |
961 } | 965 } |
962 | 966 |
963 // Write more data. We're already in a continuation, so we can go | 967 // Write more data. We're already in a continuation, so we can go |
964 // ahead and write it immediately (without going back to the message | 968 // ahead and write it immediately (without going back to the message |
965 // loop). | 969 // loop). |
966 WriteSocketLater(); | 970 WriteSocketLater(); |
967 } | 971 } |
968 | 972 |
969 net::Error SpdySession::ReadSocket() { | 973 net::Error SpdySession::ReadSocket() { |
970 if (read_pending_) | 974 if (read_pending_) |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1028 if (state_ < CONNECTED || state_ == CLOSED) | 1032 if (state_ < CONNECTED || state_ == CLOSED) |
1029 return; | 1033 return; |
1030 | 1034 |
1031 if (write_pending_) // Another write is in progress still. | 1035 if (write_pending_) // Another write is in progress still. |
1032 return; | 1036 return; |
1033 | 1037 |
1034 // Loop sending frames until we've sent everything or until the write | 1038 // Loop sending frames until we've sent everything or until the write |
1035 // returns error (or ERR_IO_PENDING). | 1039 // returns error (or ERR_IO_PENDING). |
1036 DCHECK(buffered_spdy_framer_.get()); | 1040 DCHECK(buffered_spdy_framer_.get()); |
1037 while (true) { | 1041 while (true) { |
1038 if (in_flight_write_.buffer()) { | 1042 if (in_flight_write_) { |
1039 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); | 1043 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); |
1040 } else { | 1044 } else { |
1041 // Grab the next frame to send. | 1045 // Grab the next frame to send. |
1042 SpdyFrameType frame_type = DATA; | 1046 SpdyFrameType frame_type = DATA; |
1043 scoped_ptr<SpdyFrameProducer> producer; | 1047 scoped_ptr<SpdyBufferProducer> producer; |
1044 scoped_refptr<SpdyStream> stream; | 1048 scoped_refptr<SpdyStream> stream; |
1045 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) | 1049 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) |
1046 break; | 1050 break; |
1047 | 1051 |
1048 // It is possible that a stream had data to write, but a | 1052 // It is possible that a stream had data to write, but a |
1049 // WINDOW_UPDATE frame has been received which made that | 1053 // WINDOW_UPDATE frame has been received which made that |
1050 // stream no longer writable. | 1054 // stream no longer writable. |
1051 // TODO(rch): consider handling that case by removing the | 1055 // TODO(rch): consider handling that case by removing the |
1052 // stream from the writable queue? | 1056 // stream from the writable queue? |
1053 if (stream.get() && stream->cancelled()) | 1057 if (stream.get() && stream->cancelled()) |
1054 continue; | 1058 continue; |
1055 | 1059 |
1056 // Activate the stream only when sending the SYN_STREAM frame to | 1060 // Activate the stream only when sending the SYN_STREAM frame to |
1057 // guarantee monotonically-increasing stream IDs. | 1061 // guarantee monotonically-increasing stream IDs. |
1058 if (frame_type == SYN_STREAM) { | 1062 if (frame_type == SYN_STREAM) { |
1059 if (stream.get() && stream->stream_id() == 0) { | 1063 if (stream.get() && stream->stream_id() == 0) { |
1060 ActivateStream(stream); | 1064 ActivateStream(stream); |
1061 } else { | 1065 } else { |
1062 NOTREACHED(); | 1066 NOTREACHED(); |
1063 continue; | 1067 continue; |
1064 } | 1068 } |
1065 } | 1069 } |
1066 | 1070 |
1067 scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); | 1071 in_flight_write_ = producer->ProduceBuffer(); |
1068 if (!frame) { | 1072 if (!in_flight_write_) { |
1069 NOTREACHED(); | 1073 NOTREACHED(); |
1070 continue; | 1074 continue; |
1071 } | 1075 } |
1072 DCHECK_GT(frame->size(), 0u); | |
1073 | |
1074 // TODO(mbelshe): We have too much copying of data here. | |
1075 scoped_refptr<IOBufferWithSize> buffer = | |
1076 new IOBufferWithSize(frame->size()); | |
1077 memcpy(buffer->data(), frame->data(), frame->size()); | |
1078 in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream); | |
1079 in_flight_write_frame_type_ = frame_type; | 1076 in_flight_write_frame_type_ = frame_type; |
| 1077 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); |
| 1078 DCHECK_GE(in_flight_write_frame_size_, |
| 1079 buffered_spdy_framer_->GetFrameMinimumSize()); |
| 1080 in_flight_write_stream_ = stream; |
1080 } | 1081 } |
1081 | 1082 |
1082 write_pending_ = true; | 1083 write_pending_ = true; |
1083 int rv = connection_->socket()->Write( | 1084 int rv = connection_->socket()->Write( |
1084 in_flight_write_.buffer(), | 1085 in_flight_write_->GetIOBufferForRemainingData(), |
1085 in_flight_write_.buffer()->BytesRemaining(), | 1086 in_flight_write_->GetRemainingSize(), |
1086 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); | 1087 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); |
1087 if (rv == net::ERR_IO_PENDING) | 1088 if (rv == net::ERR_IO_PENDING) |
1088 break; | 1089 break; |
1089 | 1090 |
1090 // We sent the frame successfully. | 1091 // We sent the frame successfully. |
1091 OnWriteComplete(rv); | 1092 OnWriteComplete(rv); |
1092 | 1093 |
1093 // TODO(mbelshe): Test this error case. Maybe we should mark the socket | 1094 // TODO(mbelshe): Test this error case. Maybe we should mark the socket |
1094 // as in an error state. | 1095 // as in an error state. |
1095 if (rv < 0) | 1096 if (rv < 0) |
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1262 | 1263 |
1263 void SpdySession::EnqueueSessionWrite(RequestPriority priority, | 1264 void SpdySession::EnqueueSessionWrite(RequestPriority priority, |
1264 SpdyFrameType frame_type, | 1265 SpdyFrameType frame_type, |
1265 scoped_ptr<SpdyFrame> frame) { | 1266 scoped_ptr<SpdyFrame> frame) { |
1266 DCHECK(frame_type == RST_STREAM || | 1267 DCHECK(frame_type == RST_STREAM || |
1267 frame_type == SETTINGS || | 1268 frame_type == SETTINGS || |
1268 frame_type == WINDOW_UPDATE || | 1269 frame_type == WINDOW_UPDATE || |
1269 frame_type == PING); | 1270 frame_type == PING); |
1270 EnqueueWrite( | 1271 EnqueueWrite( |
1271 priority, frame_type, | 1272 priority, frame_type, |
1272 scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())), | 1273 scoped_ptr<SpdyBufferProducer>( |
| 1274 new SimpleBufferProducer( |
| 1275 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), |
1273 NULL); | 1276 NULL); |
1274 } | 1277 } |
1275 | 1278 |
1276 void SpdySession::EnqueueWrite(RequestPriority priority, | 1279 void SpdySession::EnqueueWrite(RequestPriority priority, |
1277 SpdyFrameType frame_type, | 1280 SpdyFrameType frame_type, |
1278 scoped_ptr<SpdyFrameProducer> producer, | 1281 scoped_ptr<SpdyBufferProducer> producer, |
1279 const scoped_refptr<SpdyStream>& stream) { | 1282 const scoped_refptr<SpdyStream>& stream) { |
1280 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); | 1283 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); |
1281 WriteSocketLater(); | 1284 WriteSocketLater(); |
1282 } | 1285 } |
1283 | 1286 |
1284 void SpdySession::ActivateStream(SpdyStream* stream) { | 1287 void SpdySession::ActivateStream(SpdyStream* stream) { |
1285 if (stream->stream_id() == 0) { | 1288 if (stream->stream_id() == 0) { |
1286 stream->set_stream_id(GetNewStreamId()); | 1289 stream->set_stream_id(GetNewStreamId()); |
1287 created_streams_.erase(scoped_refptr<SpdyStream>(stream)); | 1290 created_streams_.erase(scoped_refptr<SpdyStream>(stream)); |
1288 } | 1291 } |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1386 const char* data, | 1389 const char* data, |
1387 size_t len, | 1390 size_t len, |
1388 bool fin) { | 1391 bool fin) { |
1389 DCHECK_LT(len, 1u << 24); | 1392 DCHECK_LT(len, 1u << 24); |
1390 if (net_log().IsLoggingAllEvents()) { | 1393 if (net_log().IsLoggingAllEvents()) { |
1391 net_log().AddEvent( | 1394 net_log().AddEvent( |
1392 NetLog::TYPE_SPDY_SESSION_RECV_DATA, | 1395 NetLog::TYPE_SPDY_SESSION_RECV_DATA, |
1393 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); | 1396 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); |
1394 } | 1397 } |
1395 | 1398 |
| 1399 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); |
| 1400 |
1396 // By the time data comes in, the stream may already be inactive. | 1401 // By the time data comes in, the stream may already be inactive. |
1397 if (!IsStreamActive(stream_id)) | 1402 if (it == active_streams_.end()) |
1398 return; | 1403 return; |
1399 | 1404 |
1400 // Only decrease the window size for data for active streams. | 1405 // Only decrease the window size for data for active streams. |
1401 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0) | 1406 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0) |
1402 DecreaseRecvWindowSize(static_cast<int32>(len)); | 1407 DecreaseRecvWindowSize(static_cast<int32>(len)); |
1403 | 1408 |
1404 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; | 1409 scoped_ptr<SpdyBuffer> buffer; |
1405 stream->OnDataReceived(data, len); | 1410 if (data) { |
| 1411 DCHECK_GT(len, 0u); |
| 1412 buffer.reset(new SpdyBuffer(data, len)); |
| 1413 } else { |
| 1414 DCHECK_EQ(len, 0u); |
| 1415 } |
| 1416 it->second->OnDataReceived(buffer.Pass()); |
1406 } | 1417 } |
1407 | 1418 |
1408 void SpdySession::OnSetting(SpdySettingsIds id, | 1419 void SpdySession::OnSetting(SpdySettingsIds id, |
1409 uint8 flags, | 1420 uint8 flags, |
1410 uint32 value) { | 1421 uint32 value) { |
1411 HandleSetting(id, value); | 1422 HandleSetting(id, value); |
1412 http_server_properties_->SetSpdySetting( | 1423 http_server_properties_->SetSpdySetting( |
1413 host_port_pair(), | 1424 host_port_pair(), |
1414 id, | 1425 id, |
1415 static_cast<SpdySettingsFlags>(flags), | 1426 static_cast<SpdySettingsFlags>(flags), |
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1664 if (!IsStreamActive(stream_id)) { | 1675 if (!IsStreamActive(stream_id)) { |
1665 // NOTE: it may just be that the stream was cancelled. | 1676 // NOTE: it may just be that the stream was cancelled. |
1666 LOG(WARNING) << "Received RST for invalid stream" << stream_id; | 1677 LOG(WARNING) << "Received RST for invalid stream" << stream_id; |
1667 return; | 1678 return; |
1668 } | 1679 } |
1669 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; | 1680 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
1670 CHECK_EQ(stream->stream_id(), stream_id); | 1681 CHECK_EQ(stream->stream_id(), stream_id); |
1671 CHECK(!stream->cancelled()); | 1682 CHECK(!stream->cancelled()); |
1672 | 1683 |
1673 if (status == 0) { | 1684 if (status == 0) { |
1674 stream->OnDataReceived(NULL, 0); | 1685 stream->OnDataReceived(scoped_ptr<SpdyBuffer>()); |
1675 } else if (status == RST_STREAM_REFUSED_STREAM) { | 1686 } else if (status == RST_STREAM_REFUSED_STREAM) { |
1676 DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM); | 1687 DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM); |
1677 } else { | 1688 } else { |
1678 RecordProtocolErrorHistogram( | 1689 RecordProtocolErrorHistogram( |
1679 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM); | 1690 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM); |
1680 stream->LogStreamError( | 1691 stream->LogStreamError( |
1681 ERR_SPDY_PROTOCOL_ERROR, | 1692 ERR_SPDY_PROTOCOL_ERROR, |
1682 base::StringPrintf("SPDY stream closed with status: %d", status)); | 1693 base::StringPrintf("SPDY stream closed with status: %d", status)); |
1683 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. | 1694 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. |
1684 // For now, it doesn't matter much - it is a protocol error. | 1695 // For now, it doesn't matter much - it is a protocol error. |
(...skipping 600 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
2285 } | 2296 } |
2286 | 2297 |
2287 session_recv_window_size_ -= delta_window_size; | 2298 session_recv_window_size_ -= delta_window_size; |
2288 net_log_.AddEvent( | 2299 net_log_.AddEvent( |
2289 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, | 2300 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, |
2290 base::Bind(&NetLogSpdySessionWindowUpdateCallback, | 2301 base::Bind(&NetLogSpdySessionWindowUpdateCallback, |
2291 -delta_window_size, session_recv_window_size_)); | 2302 -delta_window_size, session_recv_window_size_)); |
2292 } | 2303 } |
2293 | 2304 |
2294 } // namespace net | 2305 } // namespace net |
OLD | NEW |