Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(73)

Side by Side Diff: net/spdy/spdy_session.cc

Issue 13990005: [SPDY] Replace SpdyIOBuffer with new SpdyBuffer class (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_session.h" 5 #include "net/spdy/spdy_session.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <map> 8 #include <map>
9 9
10 #include "base/basictypes.h" 10 #include "base/basictypes.h"
(...skipping 12 matching lines...) Expand all
23 #include "base/utf_string_conversions.h" 23 #include "base/utf_string_conversions.h"
24 #include "base/values.h" 24 #include "base/values.h"
25 #include "crypto/ec_private_key.h" 25 #include "crypto/ec_private_key.h"
26 #include "crypto/ec_signature_creator.h" 26 #include "crypto/ec_signature_creator.h"
27 #include "net/base/connection_type_histograms.h" 27 #include "net/base/connection_type_histograms.h"
28 #include "net/base/net_log.h" 28 #include "net/base/net_log.h"
29 #include "net/base/net_util.h" 29 #include "net/base/net_util.h"
30 #include "net/cert/asn1_util.h" 30 #include "net/cert/asn1_util.h"
31 #include "net/http/http_network_session.h" 31 #include "net/http/http_network_session.h"
32 #include "net/http/http_server_properties.h" 32 #include "net/http/http_server_properties.h"
33 #include "net/spdy/spdy_buffer_producer.h"
33 #include "net/spdy/spdy_credential_builder.h" 34 #include "net/spdy/spdy_credential_builder.h"
34 #include "net/spdy/spdy_frame_builder.h" 35 #include "net/spdy/spdy_frame_builder.h"
35 #include "net/spdy/spdy_frame_producer.h"
36 #include "net/spdy/spdy_http_utils.h" 36 #include "net/spdy/spdy_http_utils.h"
37 #include "net/spdy/spdy_protocol.h" 37 #include "net/spdy/spdy_protocol.h"
38 #include "net/spdy/spdy_session_pool.h" 38 #include "net/spdy/spdy_session_pool.h"
39 #include "net/spdy/spdy_stream.h" 39 #include "net/spdy/spdy_stream.h"
40 #include "net/ssl/server_bound_cert_service.h" 40 #include "net/ssl/server_bound_cert_service.h"
41 41
42 namespace net { 42 namespace net {
43 43
44 namespace { 44 namespace {
45 45
(...skipping 259 matching lines...) Expand 10 before | Expand all | Expand 10 after
305 : ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)), 305 : ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)),
306 host_port_proxy_pair_(host_port_proxy_pair), 306 host_port_proxy_pair_(host_port_proxy_pair),
307 spdy_session_pool_(spdy_session_pool), 307 spdy_session_pool_(spdy_session_pool),
308 http_server_properties_(http_server_properties), 308 http_server_properties_(http_server_properties),
309 connection_(new ClientSocketHandle), 309 connection_(new ClientSocketHandle),
310 read_buffer_(new IOBuffer(kReadBufferSize)), 310 read_buffer_(new IOBuffer(kReadBufferSize)),
311 read_pending_(false), 311 read_pending_(false),
312 stream_hi_water_mark_(kFirstStreamId), 312 stream_hi_water_mark_(kFirstStreamId),
313 write_pending_(false), 313 write_pending_(false),
314 in_flight_write_frame_type_(DATA), 314 in_flight_write_frame_type_(DATA),
315 in_flight_write_frame_size_(0),
315 delayed_write_pending_(false), 316 delayed_write_pending_(false),
316 is_secure_(false), 317 is_secure_(false),
317 certificate_error_code_(OK), 318 certificate_error_code_(OK),
318 error_(OK), 319 error_(OK),
319 state_(IDLE), 320 state_(IDLE),
320 max_concurrent_streams_(initial_max_concurrent_streams == 0 ? 321 max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
321 kInitialMaxConcurrentStreams : 322 kInitialMaxConcurrentStreams :
322 initial_max_concurrent_streams), 323 initial_max_concurrent_streams),
323 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ? 324 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ?
324 kMaxConcurrentStreamLimit : 325 kMaxConcurrentStreamLimit :
(...skipping 303 matching lines...) Expand 10 before | Expand all | Expand 10 after
628 } 629 }
629 630
630 int SpdySession::GetProtocolVersion() const { 631 int SpdySession::GetProtocolVersion() const {
631 DCHECK(buffered_spdy_framer_.get()); 632 DCHECK(buffered_spdy_framer_.get());
632 return buffered_spdy_framer_->protocol_version(); 633 return buffered_spdy_framer_->protocol_version();
633 } 634 }
634 635
635 void SpdySession::EnqueueStreamWrite( 636 void SpdySession::EnqueueStreamWrite(
636 SpdyStream* stream, 637 SpdyStream* stream,
637 SpdyFrameType frame_type, 638 SpdyFrameType frame_type,
638 scoped_ptr<SpdyFrameProducer> producer) { 639 scoped_ptr<SpdyBufferProducer> producer) {
639 DCHECK(frame_type == HEADERS || 640 DCHECK(frame_type == HEADERS ||
640 frame_type == DATA || 641 frame_type == DATA ||
641 frame_type == CREDENTIAL || 642 frame_type == CREDENTIAL ||
642 frame_type == SYN_STREAM); 643 frame_type == SYN_STREAM);
643 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream); 644 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
644 } 645 }
645 646
646 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( 647 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
647 SpdyStreamId stream_id, 648 SpdyStreamId stream_id,
648 RequestPriority priority, 649 RequestPriority priority,
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
730 bool fin = flags & CONTROL_FLAG_FIN; 731 bool fin = flags & CONTROL_FLAG_FIN;
731 net_log().AddEvent( 732 net_log().AddEvent(
732 NetLog::TYPE_SPDY_SESSION_SEND_HEADERS, 733 NetLog::TYPE_SPDY_SESSION_SEND_HEADERS,
733 base::Bind(&NetLogSpdySynCallback, 734 base::Bind(&NetLogSpdySynCallback,
734 &headers, fin, /*unidirectional=*/false, 735 &headers, fin, /*unidirectional=*/false,
735 stream_id, 0)); 736 stream_id, 0));
736 } 737 }
737 return frame.Pass(); 738 return frame.Pass();
738 } 739 }
739 740
740 scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, 741 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
741 net::IOBuffer* data, 742 net::IOBuffer* data,
742 int len, 743 int len,
743 SpdyDataFlags flags) { 744 SpdyDataFlags flags) {
744 // Find our stream 745 // Find our stream.
745 CHECK(IsStreamActive(stream_id)); 746 CHECK(IsStreamActive(stream_id));
746 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 747 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
747 CHECK_EQ(stream->stream_id(), stream_id); 748 CHECK_EQ(stream->stream_id(), stream_id);
748 749
749 if (len < 0) { 750 if (len < 0) {
750 NOTREACHED(); 751 NOTREACHED();
751 return scoped_ptr<SpdyFrame>(); 752 return scoped_ptr<SpdyBuffer>();
752 } 753 }
753 754
754 if (len > kMaxSpdyFrameChunkSize) { 755 if (len > kMaxSpdyFrameChunkSize) {
755 len = kMaxSpdyFrameChunkSize; 756 len = kMaxSpdyFrameChunkSize;
756 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 757 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
757 } 758 }
758 759
759 // Obey send window size of the stream (and session, if applicable) 760 // Obey send window size of the stream (and session, if applicable)
760 // if flow control is enabled. 761 // if flow control is enabled.
761 if (flow_control_state_ >= FLOW_CONTROL_STREAM) { 762 if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
762 int32 effective_window_size = stream->send_window_size(); 763 int32 effective_window_size = stream->send_window_size();
763 if (effective_window_size <= 0) { 764 if (effective_window_size <= 0) {
764 // Because we queue frames onto the session, it is possible that 765 // Because we queue frames onto the session, it is possible that
765 // a stream was not flow controlled at the time it attempted the 766 // a stream was not flow controlled at the time it attempted the
766 // write, but when we go to fulfill the write, it is now flow 767 // write, but when we go to fulfill the write, it is now flow
767 // controlled. This is why we need the session to mark the stream 768 // controlled. This is why we need the session to mark the stream
768 // as stalled - because only the session knows for sure when the 769 // as stalled - because only the session knows for sure when the
769 // stall occurs. 770 // stall occurs.
770 stream->set_send_stalled_by_flow_control(true); 771 stream->set_send_stalled_by_flow_control(true);
771 net_log().AddEvent( 772 net_log().AddEvent(
772 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, 773 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW,
773 NetLog::IntegerCallback("stream_id", stream_id)); 774 NetLog::IntegerCallback("stream_id", stream_id));
774 return scoped_ptr<SpdyFrame>(); 775 return scoped_ptr<SpdyBuffer>();
775 } 776 }
776 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 777 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
777 effective_window_size = 778 effective_window_size =
778 std::min(effective_window_size, session_send_window_size_); 779 std::min(effective_window_size, session_send_window_size_);
779 if (effective_window_size <= 0) { 780 if (effective_window_size <= 0) {
780 DCHECK(IsSendStalled()); 781 DCHECK(IsSendStalled());
781 stream->set_send_stalled_by_flow_control(true); 782 stream->set_send_stalled_by_flow_control(true);
782 QueueSendStalledStream(stream); 783 QueueSendStalledStream(stream);
783 net_log().AddEvent( 784 net_log().AddEvent(
784 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, 785 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW,
785 NetLog::IntegerCallback("stream_id", stream_id)); 786 NetLog::IntegerCallback("stream_id", stream_id));
786 return scoped_ptr<SpdyFrame>(); 787 return scoped_ptr<SpdyBuffer>();
787 } 788 }
788 } 789 }
789 790
790 int new_len = std::min(len, effective_window_size); 791 int new_len = std::min(len, effective_window_size);
791 if (new_len < len) { 792 if (new_len < len) {
792 len = new_len; 793 len = new_len;
793 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 794 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
794 } 795 }
795 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) 796 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION)
796 DecreaseSendWindowSize(static_cast<int32>(len)); 797 DecreaseSendWindowSize(static_cast<int32>(len));
(...skipping 10 matching lines...) Expand all
807 // Send PrefacePing for DATA_FRAMEs with nonzero payload size. 808 // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
808 if (len > 0) 809 if (len > 0)
809 SendPrefacePingIfNoneInFlight(); 810 SendPrefacePingIfNoneInFlight();
810 811
811 // TODO(mbelshe): reduce memory copies here. 812 // TODO(mbelshe): reduce memory copies here.
812 DCHECK(buffered_spdy_framer_.get()); 813 DCHECK(buffered_spdy_framer_.get());
813 scoped_ptr<SpdyFrame> frame( 814 scoped_ptr<SpdyFrame> frame(
814 buffered_spdy_framer_->CreateDataFrame( 815 buffered_spdy_framer_->CreateDataFrame(
815 stream_id, data->data(), static_cast<uint32>(len), flags)); 816 stream_id, data->data(), static_cast<uint32>(len), flags));
816 817
817 return frame.Pass(); 818 return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass()));
818 } 819 }
819 820
820 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { 821 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
821 DCHECK_NE(0u, stream_id); 822 DCHECK_NE(0u, stream_id);
822 // TODO(mbelshe): We should send a RST_STREAM control frame here 823 // TODO(mbelshe): We should send a RST_STREAM control frame here
823 // so that the server can cancel a large send. 824 // so that the server can cancel a large send.
824 825
825 DeleteStream(stream_id, status); 826 DeleteStream(stream_id, status);
826 } 827 }
827 828
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after
915 ReadSocket(); 916 ReadSocket();
916 } 917 }
917 918
918 void SpdySession::OnWriteComplete(int result) { 919 void SpdySession::OnWriteComplete(int result) {
919 // Releasing the in-flight write can have a side-effect of dropping 920 // Releasing the in-flight write can have a side-effect of dropping
920 // the last reference to |this|. Hold a reference through this 921 // the last reference to |this|. Hold a reference through this
921 // function. 922 // function.
922 scoped_refptr<SpdySession> self(this); 923 scoped_refptr<SpdySession> self(this);
923 924
924 DCHECK(write_pending_); 925 DCHECK(write_pending_);
925 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); 926 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
926 927
927 last_activity_time_ = base::TimeTicks::Now(); 928 last_activity_time_ = base::TimeTicks::Now();
928 write_pending_ = false; 929 write_pending_ = false;
929 930
930 if (result < 0) { 931 if (result < 0) {
931 in_flight_write_.Release(); 932 in_flight_write_.reset();
932 in_flight_write_frame_type_ = DATA; 933 in_flight_write_frame_type_ = DATA;
933 CloseSessionOnError(static_cast<net::Error>(result), true, "Write error"); 934 in_flight_write_frame_size_ = 0;
935 in_flight_write_stream_ = NULL;
936 CloseSessionOnError(static_cast<Error>(result), true, "Write error");
934 return; 937 return;
935 } 938 }
936 939
937 // It should not be possible to have written more bytes than our 940 // It should not be possible to have written more bytes than our
938 // in_flight_write_. 941 // in_flight_write_.
939 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); 942 DCHECK_LE(static_cast<size_t>(result),
943 in_flight_write_->GetRemainingSize());
940 944
941 in_flight_write_.buffer()->DidConsume(result); 945 if (result > 0) {
946 in_flight_write_->Consume(static_cast<size_t>(result));
942 947
943 // We only notify the stream when we've fully written the pending frame. 948 // We only notify the stream when we've fully written the pending frame.
944 if (in_flight_write_.buffer()->BytesRemaining() == 0) { 949 if (in_flight_write_->GetRemainingSize() == 0) {
945 DCHECK_GT(result, 0); 950 // It is possible that the stream was cancelled while we were
951 // writing to the socket.
952 if (in_flight_write_stream_ && !in_flight_write_stream_->cancelled()) {
953 DCHECK_GT(in_flight_write_frame_size_, 0u);
954 in_flight_write_stream_->OnFrameWriteComplete(
955 in_flight_write_frame_type_,
956 in_flight_write_frame_size_);
957 }
946 958
947 scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); 959 // Cleanup the write which just completed.
948 960 in_flight_write_.reset();
949 // It is possible that the stream was cancelled while we were writing 961 in_flight_write_frame_type_ = DATA;
950 // to the socket. 962 in_flight_write_frame_size_ = 0;
951 if (stream && !stream->cancelled()) { 963 in_flight_write_stream_ = NULL;
952 DCHECK_GT(in_flight_write_.buffer()->size(), 0);
953 stream->OnFrameWriteComplete(
954 in_flight_write_frame_type_,
955 static_cast<size_t>(in_flight_write_.buffer()->size()));
956 } 964 }
957
958 // Cleanup the write which just completed.
959 in_flight_write_.Release();
960 in_flight_write_frame_type_ = DATA;
961 } 965 }
962 966
963 // Write more data. We're already in a continuation, so we can go 967 // Write more data. We're already in a continuation, so we can go
964 // ahead and write it immediately (without going back to the message 968 // ahead and write it immediately (without going back to the message
965 // loop). 969 // loop).
966 WriteSocketLater(); 970 WriteSocketLater();
967 } 971 }
968 972
969 net::Error SpdySession::ReadSocket() { 973 net::Error SpdySession::ReadSocket() {
970 if (read_pending_) 974 if (read_pending_)
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
1028 if (state_ < CONNECTED || state_ == CLOSED) 1032 if (state_ < CONNECTED || state_ == CLOSED)
1029 return; 1033 return;
1030 1034
1031 if (write_pending_) // Another write is in progress still. 1035 if (write_pending_) // Another write is in progress still.
1032 return; 1036 return;
1033 1037
1034 // Loop sending frames until we've sent everything or until the write 1038 // Loop sending frames until we've sent everything or until the write
1035 // returns error (or ERR_IO_PENDING). 1039 // returns error (or ERR_IO_PENDING).
1036 DCHECK(buffered_spdy_framer_.get()); 1040 DCHECK(buffered_spdy_framer_.get());
1037 while (true) { 1041 while (true) {
1038 if (in_flight_write_.buffer()) { 1042 if (in_flight_write_) {
1039 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); 1043 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1040 } else { 1044 } else {
1041 // Grab the next frame to send. 1045 // Grab the next frame to send.
1042 SpdyFrameType frame_type = DATA; 1046 SpdyFrameType frame_type = DATA;
1043 scoped_ptr<SpdyFrameProducer> producer; 1047 scoped_ptr<SpdyBufferProducer> producer;
1044 scoped_refptr<SpdyStream> stream; 1048 scoped_refptr<SpdyStream> stream;
1045 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) 1049 if (!write_queue_.Dequeue(&frame_type, &producer, &stream))
1046 break; 1050 break;
1047 1051
1048 // It is possible that a stream had data to write, but a 1052 // It is possible that a stream had data to write, but a
1049 // WINDOW_UPDATE frame has been received which made that 1053 // WINDOW_UPDATE frame has been received which made that
1050 // stream no longer writable. 1054 // stream no longer writable.
1051 // TODO(rch): consider handling that case by removing the 1055 // TODO(rch): consider handling that case by removing the
1052 // stream from the writable queue? 1056 // stream from the writable queue?
1053 if (stream.get() && stream->cancelled()) 1057 if (stream.get() && stream->cancelled())
1054 continue; 1058 continue;
1055 1059
1056 // Activate the stream only when sending the SYN_STREAM frame to 1060 // Activate the stream only when sending the SYN_STREAM frame to
1057 // guarantee monotonically-increasing stream IDs. 1061 // guarantee monotonically-increasing stream IDs.
1058 if (frame_type == SYN_STREAM) { 1062 if (frame_type == SYN_STREAM) {
1059 if (stream.get() && stream->stream_id() == 0) { 1063 if (stream.get() && stream->stream_id() == 0) {
1060 ActivateStream(stream); 1064 ActivateStream(stream);
1061 } else { 1065 } else {
1062 NOTREACHED(); 1066 NOTREACHED();
1063 continue; 1067 continue;
1064 } 1068 }
1065 } 1069 }
1066 1070
1067 scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); 1071 in_flight_write_ = producer->ProduceBuffer();
1068 if (!frame) { 1072 if (!in_flight_write_) {
1069 NOTREACHED(); 1073 NOTREACHED();
1070 continue; 1074 continue;
1071 } 1075 }
1072 DCHECK_GT(frame->size(), 0u);
1073
1074 // TODO(mbelshe): We have too much copying of data here.
1075 scoped_refptr<IOBufferWithSize> buffer =
1076 new IOBufferWithSize(frame->size());
1077 memcpy(buffer->data(), frame->data(), frame->size());
1078 in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
1079 in_flight_write_frame_type_ = frame_type; 1076 in_flight_write_frame_type_ = frame_type;
1077 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1078 DCHECK_GE(in_flight_write_frame_size_,
1079 buffered_spdy_framer_->GetFrameMinimumSize());
1080 in_flight_write_stream_ = stream;
1080 } 1081 }
1081 1082
1082 write_pending_ = true; 1083 write_pending_ = true;
1083 int rv = connection_->socket()->Write( 1084 int rv = connection_->socket()->Write(
1084 in_flight_write_.buffer(), 1085 in_flight_write_->GetIOBufferForRemainingData(),
1085 in_flight_write_.buffer()->BytesRemaining(), 1086 in_flight_write_->GetRemainingSize(),
1086 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); 1087 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr()));
1087 if (rv == net::ERR_IO_PENDING) 1088 if (rv == net::ERR_IO_PENDING)
1088 break; 1089 break;
1089 1090
1090 // We sent the frame successfully. 1091 // We sent the frame successfully.
1091 OnWriteComplete(rv); 1092 OnWriteComplete(rv);
1092 1093
1093 // TODO(mbelshe): Test this error case. Maybe we should mark the socket 1094 // TODO(mbelshe): Test this error case. Maybe we should mark the socket
1094 // as in an error state. 1095 // as in an error state.
1095 if (rv < 0) 1096 if (rv < 0)
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after
1262 1263
1263 void SpdySession::EnqueueSessionWrite(RequestPriority priority, 1264 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1264 SpdyFrameType frame_type, 1265 SpdyFrameType frame_type,
1265 scoped_ptr<SpdyFrame> frame) { 1266 scoped_ptr<SpdyFrame> frame) {
1266 DCHECK(frame_type == RST_STREAM || 1267 DCHECK(frame_type == RST_STREAM ||
1267 frame_type == SETTINGS || 1268 frame_type == SETTINGS ||
1268 frame_type == WINDOW_UPDATE || 1269 frame_type == WINDOW_UPDATE ||
1269 frame_type == PING); 1270 frame_type == PING);
1270 EnqueueWrite( 1271 EnqueueWrite(
1271 priority, frame_type, 1272 priority, frame_type,
1272 scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())), 1273 scoped_ptr<SpdyBufferProducer>(
1274 new SimpleBufferProducer(
1275 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1273 NULL); 1276 NULL);
1274 } 1277 }
1275 1278
1276 void SpdySession::EnqueueWrite(RequestPriority priority, 1279 void SpdySession::EnqueueWrite(RequestPriority priority,
1277 SpdyFrameType frame_type, 1280 SpdyFrameType frame_type,
1278 scoped_ptr<SpdyFrameProducer> producer, 1281 scoped_ptr<SpdyBufferProducer> producer,
1279 const scoped_refptr<SpdyStream>& stream) { 1282 const scoped_refptr<SpdyStream>& stream) {
1280 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); 1283 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1281 WriteSocketLater(); 1284 WriteSocketLater();
1282 } 1285 }
1283 1286
1284 void SpdySession::ActivateStream(SpdyStream* stream) { 1287 void SpdySession::ActivateStream(SpdyStream* stream) {
1285 if (stream->stream_id() == 0) { 1288 if (stream->stream_id() == 0) {
1286 stream->set_stream_id(GetNewStreamId()); 1289 stream->set_stream_id(GetNewStreamId());
1287 created_streams_.erase(scoped_refptr<SpdyStream>(stream)); 1290 created_streams_.erase(scoped_refptr<SpdyStream>(stream));
1288 } 1291 }
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
1386 const char* data, 1389 const char* data,
1387 size_t len, 1390 size_t len,
1388 bool fin) { 1391 bool fin) {
1389 DCHECK_LT(len, 1u << 24); 1392 DCHECK_LT(len, 1u << 24);
1390 if (net_log().IsLoggingAllEvents()) { 1393 if (net_log().IsLoggingAllEvents()) {
1391 net_log().AddEvent( 1394 net_log().AddEvent(
1392 NetLog::TYPE_SPDY_SESSION_RECV_DATA, 1395 NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1393 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); 1396 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
1394 } 1397 }
1395 1398
1399 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1400
1396 // By the time data comes in, the stream may already be inactive. 1401 // By the time data comes in, the stream may already be inactive.
1397 if (!IsStreamActive(stream_id)) 1402 if (it == active_streams_.end())
1398 return; 1403 return;
1399 1404
1400 // Only decrease the window size for data for active streams. 1405 // Only decrease the window size for data for active streams.
1401 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0) 1406 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0)
1402 DecreaseRecvWindowSize(static_cast<int32>(len)); 1407 DecreaseRecvWindowSize(static_cast<int32>(len));
1403 1408
1404 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1409 scoped_ptr<SpdyBuffer> buffer;
1405 stream->OnDataReceived(data, len); 1410 if (data) {
1411 DCHECK_GT(len, 0u);
1412 buffer.reset(new SpdyBuffer(data, len));
1413 } else {
1414 DCHECK_EQ(len, 0u);
1415 }
1416 it->second->OnDataReceived(buffer.Pass());
1406 } 1417 }
1407 1418
1408 void SpdySession::OnSetting(SpdySettingsIds id, 1419 void SpdySession::OnSetting(SpdySettingsIds id,
1409 uint8 flags, 1420 uint8 flags,
1410 uint32 value) { 1421 uint32 value) {
1411 HandleSetting(id, value); 1422 HandleSetting(id, value);
1412 http_server_properties_->SetSpdySetting( 1423 http_server_properties_->SetSpdySetting(
1413 host_port_pair(), 1424 host_port_pair(),
1414 id, 1425 id,
1415 static_cast<SpdySettingsFlags>(flags), 1426 static_cast<SpdySettingsFlags>(flags),
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after
1664 if (!IsStreamActive(stream_id)) { 1675 if (!IsStreamActive(stream_id)) {
1665 // NOTE: it may just be that the stream was cancelled. 1676 // NOTE: it may just be that the stream was cancelled.
1666 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 1677 LOG(WARNING) << "Received RST for invalid stream" << stream_id;
1667 return; 1678 return;
1668 } 1679 }
1669 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1680 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1670 CHECK_EQ(stream->stream_id(), stream_id); 1681 CHECK_EQ(stream->stream_id(), stream_id);
1671 CHECK(!stream->cancelled()); 1682 CHECK(!stream->cancelled());
1672 1683
1673 if (status == 0) { 1684 if (status == 0) {
1674 stream->OnDataReceived(NULL, 0); 1685 stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
1675 } else if (status == RST_STREAM_REFUSED_STREAM) { 1686 } else if (status == RST_STREAM_REFUSED_STREAM) {
1676 DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM); 1687 DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM);
1677 } else { 1688 } else {
1678 RecordProtocolErrorHistogram( 1689 RecordProtocolErrorHistogram(
1679 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM); 1690 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
1680 stream->LogStreamError( 1691 stream->LogStreamError(
1681 ERR_SPDY_PROTOCOL_ERROR, 1692 ERR_SPDY_PROTOCOL_ERROR,
1682 base::StringPrintf("SPDY stream closed with status: %d", status)); 1693 base::StringPrintf("SPDY stream closed with status: %d", status));
1683 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 1694 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
1684 // For now, it doesn't matter much - it is a protocol error. 1695 // For now, it doesn't matter much - it is a protocol error.
(...skipping 600 matching lines...) Expand 10 before | Expand all | Expand 10 after
2285 } 2296 }
2286 2297
2287 session_recv_window_size_ -= delta_window_size; 2298 session_recv_window_size_ -= delta_window_size;
2288 net_log_.AddEvent( 2299 net_log_.AddEvent(
2289 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 2300 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
2290 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2301 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2291 -delta_window_size, session_recv_window_size_)); 2302 -delta_window_size, session_recv_window_size_));
2292 } 2303 }
2293 2304
2294 } // namespace net 2305 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698