Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(350)

Side by Side Diff: net/spdy/spdy_session.cc

Issue 13990005: [SPDY] Replace SpdyIOBuffer with new SpdyBuffer class (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix missing include Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/spdy/spdy_session.h ('k') | net/spdy/spdy_session_spdy2_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_session.h" 5 #include "net/spdy/spdy_session.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <map> 8 #include <map>
9 9
10 #include "base/basictypes.h" 10 #include "base/basictypes.h"
(...skipping 13 matching lines...) Expand all
24 #include "base/utf_string_conversions.h" 24 #include "base/utf_string_conversions.h"
25 #include "base/values.h" 25 #include "base/values.h"
26 #include "crypto/ec_private_key.h" 26 #include "crypto/ec_private_key.h"
27 #include "crypto/ec_signature_creator.h" 27 #include "crypto/ec_signature_creator.h"
28 #include "net/base/connection_type_histograms.h" 28 #include "net/base/connection_type_histograms.h"
29 #include "net/base/net_log.h" 29 #include "net/base/net_log.h"
30 #include "net/base/net_util.h" 30 #include "net/base/net_util.h"
31 #include "net/cert/asn1_util.h" 31 #include "net/cert/asn1_util.h"
32 #include "net/http/http_network_session.h" 32 #include "net/http/http_network_session.h"
33 #include "net/http/http_server_properties.h" 33 #include "net/http/http_server_properties.h"
34 #include "net/spdy/spdy_buffer_producer.h"
34 #include "net/spdy/spdy_credential_builder.h" 35 #include "net/spdy/spdy_credential_builder.h"
35 #include "net/spdy/spdy_frame_builder.h" 36 #include "net/spdy/spdy_frame_builder.h"
36 #include "net/spdy/spdy_frame_producer.h"
37 #include "net/spdy/spdy_http_utils.h" 37 #include "net/spdy/spdy_http_utils.h"
38 #include "net/spdy/spdy_protocol.h" 38 #include "net/spdy/spdy_protocol.h"
39 #include "net/spdy/spdy_session_pool.h" 39 #include "net/spdy/spdy_session_pool.h"
40 #include "net/spdy/spdy_stream.h" 40 #include "net/spdy/spdy_stream.h"
41 #include "net/ssl/server_bound_cert_service.h" 41 #include "net/ssl/server_bound_cert_service.h"
42 42
43 namespace net { 43 namespace net {
44 44
45 namespace { 45 namespace {
46 46
(...skipping 258 matching lines...) Expand 10 before | Expand all | Expand 10 after
305 NetLog* net_log) 305 NetLog* net_log)
306 : ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)), 306 : ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)),
307 host_port_proxy_pair_(host_port_proxy_pair), 307 host_port_proxy_pair_(host_port_proxy_pair),
308 spdy_session_pool_(spdy_session_pool), 308 spdy_session_pool_(spdy_session_pool),
309 http_server_properties_(http_server_properties), 309 http_server_properties_(http_server_properties),
310 connection_(new ClientSocketHandle), 310 connection_(new ClientSocketHandle),
311 read_buffer_(new IOBuffer(kReadBufferSize)), 311 read_buffer_(new IOBuffer(kReadBufferSize)),
312 stream_hi_water_mark_(kFirstStreamId), 312 stream_hi_water_mark_(kFirstStreamId),
313 write_pending_(false), 313 write_pending_(false),
314 in_flight_write_frame_type_(DATA), 314 in_flight_write_frame_type_(DATA),
315 in_flight_write_frame_size_(0),
315 delayed_write_pending_(false), 316 delayed_write_pending_(false),
316 is_secure_(false), 317 is_secure_(false),
317 certificate_error_code_(OK), 318 certificate_error_code_(OK),
318 error_(OK), 319 error_(OK),
319 state_(STATE_IDLE), 320 state_(STATE_IDLE),
320 max_concurrent_streams_(initial_max_concurrent_streams == 0 ? 321 max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
321 kInitialMaxConcurrentStreams : 322 kInitialMaxConcurrentStreams :
322 initial_max_concurrent_streams), 323 initial_max_concurrent_streams),
323 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ? 324 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ?
324 kMaxConcurrentStreamLimit : 325 kMaxConcurrentStreamLimit :
(...skipping 304 matching lines...) Expand 10 before | Expand all | Expand 10 after
629 } 630 }
630 631
631 int SpdySession::GetProtocolVersion() const { 632 int SpdySession::GetProtocolVersion() const {
632 DCHECK(buffered_spdy_framer_.get()); 633 DCHECK(buffered_spdy_framer_.get());
633 return buffered_spdy_framer_->protocol_version(); 634 return buffered_spdy_framer_->protocol_version();
634 } 635 }
635 636
636 void SpdySession::EnqueueStreamWrite( 637 void SpdySession::EnqueueStreamWrite(
637 SpdyStream* stream, 638 SpdyStream* stream,
638 SpdyFrameType frame_type, 639 SpdyFrameType frame_type,
639 scoped_ptr<SpdyFrameProducer> producer) { 640 scoped_ptr<SpdyBufferProducer> producer) {
640 DCHECK(frame_type == HEADERS || 641 DCHECK(frame_type == HEADERS ||
641 frame_type == DATA || 642 frame_type == DATA ||
642 frame_type == CREDENTIAL || 643 frame_type == CREDENTIAL ||
643 frame_type == SYN_STREAM); 644 frame_type == SYN_STREAM);
644 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream); 645 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
645 } 646 }
646 647
647 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( 648 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
648 SpdyStreamId stream_id, 649 SpdyStreamId stream_id,
649 RequestPriority priority, 650 RequestPriority priority,
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
731 bool fin = flags & CONTROL_FLAG_FIN; 732 bool fin = flags & CONTROL_FLAG_FIN;
732 net_log().AddEvent( 733 net_log().AddEvent(
733 NetLog::TYPE_SPDY_SESSION_SEND_HEADERS, 734 NetLog::TYPE_SPDY_SESSION_SEND_HEADERS,
734 base::Bind(&NetLogSpdySynCallback, 735 base::Bind(&NetLogSpdySynCallback,
735 &headers, fin, /*unidirectional=*/false, 736 &headers, fin, /*unidirectional=*/false,
736 stream_id, 0)); 737 stream_id, 0));
737 } 738 }
738 return frame.Pass(); 739 return frame.Pass();
739 } 740 }
740 741
741 scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, 742 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
742 net::IOBuffer* data, 743 net::IOBuffer* data,
743 int len, 744 int len,
744 SpdyDataFlags flags) { 745 SpdyDataFlags flags) {
745 // Find our stream 746 // Find our stream.
746 CHECK(IsStreamActive(stream_id)); 747 CHECK(IsStreamActive(stream_id));
747 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 748 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
748 CHECK_EQ(stream->stream_id(), stream_id); 749 CHECK_EQ(stream->stream_id(), stream_id);
749 750
750 if (len < 0) { 751 if (len < 0) {
751 NOTREACHED(); 752 NOTREACHED();
752 return scoped_ptr<SpdyFrame>(); 753 return scoped_ptr<SpdyBuffer>();
753 } 754 }
754 755
755 if (len > kMaxSpdyFrameChunkSize) { 756 if (len > kMaxSpdyFrameChunkSize) {
756 len = kMaxSpdyFrameChunkSize; 757 len = kMaxSpdyFrameChunkSize;
757 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 758 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
758 } 759 }
759 760
760 // Obey send window size of the stream (and session, if applicable) 761 // Obey send window size of the stream (and session, if applicable)
761 // if flow control is enabled. 762 // if flow control is enabled.
762 if (flow_control_state_ >= FLOW_CONTROL_STREAM) { 763 if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
763 int32 effective_window_size = stream->send_window_size(); 764 int32 effective_window_size = stream->send_window_size();
764 if (effective_window_size <= 0) { 765 if (effective_window_size <= 0) {
765 // Because we queue frames onto the session, it is possible that 766 // Because we queue frames onto the session, it is possible that
766 // a stream was not flow controlled at the time it attempted the 767 // a stream was not flow controlled at the time it attempted the
767 // write, but when we go to fulfill the write, it is now flow 768 // write, but when we go to fulfill the write, it is now flow
768 // controlled. This is why we need the session to mark the stream 769 // controlled. This is why we need the session to mark the stream
769 // as stalled - because only the session knows for sure when the 770 // as stalled - because only the session knows for sure when the
770 // stall occurs. 771 // stall occurs.
771 stream->set_send_stalled_by_flow_control(true); 772 stream->set_send_stalled_by_flow_control(true);
772 net_log().AddEvent( 773 net_log().AddEvent(
773 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, 774 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW,
774 NetLog::IntegerCallback("stream_id", stream_id)); 775 NetLog::IntegerCallback("stream_id", stream_id));
775 return scoped_ptr<SpdyFrame>(); 776 return scoped_ptr<SpdyBuffer>();
776 } 777 }
777 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 778 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
778 effective_window_size = 779 effective_window_size =
779 std::min(effective_window_size, session_send_window_size_); 780 std::min(effective_window_size, session_send_window_size_);
780 if (effective_window_size <= 0) { 781 if (effective_window_size <= 0) {
781 DCHECK(IsSendStalled()); 782 DCHECK(IsSendStalled());
782 stream->set_send_stalled_by_flow_control(true); 783 stream->set_send_stalled_by_flow_control(true);
783 QueueSendStalledStream(stream); 784 QueueSendStalledStream(stream);
784 net_log().AddEvent( 785 net_log().AddEvent(
785 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, 786 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW,
786 NetLog::IntegerCallback("stream_id", stream_id)); 787 NetLog::IntegerCallback("stream_id", stream_id));
787 return scoped_ptr<SpdyFrame>(); 788 return scoped_ptr<SpdyBuffer>();
788 } 789 }
789 } 790 }
790 791
791 int new_len = std::min(len, effective_window_size); 792 int new_len = std::min(len, effective_window_size);
792 if (new_len < len) { 793 if (new_len < len) {
793 len = new_len; 794 len = new_len;
794 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 795 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
795 } 796 }
796 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) 797 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION)
797 DecreaseSendWindowSize(static_cast<int32>(len)); 798 DecreaseSendWindowSize(static_cast<int32>(len));
(...skipping 10 matching lines...) Expand all
808 // Send PrefacePing for DATA_FRAMEs with nonzero payload size. 809 // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
809 if (len > 0) 810 if (len > 0)
810 SendPrefacePingIfNoneInFlight(); 811 SendPrefacePingIfNoneInFlight();
811 812
812 // TODO(mbelshe): reduce memory copies here. 813 // TODO(mbelshe): reduce memory copies here.
813 DCHECK(buffered_spdy_framer_.get()); 814 DCHECK(buffered_spdy_framer_.get());
814 scoped_ptr<SpdyFrame> frame( 815 scoped_ptr<SpdyFrame> frame(
815 buffered_spdy_framer_->CreateDataFrame( 816 buffered_spdy_framer_->CreateDataFrame(
816 stream_id, data->data(), static_cast<uint32>(len), flags)); 817 stream_id, data->data(), static_cast<uint32>(len), flags));
817 818
818 return frame.Pass(); 819 return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass()));
819 } 820 }
820 821
821 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { 822 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
822 DCHECK_NE(0u, stream_id); 823 DCHECK_NE(0u, stream_id);
823 // TODO(mbelshe): We should send a RST_STREAM control frame here 824 // TODO(mbelshe): We should send a RST_STREAM control frame here
824 // so that the server can cancel a large send. 825 // so that the server can cancel a large send.
825 826
826 DeleteStream(stream_id, status); 827 DeleteStream(stream_id, status);
827 } 828 }
828 829
(...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after
975 return OK; 976 return OK;
976 } 977 }
977 978
978 void SpdySession::OnWriteComplete(int result) { 979 void SpdySession::OnWriteComplete(int result) {
979 // Releasing the in-flight write can have a side-effect of dropping 980 // Releasing the in-flight write can have a side-effect of dropping
980 // the last reference to |this|. Hold a reference through this 981 // the last reference to |this|. Hold a reference through this
981 // function. 982 // function.
982 scoped_refptr<SpdySession> self(this); 983 scoped_refptr<SpdySession> self(this);
983 984
984 DCHECK(write_pending_); 985 DCHECK(write_pending_);
985 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); 986 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
986 987
987 last_activity_time_ = base::TimeTicks::Now(); 988 last_activity_time_ = base::TimeTicks::Now();
988 write_pending_ = false; 989 write_pending_ = false;
989 990
990 if (result < 0) { 991 if (result < 0) {
991 in_flight_write_.Release(); 992 in_flight_write_.reset();
992 in_flight_write_frame_type_ = DATA; 993 in_flight_write_frame_type_ = DATA;
993 CloseSessionOnError(static_cast<net::Error>(result), true, "Write error"); 994 in_flight_write_frame_size_ = 0;
995 in_flight_write_stream_ = NULL;
996 CloseSessionOnError(static_cast<Error>(result), true, "Write error");
994 return; 997 return;
995 } 998 }
996 999
997 // It should not be possible to have written more bytes than our 1000 // It should not be possible to have written more bytes than our
998 // in_flight_write_. 1001 // in_flight_write_.
999 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); 1002 DCHECK_LE(static_cast<size_t>(result),
1003 in_flight_write_->GetRemainingSize());
1000 1004
1001 in_flight_write_.buffer()->DidConsume(result); 1005 if (result > 0) {
1006 in_flight_write_->Consume(static_cast<size_t>(result));
1002 1007
1003 // We only notify the stream when we've fully written the pending frame. 1008 // We only notify the stream when we've fully written the pending frame.
1004 if (in_flight_write_.buffer()->BytesRemaining() == 0) { 1009 if (in_flight_write_->GetRemainingSize() == 0) {
1005 DCHECK_GT(result, 0); 1010 // It is possible that the stream was cancelled while we were
1011 // writing to the socket.
1012 if (in_flight_write_stream_ && !in_flight_write_stream_->cancelled()) {
1013 DCHECK_GT(in_flight_write_frame_size_, 0u);
1014 in_flight_write_stream_->OnFrameWriteComplete(
1015 in_flight_write_frame_type_,
1016 in_flight_write_frame_size_);
1017 }
1006 1018
1007 scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); 1019 // Cleanup the write which just completed.
1008 1020 in_flight_write_.reset();
1009 // It is possible that the stream was cancelled while we were writing 1021 in_flight_write_frame_type_ = DATA;
1010 // to the socket. 1022 in_flight_write_frame_size_ = 0;
1011 if (stream && !stream->cancelled()) { 1023 in_flight_write_stream_ = NULL;
1012 DCHECK_GT(in_flight_write_.buffer()->size(), 0);
1013 stream->OnFrameWriteComplete(
1014 in_flight_write_frame_type_,
1015 static_cast<size_t>(in_flight_write_.buffer()->size()));
1016 } 1024 }
1017
1018 // Cleanup the write which just completed.
1019 in_flight_write_.Release();
1020 in_flight_write_frame_type_ = DATA;
1021 } 1025 }
1022 1026
1023 // Write more data. We're already in a continuation, so we can go 1027 // Write more data. We're already in a continuation, so we can go
1024 // ahead and write it immediately (without going back to the message 1028 // ahead and write it immediately (without going back to the message
1025 // loop). 1029 // loop).
1026 WriteSocketLater(); 1030 WriteSocketLater();
1027 } 1031 }
1028 1032
1029 void SpdySession::WriteSocketLater() { 1033 void SpdySession::WriteSocketLater() {
1030 if (delayed_write_pending_) 1034 if (delayed_write_pending_)
(...skipping 19 matching lines...) Expand all
1050 if (!IsConnected()) 1054 if (!IsConnected())
1051 return; 1055 return;
1052 1056
1053 if (write_pending_) // Another write is in progress still. 1057 if (write_pending_) // Another write is in progress still.
1054 return; 1058 return;
1055 1059
1056 // Loop sending frames until we've sent everything or until the write 1060 // Loop sending frames until we've sent everything or until the write
1057 // returns error (or ERR_IO_PENDING). 1061 // returns error (or ERR_IO_PENDING).
1058 DCHECK(buffered_spdy_framer_.get()); 1062 DCHECK(buffered_spdy_framer_.get());
1059 while (true) { 1063 while (true) {
1060 if (in_flight_write_.buffer()) { 1064 if (in_flight_write_) {
1061 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); 1065 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1062 } else { 1066 } else {
1063 // Grab the next frame to send. 1067 // Grab the next frame to send.
1064 SpdyFrameType frame_type = DATA; 1068 SpdyFrameType frame_type = DATA;
1065 scoped_ptr<SpdyFrameProducer> producer; 1069 scoped_ptr<SpdyBufferProducer> producer;
1066 scoped_refptr<SpdyStream> stream; 1070 scoped_refptr<SpdyStream> stream;
1067 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) 1071 if (!write_queue_.Dequeue(&frame_type, &producer, &stream))
1068 break; 1072 break;
1069 1073
1070 // It is possible that a stream had data to write, but a 1074 // It is possible that a stream had data to write, but a
1071 // WINDOW_UPDATE frame has been received which made that 1075 // WINDOW_UPDATE frame has been received which made that
1072 // stream no longer writable. 1076 // stream no longer writable.
1073 // TODO(rch): consider handling that case by removing the 1077 // TODO(rch): consider handling that case by removing the
1074 // stream from the writable queue? 1078 // stream from the writable queue?
1075 if (stream.get() && stream->cancelled()) 1079 if (stream.get() && stream->cancelled())
1076 continue; 1080 continue;
1077 1081
1078 // Activate the stream only when sending the SYN_STREAM frame to 1082 // Activate the stream only when sending the SYN_STREAM frame to
1079 // guarantee monotonically-increasing stream IDs. 1083 // guarantee monotonically-increasing stream IDs.
1080 if (frame_type == SYN_STREAM) { 1084 if (frame_type == SYN_STREAM) {
1081 if (stream.get() && stream->stream_id() == 0) { 1085 if (stream.get() && stream->stream_id() == 0) {
1082 ActivateStream(stream); 1086 ActivateStream(stream);
1083 } else { 1087 } else {
1084 NOTREACHED(); 1088 NOTREACHED();
1085 continue; 1089 continue;
1086 } 1090 }
1087 } 1091 }
1088 1092
1089 scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); 1093 in_flight_write_ = producer->ProduceBuffer();
1090 if (!frame) { 1094 if (!in_flight_write_) {
1091 NOTREACHED(); 1095 NOTREACHED();
1092 continue; 1096 continue;
1093 } 1097 }
1094 DCHECK_GT(frame->size(), 0u);
1095
1096 // TODO(mbelshe): We have too much copying of data here.
1097 scoped_refptr<IOBufferWithSize> buffer =
1098 new IOBufferWithSize(frame->size());
1099 memcpy(buffer->data(), frame->data(), frame->size());
1100 in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
1101 in_flight_write_frame_type_ = frame_type; 1098 in_flight_write_frame_type_ = frame_type;
1099 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1100 DCHECK_GE(in_flight_write_frame_size_,
1101 buffered_spdy_framer_->GetFrameMinimumSize());
1102 in_flight_write_stream_ = stream;
1102 } 1103 }
1103 1104
1104 write_pending_ = true; 1105 write_pending_ = true;
1106 // We keep |in_flight_write_| alive until OnWriteComplete(), so
1107 // it's okay to use GetIOBufferForRemainingData() since the socket
1108 // doesn't use the IOBuffer past OnWriteComplete().
1105 int rv = connection_->socket()->Write( 1109 int rv = connection_->socket()->Write(
1106 in_flight_write_.buffer(), 1110 in_flight_write_->GetIOBufferForRemainingData(),
1107 in_flight_write_.buffer()->BytesRemaining(), 1111 in_flight_write_->GetRemainingSize(),
1108 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); 1112 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr()));
1109 if (rv == net::ERR_IO_PENDING) 1113 if (rv == net::ERR_IO_PENDING)
1110 break; 1114 break;
1111 1115
1112 // We sent the frame successfully. 1116 // We sent the frame successfully.
1113 OnWriteComplete(rv); 1117 OnWriteComplete(rv);
1114 1118
1115 // TODO(mbelshe): Test this error case. Maybe we should mark the socket 1119 // TODO(mbelshe): Test this error case. Maybe we should mark the socket
1116 // as in an error state. 1120 // as in an error state.
1117 if (rv < 0) 1121 if (rv < 0)
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after
1287 1291
1288 void SpdySession::EnqueueSessionWrite(RequestPriority priority, 1292 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1289 SpdyFrameType frame_type, 1293 SpdyFrameType frame_type,
1290 scoped_ptr<SpdyFrame> frame) { 1294 scoped_ptr<SpdyFrame> frame) {
1291 DCHECK(frame_type == RST_STREAM || 1295 DCHECK(frame_type == RST_STREAM ||
1292 frame_type == SETTINGS || 1296 frame_type == SETTINGS ||
1293 frame_type == WINDOW_UPDATE || 1297 frame_type == WINDOW_UPDATE ||
1294 frame_type == PING); 1298 frame_type == PING);
1295 EnqueueWrite( 1299 EnqueueWrite(
1296 priority, frame_type, 1300 priority, frame_type,
1297 scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())), 1301 scoped_ptr<SpdyBufferProducer>(
1302 new SimpleBufferProducer(
1303 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1298 NULL); 1304 NULL);
1299 } 1305 }
1300 1306
1301 void SpdySession::EnqueueWrite(RequestPriority priority, 1307 void SpdySession::EnqueueWrite(RequestPriority priority,
1302 SpdyFrameType frame_type, 1308 SpdyFrameType frame_type,
1303 scoped_ptr<SpdyFrameProducer> producer, 1309 scoped_ptr<SpdyBufferProducer> producer,
1304 const scoped_refptr<SpdyStream>& stream) { 1310 const scoped_refptr<SpdyStream>& stream) {
1305 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); 1311 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1306 WriteSocketLater(); 1312 WriteSocketLater();
1307 } 1313 }
1308 1314
1309 void SpdySession::ActivateStream(SpdyStream* stream) { 1315 void SpdySession::ActivateStream(SpdyStream* stream) {
1310 if (stream->stream_id() == 0) { 1316 if (stream->stream_id() == 0) {
1311 stream->set_stream_id(GetNewStreamId()); 1317 stream->set_stream_id(GetNewStreamId());
1312 created_streams_.erase(scoped_refptr<SpdyStream>(stream)); 1318 created_streams_.erase(scoped_refptr<SpdyStream>(stream));
1313 } 1319 }
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
1411 const char* data, 1417 const char* data,
1412 size_t len, 1418 size_t len,
1413 bool fin) { 1419 bool fin) {
1414 DCHECK_LT(len, 1u << 24); 1420 DCHECK_LT(len, 1u << 24);
1415 if (net_log().IsLoggingAllEvents()) { 1421 if (net_log().IsLoggingAllEvents()) {
1416 net_log().AddEvent( 1422 net_log().AddEvent(
1417 NetLog::TYPE_SPDY_SESSION_RECV_DATA, 1423 NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1418 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); 1424 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
1419 } 1425 }
1420 1426
1427 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1428
1421 // By the time data comes in, the stream may already be inactive. 1429 // By the time data comes in, the stream may already be inactive.
1422 if (!IsStreamActive(stream_id)) 1430 if (it == active_streams_.end())
1423 return; 1431 return;
1424 1432
1425 // Only decrease the window size for data for active streams. 1433 // Only decrease the window size for data for active streams.
1426 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0) 1434 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0)
1427 DecreaseRecvWindowSize(static_cast<int32>(len)); 1435 DecreaseRecvWindowSize(static_cast<int32>(len));
1428 1436
1429 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1437 scoped_ptr<SpdyBuffer> buffer;
1430 stream->OnDataReceived(data, len); 1438 if (data) {
1439 DCHECK_GT(len, 0u);
1440 buffer.reset(new SpdyBuffer(data, len));
1441 } else {
1442 DCHECK_EQ(len, 0u);
1443 }
1444 it->second->OnDataReceived(buffer.Pass());
1431 } 1445 }
1432 1446
1433 void SpdySession::OnSetting(SpdySettingsIds id, 1447 void SpdySession::OnSetting(SpdySettingsIds id,
1434 uint8 flags, 1448 uint8 flags,
1435 uint32 value) { 1449 uint32 value) {
1436 HandleSetting(id, value); 1450 HandleSetting(id, value);
1437 http_server_properties_->SetSpdySetting( 1451 http_server_properties_->SetSpdySetting(
1438 host_port_pair(), 1452 host_port_pair(),
1439 id, 1453 id,
1440 static_cast<SpdySettingsFlags>(flags), 1454 static_cast<SpdySettingsFlags>(flags),
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after
1689 if (!IsStreamActive(stream_id)) { 1703 if (!IsStreamActive(stream_id)) {
1690 // NOTE: it may just be that the stream was cancelled. 1704 // NOTE: it may just be that the stream was cancelled.
1691 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 1705 LOG(WARNING) << "Received RST for invalid stream" << stream_id;
1692 return; 1706 return;
1693 } 1707 }
1694 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1708 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1695 CHECK_EQ(stream->stream_id(), stream_id); 1709 CHECK_EQ(stream->stream_id(), stream_id);
1696 CHECK(!stream->cancelled()); 1710 CHECK(!stream->cancelled());
1697 1711
1698 if (status == 0) { 1712 if (status == 0) {
1699 stream->OnDataReceived(NULL, 0); 1713 stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
1700 } else if (status == RST_STREAM_REFUSED_STREAM) { 1714 } else if (status == RST_STREAM_REFUSED_STREAM) {
1701 DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM); 1715 DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM);
1702 } else { 1716 } else {
1703 RecordProtocolErrorHistogram( 1717 RecordProtocolErrorHistogram(
1704 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM); 1718 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
1705 stream->LogStreamError( 1719 stream->LogStreamError(
1706 ERR_SPDY_PROTOCOL_ERROR, 1720 ERR_SPDY_PROTOCOL_ERROR,
1707 base::StringPrintf("SPDY stream closed with status: %d", status)); 1721 base::StringPrintf("SPDY stream closed with status: %d", status));
1708 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 1722 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
1709 // For now, it doesn't matter much - it is a protocol error. 1723 // For now, it doesn't matter much - it is a protocol error.
(...skipping 600 matching lines...) Expand 10 before | Expand all | Expand 10 after
2310 } 2324 }
2311 2325
2312 session_recv_window_size_ -= delta_window_size; 2326 session_recv_window_size_ -= delta_window_size;
2313 net_log_.AddEvent( 2327 net_log_.AddEvent(
2314 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 2328 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
2315 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2329 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2316 -delta_window_size, session_recv_window_size_)); 2330 -delta_window_size, session_recv_window_size_));
2317 } 2331 }
2318 2332
2319 } // namespace net 2333 } // namespace net
OLDNEW
« no previous file with comments | « net/spdy/spdy_session.h ('k') | net/spdy/spdy_session_spdy2_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698