Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(679)

Side by Side Diff: net/spdy/bidirectional_spdy_stream.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Pass through priority and netlog Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/bidirectional_spdy_stream.h"
6
7 #include "base/memory/scoped_ptr.h"
8 #include "base/time/time.h"
9 #include "net/base/request_priority.h"
10 #include "net/spdy/spdy_buffer.h"
11 #include "net/spdy/spdy_header_block.h"
12 #include "net/spdy/spdy_http_utils.h"
13 #include "net/spdy/spdy_stream.h"
14
15 namespace net {
16
17 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
18
19 BidirectionalSpdyStream::BidirectionalSpdyStream(
20 const base::WeakPtr<SpdySession>& spdy_session)
21 : spdy_session_(spdy_session),
22 stream_closed_(false),
23 closed_stream_status_(ERR_FAILED),
24 buffered_read_callback_pending_(false),
25 more_read_data_pending_(false),
26 weak_factory_(this) {}
27
28 BidirectionalSpdyStream::~BidirectionalSpdyStream() {
29 if (stream_.get()) {
30 stream_->DetachDelegate();
31 DCHECK(!stream_.get());
32 }
33 }
34
35 void BidirectionalSpdyStream::Start(const HttpRequestInfo* request_info,
36 RequestPriority priority,
37 const BoundNetLog& net_log,
38 BidirectionalStream::Delegate* delegate) {
39 delegate_ = delegate;
40 DCHECK(!stream_);
41 if (!spdy_session_)
42 delegate_->OnFailed(ERR_CONNECTION_CLOSED);
43
44 request_info_ = request_info;
45
46 int rv = stream_request_.StartRequest(
47 SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url, priority,
48 net_log, base::Bind(&BidirectionalSpdyStream::OnStreamInitialized,
49 weak_factory_.GetWeakPtr()));
50 if (rv != ERR_IO_PENDING)
51 OnStreamInitialized(rv);
52 }
53
54 int BidirectionalSpdyStream::ReadData(IOBuffer* buf, int buf_len) {
55 if (stream_.get())
56 CHECK(!stream_->IsIdle());
57
58 CHECK(buf);
59 CHECK(buf_len);
60 if (!stream_closed_)
61 CHECK(stream_);
62
63 // If there is data buffered, complete the IO immediately.
64 if (!data_queue_.IsEmpty()) {
65 return data_queue_.Dequeue(buf->data(), buf_len);
66 } else if (stream_closed_) {
67 return closed_stream_status_;
68 }
69 user_buffer_ = buf;
70 user_buffer_len_ = buf_len;
71 return ERR_IO_PENDING;
72 }
73
74 void BidirectionalSpdyStream::SendData(IOBuffer* data,
75 int length,
76 bool end_stream) {
77 stream_->SendData(data, length,
78 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
79 }
80
81 void BidirectionalSpdyStream::OnRequestHeadersSent() {
82 delegate_->OnRequestHeadersSent();
83 }
84
85 SpdyResponseHeadersStatus BidirectionalSpdyStream::OnResponseHeadersUpdated(
86 const SpdyHeaderBlock& response_headers) {
87 delegate_->OnHeaders(response_headers);
88 return RESPONSE_HEADERS_ARE_COMPLETE;
89 }
90
91 void BidirectionalSpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
92 DCHECK(stream_);
93 data_queue_.Enqueue(buffer.Pass());
94 if (user_buffer_.get()) {
95 // Handing small chunks of data to the caller creates measurable overhead.
96 // So buffer data in short time-spans and send a single read notification.
97 ScheduleBufferedReadCallback();
98 }
99 }
100
101 void BidirectionalSpdyStream::OnDataSent() {
102 delegate_->OnDataSent();
103 }
104
105 void BidirectionalSpdyStream::OnTrailers(const SpdyHeaderBlock& trailers) {
106 delegate_->OnTrailers(trailers);
107 }
108
109 void BidirectionalSpdyStream::OnClose(int status) {
110 if (stream_.get()) {
111 stream_closed_ = true;
112 closed_stream_status_ = status;
113 }
114
115 stream_.reset();
116 // Complete remaining buffered read.
mef 2015/10/07 23:44:56 what if there is no pending read?
xunjieli 2015/10/19 21:07:46 Done. Good catch! I added null check to handle thi
117 if (status == OK) {
118 DoBufferedReadCallback();
119 return;
120 }
121
122 delegate_->OnClose(status);
123 }
124
125 void BidirectionalSpdyStream::SendRequestHeaders() {
126 stream_->SetDelegate(this);
127 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
128 CreateSpdyHeadersFromHttpRequest(*request_info_, request_info_->extra_headers,
129 stream_->GetProtocolVersion(), true,
130 headers.get());
131 bool end_stream = (request_info_->method == "GET");
132 stream_->SendRequestHeaders(
133 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
134 }
135
136 void BidirectionalSpdyStream::OnStreamInitialized(int rv) {
137 DCHECK_NE(ERR_IO_PENDING, rv);
138 if (rv == OK) {
139 stream_ = stream_request_.ReleaseStream();
140 SendRequestHeaders();
141 return;
142 }
143 delegate_->OnFailed(rv);
144 }
145
146 void BidirectionalSpdyStream::ScheduleBufferedReadCallback() {
147 // If there is already a scheduled DoBufferedReadCallback, don't issue
148 // another one. Mark that we have received more data and return.
149 if (buffered_read_callback_pending_) {
150 more_read_data_pending_ = true;
151 return;
152 }
153
154 more_read_data_pending_ = false;
155 buffered_read_callback_pending_ = true;
156 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
157 FROM_HERE, base::Bind(&BidirectionalSpdyStream::DoBufferedReadCallback,
158 weak_factory_.GetWeakPtr()),
159 kBufferTime);
160 }
161
162 void BidirectionalSpdyStream::DoBufferedReadCallback() {
163 buffered_read_callback_pending_ = false;
164 // If the stream errored out, do not complete the read.
165 if (!stream_ && !stream_closed_)
166 return;
167 if (stream_closed_ && closed_stream_status_ != OK)
168 return;
169
170 // When |more_read_data_pending_| is true, it means that more data has arrived
171 // since started waiting. Wait a little longer and continue to buffer.
172 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
173 ScheduleBufferedReadCallback();
174 return;
175 }
176
177 if (!user_buffer_.get())
178 return;
179
180 int rv = ReadData(user_buffer_.get(), user_buffer_len_);
181 DCHECK_NE(ERR_IO_PENDING, rv);
182 delegate_->OnReadCompleted(rv);
183 if (data_queue_.IsEmpty() && stream_closed_)
184 delegate_->OnClose(closed_stream_status_);
185 }
186
187 bool BidirectionalSpdyStream::ShouldWaitForMoreBufferedData() const {
188 if (stream_closed_)
189 return false;
190 DCHECK_GT(user_buffer_len_, 0);
191 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_);
192 }
193
194 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698