Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(307)

Side by Side Diff: net/spdy/spdy_stream.cc

Issue 13990005: [SPDY] Replace SpdyIOBuffer with new SpdyBuffer class (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix missing include Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/spdy/spdy_stream.h ('k') | net/spdy/spdy_stream_test_util.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_stream.h" 5 #include "net/spdy/spdy_stream.h"
6 6
7 #include <limits> 7 #include <limits>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/compiler_specific.h" 10 #include "base/compiler_specific.h"
11 #include "base/logging.h" 11 #include "base/logging.h"
12 #include "base/message_loop.h" 12 #include "base/message_loop.h"
13 #include "base/stringprintf.h" 13 #include "base/stringprintf.h"
14 #include "base/values.h" 14 #include "base/values.h"
15 #include "net/spdy/spdy_frame_producer.h" 15 #include "net/spdy/spdy_buffer_producer.h"
16 #include "net/spdy/spdy_http_utils.h" 16 #include "net/spdy/spdy_http_utils.h"
17 #include "net/spdy/spdy_session.h" 17 #include "net/spdy/spdy_session.h"
18 18
19 namespace net { 19 namespace net {
20 20
21 namespace { 21 namespace {
22 22
23 Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id, 23 Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id,
24 int status, 24 int status,
25 const std::string* description, 25 const std::string* description,
(...skipping 21 matching lines...) Expand all
47 if (*i >= 'A' && *i <= 'Z') { 47 if (*i >= 'A' && *i <= 'Z') {
48 return true; 48 return true;
49 } 49 }
50 } 50 }
51 return false; 51 return false;
52 } 52 }
53 53
54 } // namespace 54 } // namespace
55 55
56 // A wrapper around a stream that calls into ProduceSynStreamFrame(). 56 // A wrapper around a stream that calls into ProduceSynStreamFrame().
57 class SpdyStream::SynStreamFrameProducer : public SpdyFrameProducer { 57 class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer {
58 public: 58 public:
59 SynStreamFrameProducer(const base::WeakPtr<SpdyStream>& stream) 59 SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream)
60 : stream_(stream) { 60 : stream_(stream) {
61 DCHECK(stream_); 61 DCHECK(stream_);
62 } 62 }
63 63
64 virtual ~SynStreamFrameProducer() {} 64 virtual ~SynStreamBufferProducer() {}
65 65
66 virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { 66 virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
67 if (!stream_) { 67 if (!stream_) {
68 NOTREACHED(); 68 NOTREACHED();
69 return scoped_ptr<SpdyFrame>(); 69 return scoped_ptr<SpdyBuffer>();
70 } 70 }
71 DCHECK_GT(stream_->stream_id(), 0u); 71 DCHECK_GT(stream_->stream_id(), 0u);
72 return stream_->ProduceSynStreamFrame(); 72 return scoped_ptr<SpdyBuffer>(
73 new SpdyBuffer(stream_->ProduceSynStreamFrame()));
73 } 74 }
74 75
75 private: 76 private:
76 const base::WeakPtr<SpdyStream> stream_; 77 const base::WeakPtr<SpdyStream> stream_;
77 }; 78 };
78 79
79 // A wrapper around a stream that calls into ProduceHeaderFrame() with 80 // A wrapper around a stream that calls into ProduceHeaderFrame() with
80 // a given header block. 81 // a given header block.
81 class SpdyStream::HeaderFrameProducer : public SpdyFrameProducer { 82 class SpdyStream::HeaderBufferProducer : public SpdyBufferProducer {
82 public: 83 public:
83 HeaderFrameProducer(const base::WeakPtr<SpdyStream>& stream, 84 HeaderBufferProducer(const base::WeakPtr<SpdyStream>& stream,
84 scoped_ptr<SpdyHeaderBlock> headers) 85 scoped_ptr<SpdyHeaderBlock> headers)
85 : stream_(stream), 86 : stream_(stream),
86 headers_(headers.Pass()) { 87 headers_(headers.Pass()) {
87 DCHECK(stream_); 88 DCHECK(stream_);
88 DCHECK(headers_); 89 DCHECK(headers_);
89 } 90 }
90 91
91 virtual ~HeaderFrameProducer() {} 92 virtual ~HeaderBufferProducer() {}
92 93
93 virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { 94 virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
94 if (!stream_) { 95 if (!stream_) {
95 NOTREACHED(); 96 NOTREACHED();
96 return scoped_ptr<SpdyFrame>(); 97 return scoped_ptr<SpdyBuffer>();
97 } 98 }
98 DCHECK_GT(stream_->stream_id(), 0u); 99 DCHECK_GT(stream_->stream_id(), 0u);
99 return stream_->ProduceHeaderFrame(headers_.Pass()); 100 return scoped_ptr<SpdyBuffer>(
101 new SpdyBuffer(stream_->ProduceHeaderFrame(headers_.Pass())));
100 } 102 }
101 103
102 private: 104 private:
103 const base::WeakPtr<SpdyStream> stream_; 105 const base::WeakPtr<SpdyStream> stream_;
104 scoped_ptr<SpdyHeaderBlock> headers_; 106 scoped_ptr<SpdyHeaderBlock> headers_;
105 }; 107 };
106 108
107 SpdyStream::SpdyStream(SpdySession* session, 109 SpdyStream::SpdyStream(SpdySession* session,
108 const std::string& path, 110 const std::string& path,
109 RequestPriority priority, 111 RequestPriority priority,
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
168 // HEADERS frame. Since we don't have headers, we had better not have 170 // HEADERS frame. Since we don't have headers, we had better not have
169 // any pending data frames. 171 // any pending data frames.
170 if (pending_buffers_.size() != 0U) { 172 if (pending_buffers_.size() != 0U) {
171 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, 173 LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
172 "HEADERS incomplete headers, but pending data frames."); 174 "HEADERS incomplete headers, but pending data frames.");
173 session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); 175 session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
174 } 176 }
175 return; 177 return;
176 } 178 }
177 179
178 std::vector<scoped_refptr<IOBufferWithSize> > buffers; 180 std::vector<SpdyBuffer*> buffers;
179 buffers.swap(pending_buffers_); 181 pending_buffers_.release(&buffers);
180 for (size_t i = 0; i < buffers.size(); ++i) { 182 for (size_t i = 0; i < buffers.size(); ++i) {
181 // It is always possible that a callback to the delegate results in 183 // It is always possible that a callback to the delegate results in
182 // the delegate no longer being available. 184 // the delegate no longer being available.
183 if (!delegate_) 185 if (!delegate_)
184 break; 186 break;
185 if (buffers[i]) { 187 if (buffers[i]) {
186 delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size()); 188 delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>(buffers[i]));
187 } else { 189 } else {
188 delegate_->OnDataReceived(NULL, 0); 190 delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>());
189 session_->CloseStream(stream_id_, net::OK); 191 session_->CloseStream(stream_id_, net::OK);
190 // Note: |this| may be deleted after calling CloseStream. 192 // Note: |this| may be deleted after calling CloseStream.
191 DCHECK_EQ(buffers.size() - 1, i); 193 DCHECK_EQ(buffers.size() - 1, i);
192 } 194 }
193 } 195 }
194 } 196 }
195 197
196 scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() { 198 scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() {
197 CHECK_EQ(io_state_, STATE_SEND_HEADERS_COMPLETE); 199 CHECK_EQ(io_state_, STATE_SEND_HEADERS_COMPLETE);
198 CHECK(request_.get()); 200 CHECK(request_.get());
(...skipping 244 matching lines...) Expand 10 before | Expand all | Expand 10 after
443 if (delegate_) { 445 if (delegate_) {
444 rv = delegate_->OnResponseReceived(*response_, response_time_, rv); 446 rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
445 // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more 447 // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more
446 // headers before the response header block is complete. 448 // headers before the response header block is complete.
447 if (rv == ERR_INCOMPLETE_SPDY_HEADERS) 449 if (rv == ERR_INCOMPLETE_SPDY_HEADERS)
448 rv = OK; 450 rv = OK;
449 } 451 }
450 return rv; 452 return rv;
451 } 453 }
452 454
453 void SpdyStream::OnDataReceived(const char* data, size_t length) { 455 void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
454 DCHECK(session_->IsStreamActive(stream_id_)); 456 DCHECK(session_->IsStreamActive(stream_id_));
455 DCHECK_LT(length, 1u << 24);
456 // If we don't have a response, then the SYN_REPLY did not come through. 457 // If we don't have a response, then the SYN_REPLY did not come through.
457 // We cannot pass data up to the caller unless the reply headers have been 458 // We cannot pass data up to the caller unless the reply headers have been
458 // received. 459 // received.
459 if (!response_received()) { 460 if (!response_received()) {
460 LogStreamError(ERR_SYN_REPLY_NOT_RECEIVED, "Didn't receive a response."); 461 LogStreamError(ERR_SYN_REPLY_NOT_RECEIVED, "Didn't receive a response.");
461 session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED); 462 session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED);
462 return; 463 return;
463 } 464 }
464 465
465 if (!delegate_ || continue_buffering_data_) { 466 if (!delegate_ || continue_buffering_data_) {
466 // It should be valid for this to happen in the server push case. 467 // It should be valid for this to happen in the server push case.
467 // We'll return received data when delegate gets attached to the stream. 468 // We'll return received data when delegate gets attached to the stream.
468 if (length > 0) { 469 if (buffer) {
469 IOBufferWithSize* buf = new IOBufferWithSize(length); 470 pending_buffers_.push_back(buffer.release());
470 memcpy(buf->data(), data, length);
471 pending_buffers_.push_back(make_scoped_refptr(buf));
472 } else { 471 } else {
473 pending_buffers_.push_back(NULL); 472 pending_buffers_.push_back(NULL);
474 metrics_.StopStream(); 473 metrics_.StopStream();
475 // Note: we leave the stream open in the session until the stream 474 // Note: we leave the stream open in the session until the stream
476 // is claimed. 475 // is claimed.
477 } 476 }
478 return; 477 return;
479 } 478 }
480 479
481 CHECK(!closed()); 480 CHECK(!closed());
482 481
483 // A zero-length read means that the stream is being closed. 482 if (!buffer) {
484 if (length == 0) {
485 metrics_.StopStream(); 483 metrics_.StopStream();
486 session_->CloseStream(stream_id_, net::OK); 484 session_->CloseStream(stream_id_, net::OK);
487 // Note: |this| may be deleted after calling CloseStream. 485 // Note: |this| may be deleted after calling CloseStream.
488 return; 486 return;
489 } 487 }
490 488
489 size_t length = buffer->GetRemainingSize();
490 DCHECK_LE(length, session_->GetDataFrameMaximumPayload());
491 if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) 491 if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM)
492 DecreaseRecvWindowSize(static_cast<int32>(length)); 492 DecreaseRecvWindowSize(static_cast<int32>(length));
493 493
494 // Track our bandwidth. 494 // Track our bandwidth.
495 metrics_.RecordBytes(length); 495 metrics_.RecordBytes(length);
496 recv_bytes_ += length; 496 recv_bytes_ += length;
497 recv_last_byte_time_ = base::TimeTicks::Now(); 497 recv_last_byte_time_ = base::TimeTicks::Now();
498 498
499 if (delegate_->OnDataReceived(data, length) != net::OK) { 499 if (delegate_->OnDataReceived(buffer.Pass()) != net::OK) {
500 // |delegate_| rejected the data. 500 // |delegate_| rejected the data.
501 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "Delegate rejected the data"); 501 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "Delegate rejected the data");
502 session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); 502 session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
503 return; 503 return;
504 } 504 }
505 } 505 }
506 506
507 void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type, 507 void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type,
508 size_t frame_size) { 508 size_t frame_size) {
509 if (frame_size < session_->GetFrameMinimumSize() || 509 if (frame_size < session_->GetFrameMinimumSize() ||
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
573 } 573 }
574 574
575 void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) { 575 void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) {
576 // Until the first headers by SYN_STREAM have been completely sent, we can 576 // Until the first headers by SYN_STREAM have been completely sent, we can
577 // not be sure that our stream_id is correct. 577 // not be sure that our stream_id is correct.
578 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); 578 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
579 CHECK_GT(stream_id_, 0u); 579 CHECK_GT(stream_id_, 0u);
580 580
581 session_->EnqueueStreamWrite( 581 session_->EnqueueStreamWrite(
582 this, HEADERS, 582 this, HEADERS,
583 scoped_ptr<SpdyFrameProducer>( 583 scoped_ptr<SpdyBufferProducer>(
584 new HeaderFrameProducer( 584 new HeaderBufferProducer(
585 weak_ptr_factory_.GetWeakPtr(), headers.Pass()))); 585 weak_ptr_factory_.GetWeakPtr(), headers.Pass())));
586 } 586 }
587 587
588 void SpdyStream::QueueStreamData(IOBuffer* data, 588 void SpdyStream::QueueStreamData(IOBuffer* data,
589 int length, 589 int length,
590 SpdyDataFlags flags) { 590 SpdyDataFlags flags) {
591 // Until the headers have been completely sent, we can not be sure 591 // Until the headers have been completely sent, we can not be sure
592 // that our stream_id is correct. 592 // that our stream_id is correct.
593 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); 593 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
594 CHECK_GT(stream_id_, 0u); 594 CHECK_GT(stream_id_, 0u);
595 CHECK(!cancelled()); 595 CHECK(!cancelled());
596 596
597 scoped_ptr<SpdyFrame> data_frame(session_->CreateDataFrame( 597 scoped_ptr<SpdyBuffer> data_buffer(session_->CreateDataBuffer(
598 stream_id_, data, length, flags)); 598 stream_id_, data, length, flags));
599 if (!data_frame) 599 // We'll get called again by PossiblyResumeIfSendStalled().
600 if (!data_buffer)
600 return; 601 return;
601 602
602 session_->EnqueueStreamWrite( 603 session_->EnqueueStreamWrite(
603 this, DATA, 604 this, DATA,
604 scoped_ptr<SpdyFrameProducer>( 605 scoped_ptr<SpdyBufferProducer>(
605 new SimpleFrameProducer(data_frame.Pass()))); 606 new SimpleBufferProducer(data_buffer.Pass())));
606 } 607 }
607 608
608 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, 609 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
609 bool* was_npn_negotiated, 610 bool* was_npn_negotiated,
610 NextProto* protocol_negotiated) { 611 NextProto* protocol_negotiated) {
611 return session_->GetSSLInfo( 612 return session_->GetSSLInfo(
612 ssl_info, was_npn_negotiated, protocol_negotiated); 613 ssl_info, was_npn_negotiated, protocol_negotiated);
613 } 614 }
614 615
615 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) { 616 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
(...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after
777 DCHECK(frame); 778 DCHECK(frame);
778 // TODO(akalin): Fix the following race condition: 779 // TODO(akalin): Fix the following race condition:
779 // 780 //
780 // Since this is decoupled from sending the SYN_STREAM frame, it is 781 // Since this is decoupled from sending the SYN_STREAM frame, it is
781 // possible that other domain-bound cert frames will clobber ours 782 // possible that other domain-bound cert frames will clobber ours
782 // before our SYN_STREAM frame gets sent. This can be solved by 783 // before our SYN_STREAM frame gets sent. This can be solved by
783 // immediately enqueueing the SYN_STREAM frame here and adjusting 784 // immediately enqueueing the SYN_STREAM frame here and adjusting
784 // the state machine appropriately. 785 // the state machine appropriately.
785 session_->EnqueueStreamWrite( 786 session_->EnqueueStreamWrite(
786 this, CREDENTIAL, 787 this, CREDENTIAL,
787 scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass()))); 788 scoped_ptr<SpdyBufferProducer>(
789 new SimpleBufferProducer(
790 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))));
788 return ERR_IO_PENDING; 791 return ERR_IO_PENDING;
789 } 792 }
790 793
791 int SpdyStream::DoSendDomainBoundCertComplete() { 794 int SpdyStream::DoSendDomainBoundCertComplete() {
792 DCHECK_EQ(just_completed_frame_type_, CREDENTIAL); 795 DCHECK_EQ(just_completed_frame_type_, CREDENTIAL);
793 io_state_ = STATE_SEND_HEADERS; 796 io_state_ = STATE_SEND_HEADERS;
794 return OK; 797 return OK;
795 } 798 }
796 799
797 int SpdyStream::DoSendHeaders() { 800 int SpdyStream::DoSendHeaders() {
798 CHECK(!cancelled_); 801 CHECK(!cancelled_);
799 io_state_ = STATE_SEND_HEADERS_COMPLETE; 802 io_state_ = STATE_SEND_HEADERS_COMPLETE;
800 803
801 session_->EnqueueStreamWrite( 804 session_->EnqueueStreamWrite(
802 this, SYN_STREAM, 805 this, SYN_STREAM,
803 scoped_ptr<SpdyFrameProducer>( 806 scoped_ptr<SpdyBufferProducer>(
804 new SynStreamFrameProducer(weak_ptr_factory_.GetWeakPtr()))); 807 new SynStreamBufferProducer(weak_ptr_factory_.GetWeakPtr())));
805 return ERR_IO_PENDING; 808 return ERR_IO_PENDING;
806 } 809 }
807 810
808 int SpdyStream::DoSendHeadersComplete() { 811 int SpdyStream::DoSendHeadersComplete() {
809 DCHECK_EQ(just_completed_frame_type_, SYN_STREAM); 812 DCHECK_EQ(just_completed_frame_type_, SYN_STREAM);
810 DCHECK_NE(stream_id_, 0u); 813 DCHECK_NE(stream_id_, 0u);
811 if (!delegate_) 814 if (!delegate_)
812 return ERR_UNEXPECTED; 815 return ERR_UNEXPECTED;
813 816
814 io_state_ = 817 io_state_ =
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
935 } 938 }
936 939
937 recv_window_size_ -= delta_window_size; 940 recv_window_size_ -= delta_window_size;
938 net_log_.AddEvent( 941 net_log_.AddEvent(
939 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 942 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
940 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 943 base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
941 stream_id_, -delta_window_size, recv_window_size_)); 944 stream_id_, -delta_window_size, recv_window_size_));
942 } 945 }
943 946
944 } // namespace net 947 } // namespace net
OLDNEW
« no previous file with comments | « net/spdy/spdy_stream.h ('k') | net/spdy/spdy_stream_test_util.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698