Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(676)

Side by Side Diff: net/spdy/bidirectional_spdy_stream.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Make the wrapper class own the stream Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/bidirectional_spdy_stream.h"
6
7 #include "base/bind.h"
8 #include "base/memory/scoped_ptr.h"
9 #include "base/time/time.h"
10 #include "net/base/request_priority.h"
11 #include "net/spdy/spdy_buffer.h"
12 #include "net/spdy/spdy_header_block.h"
13 #include "net/spdy/spdy_http_utils.h"
14 #include "net/spdy/spdy_stream.h"
15
16 namespace net {
17
18 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
19
20 BidirectionalSpdyStream::BidirectionalSpdyStream(
21 const base::WeakPtr<SpdySession>& spdy_session)
22 : spdy_session_(spdy_session),
23 stream_closed_(false),
24 closed_stream_status_(ERR_FAILED),
25 buffered_read_pending_(false),
26 more_read_data_pending_(false),
27 weak_factory_(this) {}
28
29 BidirectionalSpdyStream::~BidirectionalSpdyStream() {
30 if (stream_.get()) {
31 stream_->DetachDelegate();
32 DCHECK(!stream_.get());
33 }
34 }
35
36 void BidirectionalSpdyStream::Start(const HttpRequestInfo& request_info,
37 RequestPriority priority,
38 const BoundNetLog& net_log,
39 BidirectionalStream::Delegate* delegate) {
40 delegate_ = delegate;
41 DCHECK(!stream_);
42 if (!spdy_session_)
43 delegate_->OnFailed(ERR_CONNECTION_CLOSED);
44
45 request_info_ = request_info;
46
47 int rv = stream_request_.StartRequest(
48 SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_.url, priority,
49 net_log, base::Bind(&BidirectionalSpdyStream::OnStreamInitialized,
50 weak_factory_.GetWeakPtr()));
51 if (rv != ERR_IO_PENDING)
52 OnStreamInitialized(rv);
53 }
54
55 int BidirectionalSpdyStream::ReadData(IOBuffer* buf, int buf_len) {
56 if (stream_.get())
57 CHECK(!stream_->IsIdle());
58
59 CHECK(buf);
60 CHECK(buf_len);
61 if (!stream_closed_)
62 CHECK(stream_);
63
64 // If there is data buffered, complete the IO immediately.
65 if (!data_queue_.IsEmpty()) {
66 return data_queue_.Dequeue(buf->data(), buf_len);
67 } else if (stream_closed_) {
68 return closed_stream_status_;
69 }
70 // Read will complete asynchronously and Delegate::OnReadCompleted will be
71 // called upon completion.
72 user_buffer_ = buf;
73 user_buffer_len_ = buf_len;
74 return ERR_IO_PENDING;
75 }
76
77 void BidirectionalSpdyStream::SendData(IOBuffer* data,
78 int length,
79 bool end_stream) {
80 stream_->SendData(data, length,
81 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
82 }
83
84 void BidirectionalSpdyStream::Cancel() {
85 if (!stream_)
86 return;
87 // Cancels the stream and detaches the delegate so it doesn't get called back.
88 stream_->DetachDelegate();
89 DCHECK(!stream_);
90 }
91
92 void BidirectionalSpdyStream::OnRequestHeadersSent() {
93 delegate_->OnRequestHeadersSent();
94 }
95
96 SpdyResponseHeadersStatus BidirectionalSpdyStream::OnResponseHeadersUpdated(
97 const SpdyHeaderBlock& response_headers) {
98 delegate_->OnHeaders(response_headers);
99 return RESPONSE_HEADERS_ARE_COMPLETE;
100 }
101
102 void BidirectionalSpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
103 DCHECK(stream_);
104 DCHECK(!stream_closed_);
105
106 if (buffer) {
107 data_queue_.Enqueue(buffer.Pass());
108 if (user_buffer_) {
109 // Handing small chunks of data to the caller creates measurable overhead.
110 // So buffer data in short time-spans and send a single read notification.
111 ScheduleBufferedRead();
112 }
113 }
114 // If |buffer| is null, onClose will be invoked to indicate the end of stream.
115 }
116
117 void BidirectionalSpdyStream::OnDataSent() {
118 delegate_->OnDataSent();
119 }
120
121 void BidirectionalSpdyStream::OnTrailers(const SpdyHeaderBlock& trailers) {
122 delegate_->OnTrailers(trailers);
123 }
124
125 void BidirectionalSpdyStream::OnClose(int status) {
126 DCHECK(stream_);
127
128 stream_closed_ = true;
129 closed_stream_status_ = status;
130 stream_.reset();
131
132 // Complete remaining buffered read.
133 if (status == OK && user_buffer_) {
134 DoBufferedRead();
135 return;
136 }
137
138 delegate_->OnClose(status);
139 }
140
141 void BidirectionalSpdyStream::SendRequestHeaders() {
142 stream_->SetDelegate(this);
143 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
144 CreateSpdyHeadersFromHttpRequest(request_info_, request_info_.extra_headers,
145 stream_->GetProtocolVersion(), true,
146 headers.get());
147 bool end_stream = (request_info_.method == "GET");
148 stream_->SendRequestHeaders(
149 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
150 }
151
152 void BidirectionalSpdyStream::OnStreamInitialized(int rv) {
153 DCHECK_NE(ERR_IO_PENDING, rv);
154 if (rv == OK) {
155 stream_ = stream_request_.ReleaseStream();
156 SendRequestHeaders();
157 return;
158 }
159 delegate_->OnFailed(rv);
160 }
161
162 void BidirectionalSpdyStream::ScheduleBufferedRead() {
163 // If there is already a scheduled DoBufferedRead, don't issue
164 // another one. Mark that we have received more data and return.
165 if (buffered_read_pending_) {
166 more_read_data_pending_ = true;
167 return;
168 }
169
170 more_read_data_pending_ = false;
171 buffered_read_pending_ = true;
172 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
173 FROM_HERE, base::Bind(&BidirectionalSpdyStream::DoBufferedRead,
174 weak_factory_.GetWeakPtr()),
175 kBufferTime);
176 }
177
178 void BidirectionalSpdyStream::DoBufferedRead() {
179 buffered_read_pending_ = false;
180 // If the stream errored out, do not complete the read.
181 if (!stream_ && !stream_closed_)
182 return;
183 if (stream_closed_ && closed_stream_status_ != OK)
184 return;
185
186 // When |more_read_data_pending_| is true, it means that more data has arrived
187 // since started waiting. Wait a little longer and continue to buffer.
188 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
189 ScheduleBufferedRead();
190 return;
191 }
192
193 int rv = 0;
194 if (user_buffer_) {
195 rv = ReadData(user_buffer_.get(), user_buffer_len_);
196 DCHECK_NE(ERR_IO_PENDING, rv);
197 user_buffer_ = nullptr;
198 user_buffer_len_ = 0;
199 delegate_->OnReadCompleted(rv);
200
201 // If all data is read, and BidirectionalSpdyStream::onClose is invoked
202 // previously, let the delegate know about the onClose event.
203 if (data_queue_.IsEmpty() && stream_closed_) {
204 DCHECK_EQ(OK, closed_stream_status_);
205 delegate_->OnClose(OK);
206 }
207 }
208 }
209
210 bool BidirectionalSpdyStream::ShouldWaitForMoreBufferedData() const {
211 if (stream_closed_)
212 return false;
213 DCHECK_GT(user_buffer_len_, 0);
214 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_);
215 }
216
217 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698