Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(190)

Side by Side Diff: trunk/src/net/spdy/spdy_session.cc

Issue 13996009: Revert 194560 "[SPDY] Replace SpdyIOBuffer with new SpdyBuffer c..." (Closed) Base URL: svn://svn.chromium.org/chrome/
Patch Set: Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « trunk/src/net/spdy/spdy_session.h ('k') | trunk/src/net/spdy/spdy_session_spdy2_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_session.h" 5 #include "net/spdy/spdy_session.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <map> 8 #include <map>
9 9
10 #include "base/basictypes.h" 10 #include "base/basictypes.h"
(...skipping 13 matching lines...) Expand all
24 #include "base/utf_string_conversions.h" 24 #include "base/utf_string_conversions.h"
25 #include "base/values.h" 25 #include "base/values.h"
26 #include "crypto/ec_private_key.h" 26 #include "crypto/ec_private_key.h"
27 #include "crypto/ec_signature_creator.h" 27 #include "crypto/ec_signature_creator.h"
28 #include "net/base/connection_type_histograms.h" 28 #include "net/base/connection_type_histograms.h"
29 #include "net/base/net_log.h" 29 #include "net/base/net_log.h"
30 #include "net/base/net_util.h" 30 #include "net/base/net_util.h"
31 #include "net/cert/asn1_util.h" 31 #include "net/cert/asn1_util.h"
32 #include "net/http/http_network_session.h" 32 #include "net/http/http_network_session.h"
33 #include "net/http/http_server_properties.h" 33 #include "net/http/http_server_properties.h"
34 #include "net/spdy/spdy_buffer_producer.h"
35 #include "net/spdy/spdy_credential_builder.h" 34 #include "net/spdy/spdy_credential_builder.h"
36 #include "net/spdy/spdy_frame_builder.h" 35 #include "net/spdy/spdy_frame_builder.h"
36 #include "net/spdy/spdy_frame_producer.h"
37 #include "net/spdy/spdy_http_utils.h" 37 #include "net/spdy/spdy_http_utils.h"
38 #include "net/spdy/spdy_protocol.h" 38 #include "net/spdy/spdy_protocol.h"
39 #include "net/spdy/spdy_session_pool.h" 39 #include "net/spdy/spdy_session_pool.h"
40 #include "net/spdy/spdy_stream.h" 40 #include "net/spdy/spdy_stream.h"
41 #include "net/ssl/server_bound_cert_service.h" 41 #include "net/ssl/server_bound_cert_service.h"
42 42
43 namespace net { 43 namespace net {
44 44
45 namespace { 45 namespace {
46 46
(...skipping 258 matching lines...) Expand 10 before | Expand all | Expand 10 after
305 NetLog* net_log) 305 NetLog* net_log)
306 : ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)), 306 : ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)),
307 host_port_proxy_pair_(host_port_proxy_pair), 307 host_port_proxy_pair_(host_port_proxy_pair),
308 spdy_session_pool_(spdy_session_pool), 308 spdy_session_pool_(spdy_session_pool),
309 http_server_properties_(http_server_properties), 309 http_server_properties_(http_server_properties),
310 connection_(new ClientSocketHandle), 310 connection_(new ClientSocketHandle),
311 read_buffer_(new IOBuffer(kReadBufferSize)), 311 read_buffer_(new IOBuffer(kReadBufferSize)),
312 stream_hi_water_mark_(kFirstStreamId), 312 stream_hi_water_mark_(kFirstStreamId),
313 write_pending_(false), 313 write_pending_(false),
314 in_flight_write_frame_type_(DATA), 314 in_flight_write_frame_type_(DATA),
315 in_flight_write_frame_size_(0),
316 delayed_write_pending_(false), 315 delayed_write_pending_(false),
317 is_secure_(false), 316 is_secure_(false),
318 certificate_error_code_(OK), 317 certificate_error_code_(OK),
319 error_(OK), 318 error_(OK),
320 state_(STATE_IDLE), 319 state_(STATE_IDLE),
321 max_concurrent_streams_(initial_max_concurrent_streams == 0 ? 320 max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
322 kInitialMaxConcurrentStreams : 321 kInitialMaxConcurrentStreams :
323 initial_max_concurrent_streams), 322 initial_max_concurrent_streams),
324 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ? 323 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ?
325 kMaxConcurrentStreamLimit : 324 kMaxConcurrentStreamLimit :
(...skipping 304 matching lines...) Expand 10 before | Expand all | Expand 10 after
630 } 629 }
631 630
632 int SpdySession::GetProtocolVersion() const { 631 int SpdySession::GetProtocolVersion() const {
633 DCHECK(buffered_spdy_framer_.get()); 632 DCHECK(buffered_spdy_framer_.get());
634 return buffered_spdy_framer_->protocol_version(); 633 return buffered_spdy_framer_->protocol_version();
635 } 634 }
636 635
637 void SpdySession::EnqueueStreamWrite( 636 void SpdySession::EnqueueStreamWrite(
638 SpdyStream* stream, 637 SpdyStream* stream,
639 SpdyFrameType frame_type, 638 SpdyFrameType frame_type,
640 scoped_ptr<SpdyBufferProducer> producer) { 639 scoped_ptr<SpdyFrameProducer> producer) {
641 DCHECK(frame_type == HEADERS || 640 DCHECK(frame_type == HEADERS ||
642 frame_type == DATA || 641 frame_type == DATA ||
643 frame_type == CREDENTIAL || 642 frame_type == CREDENTIAL ||
644 frame_type == SYN_STREAM); 643 frame_type == SYN_STREAM);
645 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream); 644 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
646 } 645 }
647 646
648 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( 647 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
649 SpdyStreamId stream_id, 648 SpdyStreamId stream_id,
650 RequestPriority priority, 649 RequestPriority priority,
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
732 bool fin = flags & CONTROL_FLAG_FIN; 731 bool fin = flags & CONTROL_FLAG_FIN;
733 net_log().AddEvent( 732 net_log().AddEvent(
734 NetLog::TYPE_SPDY_SESSION_SEND_HEADERS, 733 NetLog::TYPE_SPDY_SESSION_SEND_HEADERS,
735 base::Bind(&NetLogSpdySynCallback, 734 base::Bind(&NetLogSpdySynCallback,
736 &headers, fin, /*unidirectional=*/false, 735 &headers, fin, /*unidirectional=*/false,
737 stream_id, 0)); 736 stream_id, 0));
738 } 737 }
739 return frame.Pass(); 738 return frame.Pass();
740 } 739 }
741 740
742 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, 741 scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
743 net::IOBuffer* data, 742 net::IOBuffer* data,
744 int len, 743 int len,
745 SpdyDataFlags flags) { 744 SpdyDataFlags flags) {
746 // Find our stream. 745 // Find our stream
747 CHECK(IsStreamActive(stream_id)); 746 CHECK(IsStreamActive(stream_id));
748 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 747 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
749 CHECK_EQ(stream->stream_id(), stream_id); 748 CHECK_EQ(stream->stream_id(), stream_id);
750 749
751 if (len < 0) { 750 if (len < 0) {
752 NOTREACHED(); 751 NOTREACHED();
753 return scoped_ptr<SpdyBuffer>(); 752 return scoped_ptr<SpdyFrame>();
754 } 753 }
755 754
756 if (len > kMaxSpdyFrameChunkSize) { 755 if (len > kMaxSpdyFrameChunkSize) {
757 len = kMaxSpdyFrameChunkSize; 756 len = kMaxSpdyFrameChunkSize;
758 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 757 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
759 } 758 }
760 759
761 // Obey send window size of the stream (and session, if applicable) 760 // Obey send window size of the stream (and session, if applicable)
762 // if flow control is enabled. 761 // if flow control is enabled.
763 if (flow_control_state_ >= FLOW_CONTROL_STREAM) { 762 if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
764 int32 effective_window_size = stream->send_window_size(); 763 int32 effective_window_size = stream->send_window_size();
765 if (effective_window_size <= 0) { 764 if (effective_window_size <= 0) {
766 // Because we queue frames onto the session, it is possible that 765 // Because we queue frames onto the session, it is possible that
767 // a stream was not flow controlled at the time it attempted the 766 // a stream was not flow controlled at the time it attempted the
768 // write, but when we go to fulfill the write, it is now flow 767 // write, but when we go to fulfill the write, it is now flow
769 // controlled. This is why we need the session to mark the stream 768 // controlled. This is why we need the session to mark the stream
770 // as stalled - because only the session knows for sure when the 769 // as stalled - because only the session knows for sure when the
771 // stall occurs. 770 // stall occurs.
772 stream->set_send_stalled_by_flow_control(true); 771 stream->set_send_stalled_by_flow_control(true);
773 net_log().AddEvent( 772 net_log().AddEvent(
774 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, 773 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW,
775 NetLog::IntegerCallback("stream_id", stream_id)); 774 NetLog::IntegerCallback("stream_id", stream_id));
776 return scoped_ptr<SpdyBuffer>(); 775 return scoped_ptr<SpdyFrame>();
777 } 776 }
778 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 777 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
779 effective_window_size = 778 effective_window_size =
780 std::min(effective_window_size, session_send_window_size_); 779 std::min(effective_window_size, session_send_window_size_);
781 if (effective_window_size <= 0) { 780 if (effective_window_size <= 0) {
782 DCHECK(IsSendStalled()); 781 DCHECK(IsSendStalled());
783 stream->set_send_stalled_by_flow_control(true); 782 stream->set_send_stalled_by_flow_control(true);
784 QueueSendStalledStream(stream); 783 QueueSendStalledStream(stream);
785 net_log().AddEvent( 784 net_log().AddEvent(
786 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, 785 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW,
787 NetLog::IntegerCallback("stream_id", stream_id)); 786 NetLog::IntegerCallback("stream_id", stream_id));
788 return scoped_ptr<SpdyBuffer>(); 787 return scoped_ptr<SpdyFrame>();
789 } 788 }
790 } 789 }
791 790
792 int new_len = std::min(len, effective_window_size); 791 int new_len = std::min(len, effective_window_size);
793 if (new_len < len) { 792 if (new_len < len) {
794 len = new_len; 793 len = new_len;
795 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 794 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
796 } 795 }
797 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) 796 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION)
798 DecreaseSendWindowSize(static_cast<int32>(len)); 797 DecreaseSendWindowSize(static_cast<int32>(len));
(...skipping 10 matching lines...) Expand all
809 // Send PrefacePing for DATA_FRAMEs with nonzero payload size. 808 // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
810 if (len > 0) 809 if (len > 0)
811 SendPrefacePingIfNoneInFlight(); 810 SendPrefacePingIfNoneInFlight();
812 811
813 // TODO(mbelshe): reduce memory copies here. 812 // TODO(mbelshe): reduce memory copies here.
814 DCHECK(buffered_spdy_framer_.get()); 813 DCHECK(buffered_spdy_framer_.get());
815 scoped_ptr<SpdyFrame> frame( 814 scoped_ptr<SpdyFrame> frame(
816 buffered_spdy_framer_->CreateDataFrame( 815 buffered_spdy_framer_->CreateDataFrame(
817 stream_id, data->data(), static_cast<uint32>(len), flags)); 816 stream_id, data->data(), static_cast<uint32>(len), flags));
818 817
819 return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())); 818 return frame.Pass();
820 } 819 }
821 820
822 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { 821 void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
823 DCHECK_NE(0u, stream_id); 822 DCHECK_NE(0u, stream_id);
824 // TODO(mbelshe): We should send a RST_STREAM control frame here 823 // TODO(mbelshe): We should send a RST_STREAM control frame here
825 // so that the server can cancel a large send. 824 // so that the server can cancel a large send.
826 825
827 DeleteStream(stream_id, status); 826 DeleteStream(stream_id, status);
828 } 827 }
829 828
(...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after
976 return OK; 975 return OK;
977 } 976 }
978 977
979 void SpdySession::OnWriteComplete(int result) { 978 void SpdySession::OnWriteComplete(int result) {
980 // Releasing the in-flight write can have a side-effect of dropping 979 // Releasing the in-flight write can have a side-effect of dropping
981 // the last reference to |this|. Hold a reference through this 980 // the last reference to |this|. Hold a reference through this
982 // function. 981 // function.
983 scoped_refptr<SpdySession> self(this); 982 scoped_refptr<SpdySession> self(this);
984 983
985 DCHECK(write_pending_); 984 DCHECK(write_pending_);
986 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 985 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
987 986
988 last_activity_time_ = base::TimeTicks::Now(); 987 last_activity_time_ = base::TimeTicks::Now();
989 write_pending_ = false; 988 write_pending_ = false;
990 989
991 if (result < 0) { 990 if (result < 0) {
992 in_flight_write_.reset(); 991 in_flight_write_.Release();
993 in_flight_write_frame_type_ = DATA; 992 in_flight_write_frame_type_ = DATA;
994 in_flight_write_frame_size_ = 0; 993 CloseSessionOnError(static_cast<net::Error>(result), true, "Write error");
995 in_flight_write_stream_ = NULL;
996 CloseSessionOnError(static_cast<Error>(result), true, "Write error");
997 return; 994 return;
998 } 995 }
999 996
1000 // It should not be possible to have written more bytes than our 997 // It should not be possible to have written more bytes than our
1001 // in_flight_write_. 998 // in_flight_write_.
1002 DCHECK_LE(static_cast<size_t>(result), 999 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
1003 in_flight_write_->GetRemainingSize());
1004 1000
1005 if (result > 0) { 1001 in_flight_write_.buffer()->DidConsume(result);
1006 in_flight_write_->Consume(static_cast<size_t>(result));
1007 1002
1008 // We only notify the stream when we've fully written the pending frame. 1003 // We only notify the stream when we've fully written the pending frame.
1009 if (in_flight_write_->GetRemainingSize() == 0) { 1004 if (in_flight_write_.buffer()->BytesRemaining() == 0) {
1010 // It is possible that the stream was cancelled while we were 1005 DCHECK_GT(result, 0);
1011 // writing to the socket.
1012 if (in_flight_write_stream_ && !in_flight_write_stream_->cancelled()) {
1013 DCHECK_GT(in_flight_write_frame_size_, 0u);
1014 in_flight_write_stream_->OnFrameWriteComplete(
1015 in_flight_write_frame_type_,
1016 in_flight_write_frame_size_);
1017 }
1018 1006
1019 // Cleanup the write which just completed. 1007 scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
1020 in_flight_write_.reset(); 1008
1021 in_flight_write_frame_type_ = DATA; 1009 // It is possible that the stream was cancelled while we were writing
1022 in_flight_write_frame_size_ = 0; 1010 // to the socket.
1023 in_flight_write_stream_ = NULL; 1011 if (stream && !stream->cancelled()) {
1012 DCHECK_GT(in_flight_write_.buffer()->size(), 0);
1013 stream->OnFrameWriteComplete(
1014 in_flight_write_frame_type_,
1015 static_cast<size_t>(in_flight_write_.buffer()->size()));
1024 } 1016 }
1017
1018 // Cleanup the write which just completed.
1019 in_flight_write_.Release();
1020 in_flight_write_frame_type_ = DATA;
1025 } 1021 }
1026 1022
1027 // Write more data. We're already in a continuation, so we can go 1023 // Write more data. We're already in a continuation, so we can go
1028 // ahead and write it immediately (without going back to the message 1024 // ahead and write it immediately (without going back to the message
1029 // loop). 1025 // loop).
1030 WriteSocketLater(); 1026 WriteSocketLater();
1031 } 1027 }
1032 1028
1033 void SpdySession::WriteSocketLater() { 1029 void SpdySession::WriteSocketLater() {
1034 if (delayed_write_pending_) 1030 if (delayed_write_pending_)
(...skipping 19 matching lines...) Expand all
1054 if (!IsConnected()) 1050 if (!IsConnected())
1055 return; 1051 return;
1056 1052
1057 if (write_pending_) // Another write is in progress still. 1053 if (write_pending_) // Another write is in progress still.
1058 return; 1054 return;
1059 1055
1060 // Loop sending frames until we've sent everything or until the write 1056 // Loop sending frames until we've sent everything or until the write
1061 // returns error (or ERR_IO_PENDING). 1057 // returns error (or ERR_IO_PENDING).
1062 DCHECK(buffered_spdy_framer_.get()); 1058 DCHECK(buffered_spdy_framer_.get());
1063 while (true) { 1059 while (true) {
1064 if (in_flight_write_) { 1060 if (in_flight_write_.buffer()) {
1065 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1061 DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
1066 } else { 1062 } else {
1067 // Grab the next frame to send. 1063 // Grab the next frame to send.
1068 SpdyFrameType frame_type = DATA; 1064 SpdyFrameType frame_type = DATA;
1069 scoped_ptr<SpdyBufferProducer> producer; 1065 scoped_ptr<SpdyFrameProducer> producer;
1070 scoped_refptr<SpdyStream> stream; 1066 scoped_refptr<SpdyStream> stream;
1071 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) 1067 if (!write_queue_.Dequeue(&frame_type, &producer, &stream))
1072 break; 1068 break;
1073 1069
1074 // It is possible that a stream had data to write, but a 1070 // It is possible that a stream had data to write, but a
1075 // WINDOW_UPDATE frame has been received which made that 1071 // WINDOW_UPDATE frame has been received which made that
1076 // stream no longer writable. 1072 // stream no longer writable.
1077 // TODO(rch): consider handling that case by removing the 1073 // TODO(rch): consider handling that case by removing the
1078 // stream from the writable queue? 1074 // stream from the writable queue?
1079 if (stream.get() && stream->cancelled()) 1075 if (stream.get() && stream->cancelled())
1080 continue; 1076 continue;
1081 1077
1082 // Activate the stream only when sending the SYN_STREAM frame to 1078 // Activate the stream only when sending the SYN_STREAM frame to
1083 // guarantee monotonically-increasing stream IDs. 1079 // guarantee monotonically-increasing stream IDs.
1084 if (frame_type == SYN_STREAM) { 1080 if (frame_type == SYN_STREAM) {
1085 if (stream.get() && stream->stream_id() == 0) { 1081 if (stream.get() && stream->stream_id() == 0) {
1086 ActivateStream(stream); 1082 ActivateStream(stream);
1087 } else { 1083 } else {
1088 NOTREACHED(); 1084 NOTREACHED();
1089 continue; 1085 continue;
1090 } 1086 }
1091 } 1087 }
1092 1088
1093 in_flight_write_ = producer->ProduceBuffer(); 1089 scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
1094 if (!in_flight_write_) { 1090 if (!frame) {
1095 NOTREACHED(); 1091 NOTREACHED();
1096 continue; 1092 continue;
1097 } 1093 }
1094 DCHECK_GT(frame->size(), 0u);
1095
1096 // TODO(mbelshe): We have too much copying of data here.
1097 scoped_refptr<IOBufferWithSize> buffer =
1098 new IOBufferWithSize(frame->size());
1099 memcpy(buffer->data(), frame->data(), frame->size());
1100 in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
1098 in_flight_write_frame_type_ = frame_type; 1101 in_flight_write_frame_type_ = frame_type;
1099 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1100 DCHECK_GE(in_flight_write_frame_size_,
1101 buffered_spdy_framer_->GetFrameMinimumSize());
1102 in_flight_write_stream_ = stream;
1103 } 1102 }
1104 1103
1105 write_pending_ = true; 1104 write_pending_ = true;
1106 // We keep |in_flight_write_| alive until OnWriteComplete(), so
1107 // it's okay to use GetIOBufferForRemainingData() since the socket
1108 // doesn't use the IOBuffer past OnWriteComplete().
1109 int rv = connection_->socket()->Write( 1105 int rv = connection_->socket()->Write(
1110 in_flight_write_->GetIOBufferForRemainingData(), 1106 in_flight_write_.buffer(),
1111 in_flight_write_->GetRemainingSize(), 1107 in_flight_write_.buffer()->BytesRemaining(),
1112 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); 1108 base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr()));
1113 if (rv == net::ERR_IO_PENDING) 1109 if (rv == net::ERR_IO_PENDING)
1114 break; 1110 break;
1115 1111
1116 // We sent the frame successfully. 1112 // We sent the frame successfully.
1117 OnWriteComplete(rv); 1113 OnWriteComplete(rv);
1118 1114
1119 // TODO(mbelshe): Test this error case. Maybe we should mark the socket 1115 // TODO(mbelshe): Test this error case. Maybe we should mark the socket
1120 // as in an error state. 1116 // as in an error state.
1121 if (rv < 0) 1117 if (rv < 0)
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after
1291 1287
1292 void SpdySession::EnqueueSessionWrite(RequestPriority priority, 1288 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1293 SpdyFrameType frame_type, 1289 SpdyFrameType frame_type,
1294 scoped_ptr<SpdyFrame> frame) { 1290 scoped_ptr<SpdyFrame> frame) {
1295 DCHECK(frame_type == RST_STREAM || 1291 DCHECK(frame_type == RST_STREAM ||
1296 frame_type == SETTINGS || 1292 frame_type == SETTINGS ||
1297 frame_type == WINDOW_UPDATE || 1293 frame_type == WINDOW_UPDATE ||
1298 frame_type == PING); 1294 frame_type == PING);
1299 EnqueueWrite( 1295 EnqueueWrite(
1300 priority, frame_type, 1296 priority, frame_type,
1301 scoped_ptr<SpdyBufferProducer>( 1297 scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())),
1302 new SimpleBufferProducer(
1303 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1304 NULL); 1298 NULL);
1305 } 1299 }
1306 1300
1307 void SpdySession::EnqueueWrite(RequestPriority priority, 1301 void SpdySession::EnqueueWrite(RequestPriority priority,
1308 SpdyFrameType frame_type, 1302 SpdyFrameType frame_type,
1309 scoped_ptr<SpdyBufferProducer> producer, 1303 scoped_ptr<SpdyFrameProducer> producer,
1310 const scoped_refptr<SpdyStream>& stream) { 1304 const scoped_refptr<SpdyStream>& stream) {
1311 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); 1305 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1312 WriteSocketLater(); 1306 WriteSocketLater();
1313 } 1307 }
1314 1308
1315 void SpdySession::ActivateStream(SpdyStream* stream) { 1309 void SpdySession::ActivateStream(SpdyStream* stream) {
1316 if (stream->stream_id() == 0) { 1310 if (stream->stream_id() == 0) {
1317 stream->set_stream_id(GetNewStreamId()); 1311 stream->set_stream_id(GetNewStreamId());
1318 created_streams_.erase(scoped_refptr<SpdyStream>(stream)); 1312 created_streams_.erase(scoped_refptr<SpdyStream>(stream));
1319 } 1313 }
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
1417 const char* data, 1411 const char* data,
1418 size_t len, 1412 size_t len,
1419 bool fin) { 1413 bool fin) {
1420 DCHECK_LT(len, 1u << 24); 1414 DCHECK_LT(len, 1u << 24);
1421 if (net_log().IsLoggingAllEvents()) { 1415 if (net_log().IsLoggingAllEvents()) {
1422 net_log().AddEvent( 1416 net_log().AddEvent(
1423 NetLog::TYPE_SPDY_SESSION_RECV_DATA, 1417 NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1424 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); 1418 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
1425 } 1419 }
1426 1420
1427 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1428
1429 // By the time data comes in, the stream may already be inactive. 1421 // By the time data comes in, the stream may already be inactive.
1430 if (it == active_streams_.end()) 1422 if (!IsStreamActive(stream_id))
1431 return; 1423 return;
1432 1424
1433 // Only decrease the window size for data for active streams. 1425 // Only decrease the window size for data for active streams.
1434 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0) 1426 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0)
1435 DecreaseRecvWindowSize(static_cast<int32>(len)); 1427 DecreaseRecvWindowSize(static_cast<int32>(len));
1436 1428
1437 scoped_ptr<SpdyBuffer> buffer; 1429 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1438 if (data) { 1430 stream->OnDataReceived(data, len);
1439 DCHECK_GT(len, 0u);
1440 buffer.reset(new SpdyBuffer(data, len));
1441 } else {
1442 DCHECK_EQ(len, 0u);
1443 }
1444 it->second->OnDataReceived(buffer.Pass());
1445 } 1431 }
1446 1432
1447 void SpdySession::OnSetting(SpdySettingsIds id, 1433 void SpdySession::OnSetting(SpdySettingsIds id,
1448 uint8 flags, 1434 uint8 flags,
1449 uint32 value) { 1435 uint32 value) {
1450 HandleSetting(id, value); 1436 HandleSetting(id, value);
1451 http_server_properties_->SetSpdySetting( 1437 http_server_properties_->SetSpdySetting(
1452 host_port_pair(), 1438 host_port_pair(),
1453 id, 1439 id,
1454 static_cast<SpdySettingsFlags>(flags), 1440 static_cast<SpdySettingsFlags>(flags),
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after
1703 if (!IsStreamActive(stream_id)) { 1689 if (!IsStreamActive(stream_id)) {
1704 // NOTE: it may just be that the stream was cancelled. 1690 // NOTE: it may just be that the stream was cancelled.
1705 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 1691 LOG(WARNING) << "Received RST for invalid stream" << stream_id;
1706 return; 1692 return;
1707 } 1693 }
1708 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1694 scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
1709 CHECK_EQ(stream->stream_id(), stream_id); 1695 CHECK_EQ(stream->stream_id(), stream_id);
1710 CHECK(!stream->cancelled()); 1696 CHECK(!stream->cancelled());
1711 1697
1712 if (status == 0) { 1698 if (status == 0) {
1713 stream->OnDataReceived(scoped_ptr<SpdyBuffer>()); 1699 stream->OnDataReceived(NULL, 0);
1714 } else if (status == RST_STREAM_REFUSED_STREAM) { 1700 } else if (status == RST_STREAM_REFUSED_STREAM) {
1715 DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM); 1701 DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM);
1716 } else { 1702 } else {
1717 RecordProtocolErrorHistogram( 1703 RecordProtocolErrorHistogram(
1718 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM); 1704 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
1719 stream->LogStreamError( 1705 stream->LogStreamError(
1720 ERR_SPDY_PROTOCOL_ERROR, 1706 ERR_SPDY_PROTOCOL_ERROR,
1721 base::StringPrintf("SPDY stream closed with status: %d", status)); 1707 base::StringPrintf("SPDY stream closed with status: %d", status));
1722 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 1708 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
1723 // For now, it doesn't matter much - it is a protocol error. 1709 // For now, it doesn't matter much - it is a protocol error.
(...skipping 600 matching lines...) Expand 10 before | Expand all | Expand 10 after
2324 } 2310 }
2325 2311
2326 session_recv_window_size_ -= delta_window_size; 2312 session_recv_window_size_ -= delta_window_size;
2327 net_log_.AddEvent( 2313 net_log_.AddEvent(
2328 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 2314 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
2329 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2315 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2330 -delta_window_size, session_recv_window_size_)); 2316 -delta_window_size, session_recv_window_size_));
2331 } 2317 }
2332 2318
2333 } // namespace net 2319 } // namespace net
OLDNEW
« no previous file with comments | « trunk/src/net/spdy/spdy_session.h ('k') | trunk/src/net/spdy/spdy_session_spdy2_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698