Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(523)

Side by Side Diff: trunk/src/net/spdy/spdy_stream.cc

Issue 13996009: Revert 194560 "[SPDY] Replace SpdyIOBuffer with new SpdyBuffer c..." (Closed) Base URL: svn://svn.chromium.org/chrome/
Patch Set: Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « trunk/src/net/spdy/spdy_stream.h ('k') | trunk/src/net/spdy/spdy_stream_test_util.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_stream.h" 5 #include "net/spdy/spdy_stream.h"
6 6
7 #include <limits> 7 #include <limits>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/compiler_specific.h" 10 #include "base/compiler_specific.h"
11 #include "base/logging.h" 11 #include "base/logging.h"
12 #include "base/message_loop.h" 12 #include "base/message_loop.h"
13 #include "base/stringprintf.h" 13 #include "base/stringprintf.h"
14 #include "base/values.h" 14 #include "base/values.h"
15 #include "net/spdy/spdy_buffer_producer.h" 15 #include "net/spdy/spdy_frame_producer.h"
16 #include "net/spdy/spdy_http_utils.h" 16 #include "net/spdy/spdy_http_utils.h"
17 #include "net/spdy/spdy_session.h" 17 #include "net/spdy/spdy_session.h"
18 18
19 namespace net { 19 namespace net {
20 20
21 namespace { 21 namespace {
22 22
23 Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id, 23 Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id,
24 int status, 24 int status,
25 const std::string* description, 25 const std::string* description,
(...skipping 21 matching lines...) Expand all
47 if (*i >= 'A' && *i <= 'Z') { 47 if (*i >= 'A' && *i <= 'Z') {
48 return true; 48 return true;
49 } 49 }
50 } 50 }
51 return false; 51 return false;
52 } 52 }
53 53
54 } // namespace 54 } // namespace
55 55
56 // A wrapper around a stream that calls into ProduceSynStreamFrame(). 56 // A wrapper around a stream that calls into ProduceSynStreamFrame().
57 class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer { 57 class SpdyStream::SynStreamFrameProducer : public SpdyFrameProducer {
58 public: 58 public:
59 SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream) 59 SynStreamFrameProducer(const base::WeakPtr<SpdyStream>& stream)
60 : stream_(stream) { 60 : stream_(stream) {
61 DCHECK(stream_); 61 DCHECK(stream_);
62 } 62 }
63 63
64 virtual ~SynStreamBufferProducer() {} 64 virtual ~SynStreamFrameProducer() {}
65 65
66 virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE { 66 virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE {
67 if (!stream_) { 67 if (!stream_) {
68 NOTREACHED(); 68 NOTREACHED();
69 return scoped_ptr<SpdyBuffer>(); 69 return scoped_ptr<SpdyFrame>();
70 } 70 }
71 DCHECK_GT(stream_->stream_id(), 0u); 71 DCHECK_GT(stream_->stream_id(), 0u);
72 return scoped_ptr<SpdyBuffer>( 72 return stream_->ProduceSynStreamFrame();
73 new SpdyBuffer(stream_->ProduceSynStreamFrame()));
74 } 73 }
75 74
76 private: 75 private:
77 const base::WeakPtr<SpdyStream> stream_; 76 const base::WeakPtr<SpdyStream> stream_;
78 }; 77 };
79 78
80 // A wrapper around a stream that calls into ProduceHeaderFrame() with 79 // A wrapper around a stream that calls into ProduceHeaderFrame() with
81 // a given header block. 80 // a given header block.
82 class SpdyStream::HeaderBufferProducer : public SpdyBufferProducer { 81 class SpdyStream::HeaderFrameProducer : public SpdyFrameProducer {
83 public: 82 public:
84 HeaderBufferProducer(const base::WeakPtr<SpdyStream>& stream, 83 HeaderFrameProducer(const base::WeakPtr<SpdyStream>& stream,
85 scoped_ptr<SpdyHeaderBlock> headers) 84 scoped_ptr<SpdyHeaderBlock> headers)
86 : stream_(stream), 85 : stream_(stream),
87 headers_(headers.Pass()) { 86 headers_(headers.Pass()) {
88 DCHECK(stream_); 87 DCHECK(stream_);
89 DCHECK(headers_); 88 DCHECK(headers_);
90 } 89 }
91 90
92 virtual ~HeaderBufferProducer() {} 91 virtual ~HeaderFrameProducer() {}
93 92
94 virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE { 93 virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE {
95 if (!stream_) { 94 if (!stream_) {
96 NOTREACHED(); 95 NOTREACHED();
97 return scoped_ptr<SpdyBuffer>(); 96 return scoped_ptr<SpdyFrame>();
98 } 97 }
99 DCHECK_GT(stream_->stream_id(), 0u); 98 DCHECK_GT(stream_->stream_id(), 0u);
100 return scoped_ptr<SpdyBuffer>( 99 return stream_->ProduceHeaderFrame(headers_.Pass());
101 new SpdyBuffer(stream_->ProduceHeaderFrame(headers_.Pass())));
102 } 100 }
103 101
104 private: 102 private:
105 const base::WeakPtr<SpdyStream> stream_; 103 const base::WeakPtr<SpdyStream> stream_;
106 scoped_ptr<SpdyHeaderBlock> headers_; 104 scoped_ptr<SpdyHeaderBlock> headers_;
107 }; 105 };
108 106
109 SpdyStream::SpdyStream(SpdySession* session, 107 SpdyStream::SpdyStream(SpdySession* session,
110 const std::string& path, 108 const std::string& path,
111 RequestPriority priority, 109 RequestPriority priority,
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
170 // HEADERS frame. Since we don't have headers, we had better not have 168 // HEADERS frame. Since we don't have headers, we had better not have
171 // any pending data frames. 169 // any pending data frames.
172 if (pending_buffers_.size() != 0U) { 170 if (pending_buffers_.size() != 0U) {
173 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, 171 LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
174 "HEADERS incomplete headers, but pending data frames."); 172 "HEADERS incomplete headers, but pending data frames.");
175 session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); 173 session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
176 } 174 }
177 return; 175 return;
178 } 176 }
179 177
180 std::vector<SpdyBuffer*> buffers; 178 std::vector<scoped_refptr<IOBufferWithSize> > buffers;
181 pending_buffers_.release(&buffers); 179 buffers.swap(pending_buffers_);
182 for (size_t i = 0; i < buffers.size(); ++i) { 180 for (size_t i = 0; i < buffers.size(); ++i) {
183 // It is always possible that a callback to the delegate results in 181 // It is always possible that a callback to the delegate results in
184 // the delegate no longer being available. 182 // the delegate no longer being available.
185 if (!delegate_) 183 if (!delegate_)
186 break; 184 break;
187 if (buffers[i]) { 185 if (buffers[i]) {
188 delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>(buffers[i])); 186 delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size());
189 } else { 187 } else {
190 delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>()); 188 delegate_->OnDataReceived(NULL, 0);
191 session_->CloseStream(stream_id_, net::OK); 189 session_->CloseStream(stream_id_, net::OK);
192 // Note: |this| may be deleted after calling CloseStream. 190 // Note: |this| may be deleted after calling CloseStream.
193 DCHECK_EQ(buffers.size() - 1, i); 191 DCHECK_EQ(buffers.size() - 1, i);
194 } 192 }
195 } 193 }
196 } 194 }
197 195
198 scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() { 196 scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() {
199 CHECK_EQ(io_state_, STATE_SEND_HEADERS_COMPLETE); 197 CHECK_EQ(io_state_, STATE_SEND_HEADERS_COMPLETE);
200 CHECK(request_.get()); 198 CHECK(request_.get());
(...skipping 244 matching lines...) Expand 10 before | Expand all | Expand 10 after
445 if (delegate_) { 443 if (delegate_) {
446 rv = delegate_->OnResponseReceived(*response_, response_time_, rv); 444 rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
447 // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more 445 // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more
448 // headers before the response header block is complete. 446 // headers before the response header block is complete.
449 if (rv == ERR_INCOMPLETE_SPDY_HEADERS) 447 if (rv == ERR_INCOMPLETE_SPDY_HEADERS)
450 rv = OK; 448 rv = OK;
451 } 449 }
452 return rv; 450 return rv;
453 } 451 }
454 452
455 void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { 453 void SpdyStream::OnDataReceived(const char* data, size_t length) {
456 DCHECK(session_->IsStreamActive(stream_id_)); 454 DCHECK(session_->IsStreamActive(stream_id_));
455 DCHECK_LT(length, 1u << 24);
457 // If we don't have a response, then the SYN_REPLY did not come through. 456 // If we don't have a response, then the SYN_REPLY did not come through.
458 // We cannot pass data up to the caller unless the reply headers have been 457 // We cannot pass data up to the caller unless the reply headers have been
459 // received. 458 // received.
460 if (!response_received()) { 459 if (!response_received()) {
461 LogStreamError(ERR_SYN_REPLY_NOT_RECEIVED, "Didn't receive a response."); 460 LogStreamError(ERR_SYN_REPLY_NOT_RECEIVED, "Didn't receive a response.");
462 session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED); 461 session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED);
463 return; 462 return;
464 } 463 }
465 464
466 if (!delegate_ || continue_buffering_data_) { 465 if (!delegate_ || continue_buffering_data_) {
467 // It should be valid for this to happen in the server push case. 466 // It should be valid for this to happen in the server push case.
468 // We'll return received data when delegate gets attached to the stream. 467 // We'll return received data when delegate gets attached to the stream.
469 if (buffer) { 468 if (length > 0) {
470 pending_buffers_.push_back(buffer.release()); 469 IOBufferWithSize* buf = new IOBufferWithSize(length);
470 memcpy(buf->data(), data, length);
471 pending_buffers_.push_back(make_scoped_refptr(buf));
471 } else { 472 } else {
472 pending_buffers_.push_back(NULL); 473 pending_buffers_.push_back(NULL);
473 metrics_.StopStream(); 474 metrics_.StopStream();
474 // Note: we leave the stream open in the session until the stream 475 // Note: we leave the stream open in the session until the stream
475 // is claimed. 476 // is claimed.
476 } 477 }
477 return; 478 return;
478 } 479 }
479 480
480 CHECK(!closed()); 481 CHECK(!closed());
481 482
482 if (!buffer) { 483 // A zero-length read means that the stream is being closed.
484 if (length == 0) {
483 metrics_.StopStream(); 485 metrics_.StopStream();
484 session_->CloseStream(stream_id_, net::OK); 486 session_->CloseStream(stream_id_, net::OK);
485 // Note: |this| may be deleted after calling CloseStream. 487 // Note: |this| may be deleted after calling CloseStream.
486 return; 488 return;
487 } 489 }
488 490
489 size_t length = buffer->GetRemainingSize();
490 DCHECK_LE(length, session_->GetDataFrameMaximumPayload());
491 if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) 491 if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM)
492 DecreaseRecvWindowSize(static_cast<int32>(length)); 492 DecreaseRecvWindowSize(static_cast<int32>(length));
493 493
494 // Track our bandwidth. 494 // Track our bandwidth.
495 metrics_.RecordBytes(length); 495 metrics_.RecordBytes(length);
496 recv_bytes_ += length; 496 recv_bytes_ += length;
497 recv_last_byte_time_ = base::TimeTicks::Now(); 497 recv_last_byte_time_ = base::TimeTicks::Now();
498 498
499 if (delegate_->OnDataReceived(buffer.Pass()) != net::OK) { 499 if (delegate_->OnDataReceived(data, length) != net::OK) {
500 // |delegate_| rejected the data. 500 // |delegate_| rejected the data.
501 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "Delegate rejected the data"); 501 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "Delegate rejected the data");
502 session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); 502 session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
503 return; 503 return;
504 } 504 }
505 } 505 }
506 506
507 void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type, 507 void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type,
508 size_t frame_size) { 508 size_t frame_size) {
509 if (frame_size < session_->GetFrameMinimumSize() || 509 if (frame_size < session_->GetFrameMinimumSize() ||
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
573 } 573 }
574 574
575 void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) { 575 void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) {
576 // Until the first headers by SYN_STREAM have been completely sent, we can 576 // Until the first headers by SYN_STREAM have been completely sent, we can
577 // not be sure that our stream_id is correct. 577 // not be sure that our stream_id is correct.
578 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); 578 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
579 CHECK_GT(stream_id_, 0u); 579 CHECK_GT(stream_id_, 0u);
580 580
581 session_->EnqueueStreamWrite( 581 session_->EnqueueStreamWrite(
582 this, HEADERS, 582 this, HEADERS,
583 scoped_ptr<SpdyBufferProducer>( 583 scoped_ptr<SpdyFrameProducer>(
584 new HeaderBufferProducer( 584 new HeaderFrameProducer(
585 weak_ptr_factory_.GetWeakPtr(), headers.Pass()))); 585 weak_ptr_factory_.GetWeakPtr(), headers.Pass())));
586 } 586 }
587 587
588 void SpdyStream::QueueStreamData(IOBuffer* data, 588 void SpdyStream::QueueStreamData(IOBuffer* data,
589 int length, 589 int length,
590 SpdyDataFlags flags) { 590 SpdyDataFlags flags) {
591 // Until the headers have been completely sent, we can not be sure 591 // Until the headers have been completely sent, we can not be sure
592 // that our stream_id is correct. 592 // that our stream_id is correct.
593 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); 593 DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
594 CHECK_GT(stream_id_, 0u); 594 CHECK_GT(stream_id_, 0u);
595 CHECK(!cancelled()); 595 CHECK(!cancelled());
596 596
597 scoped_ptr<SpdyBuffer> data_buffer(session_->CreateDataBuffer( 597 scoped_ptr<SpdyFrame> data_frame(session_->CreateDataFrame(
598 stream_id_, data, length, flags)); 598 stream_id_, data, length, flags));
599 // We'll get called again by PossiblyResumeIfSendStalled(). 599 if (!data_frame)
600 if (!data_buffer)
601 return; 600 return;
602 601
603 session_->EnqueueStreamWrite( 602 session_->EnqueueStreamWrite(
604 this, DATA, 603 this, DATA,
605 scoped_ptr<SpdyBufferProducer>( 604 scoped_ptr<SpdyFrameProducer>(
606 new SimpleBufferProducer(data_buffer.Pass()))); 605 new SimpleFrameProducer(data_frame.Pass())));
607 } 606 }
608 607
609 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, 608 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
610 bool* was_npn_negotiated, 609 bool* was_npn_negotiated,
611 NextProto* protocol_negotiated) { 610 NextProto* protocol_negotiated) {
612 return session_->GetSSLInfo( 611 return session_->GetSSLInfo(
613 ssl_info, was_npn_negotiated, protocol_negotiated); 612 ssl_info, was_npn_negotiated, protocol_negotiated);
614 } 613 }
615 614
616 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) { 615 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
(...skipping 161 matching lines...) Expand 10 before | Expand all | Expand 10 after
778 DCHECK(frame); 777 DCHECK(frame);
779 // TODO(akalin): Fix the following race condition: 778 // TODO(akalin): Fix the following race condition:
780 // 779 //
781 // Since this is decoupled from sending the SYN_STREAM frame, it is 780 // Since this is decoupled from sending the SYN_STREAM frame, it is
782 // possible that other domain-bound cert frames will clobber ours 781 // possible that other domain-bound cert frames will clobber ours
783 // before our SYN_STREAM frame gets sent. This can be solved by 782 // before our SYN_STREAM frame gets sent. This can be solved by
784 // immediately enqueueing the SYN_STREAM frame here and adjusting 783 // immediately enqueueing the SYN_STREAM frame here and adjusting
785 // the state machine appropriately. 784 // the state machine appropriately.
786 session_->EnqueueStreamWrite( 785 session_->EnqueueStreamWrite(
787 this, CREDENTIAL, 786 this, CREDENTIAL,
788 scoped_ptr<SpdyBufferProducer>( 787 scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())));
789 new SimpleBufferProducer(
790 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))));
791 return ERR_IO_PENDING; 788 return ERR_IO_PENDING;
792 } 789 }
793 790
794 int SpdyStream::DoSendDomainBoundCertComplete() { 791 int SpdyStream::DoSendDomainBoundCertComplete() {
795 DCHECK_EQ(just_completed_frame_type_, CREDENTIAL); 792 DCHECK_EQ(just_completed_frame_type_, CREDENTIAL);
796 io_state_ = STATE_SEND_HEADERS; 793 io_state_ = STATE_SEND_HEADERS;
797 return OK; 794 return OK;
798 } 795 }
799 796
800 int SpdyStream::DoSendHeaders() { 797 int SpdyStream::DoSendHeaders() {
801 CHECK(!cancelled_); 798 CHECK(!cancelled_);
802 io_state_ = STATE_SEND_HEADERS_COMPLETE; 799 io_state_ = STATE_SEND_HEADERS_COMPLETE;
803 800
804 session_->EnqueueStreamWrite( 801 session_->EnqueueStreamWrite(
805 this, SYN_STREAM, 802 this, SYN_STREAM,
806 scoped_ptr<SpdyBufferProducer>( 803 scoped_ptr<SpdyFrameProducer>(
807 new SynStreamBufferProducer(weak_ptr_factory_.GetWeakPtr()))); 804 new SynStreamFrameProducer(weak_ptr_factory_.GetWeakPtr())));
808 return ERR_IO_PENDING; 805 return ERR_IO_PENDING;
809 } 806 }
810 807
811 int SpdyStream::DoSendHeadersComplete() { 808 int SpdyStream::DoSendHeadersComplete() {
812 DCHECK_EQ(just_completed_frame_type_, SYN_STREAM); 809 DCHECK_EQ(just_completed_frame_type_, SYN_STREAM);
813 DCHECK_NE(stream_id_, 0u); 810 DCHECK_NE(stream_id_, 0u);
814 if (!delegate_) 811 if (!delegate_)
815 return ERR_UNEXPECTED; 812 return ERR_UNEXPECTED;
816 813
817 io_state_ = 814 io_state_ =
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
938 } 935 }
939 936
940 recv_window_size_ -= delta_window_size; 937 recv_window_size_ -= delta_window_size;
941 net_log_.AddEvent( 938 net_log_.AddEvent(
942 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 939 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
943 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 940 base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
944 stream_id_, -delta_window_size, recv_window_size_)); 941 stream_id_, -delta_window_size, recv_window_size_));
945 } 942 }
946 943
947 } // namespace net 944 } // namespace net
OLDNEW
« no previous file with comments | « trunk/src/net/spdy/spdy_stream.h ('k') | trunk/src/net/spdy/spdy_stream_test_util.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698