Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(136)

Side by Side Diff: net/spdy/bidirectional_stream_spdy_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebased Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/bidirectional_stream_spdy_job.h"
6
7 #include "base/bind.h"
8 #include "base/memory/scoped_ptr.h"
9 #include "base/time/time.h"
10 #include "net/base/request_priority.h"
11 #include "net/spdy/spdy_buffer.h"
12 #include "net/spdy/spdy_header_block.h"
13 #include "net/spdy/spdy_http_utils.h"
14 #include "net/spdy/spdy_stream.h"
15
16 namespace net {
17
18 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
19
20 BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
21 const base::WeakPtr<SpdySession>& spdy_session)
22 : spdy_session_(spdy_session),
23 stream_closed_(false),
24 closed_stream_status_(ERR_FAILED),
25 buffered_read_pending_(false),
26 more_read_data_pending_(false),
27 weak_factory_(this) {}
28
29 BidirectionalStreamSpdyJob::~BidirectionalStreamSpdyJob() {
30 if (stream_.get()) {
31 stream_->DetachDelegate();
32 DCHECK(!stream_.get());
33 }
34 }
35
36 void BidirectionalStreamSpdyJob::Start(
37 const HttpRequestInfo& request_info,
38 RequestPriority priority,
39 const BoundNetLog& net_log,
40 BidirectionalStreamJob::Delegate* delegate) {
41 delegate_ = delegate;
42 DCHECK(!stream_);
43 if (!spdy_session_)
44 delegate_->OnFailed(ERR_CONNECTION_CLOSED);
45
46 request_info_ = request_info;
47
48 int rv = stream_request_.StartRequest(
49 SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_.url, priority,
50 net_log, base::Bind(&BidirectionalStreamSpdyJob::OnStreamInitialized,
51 weak_factory_.GetWeakPtr()));
52 if (rv != ERR_IO_PENDING)
53 OnStreamInitialized(rv);
54 }
55
56 int BidirectionalStreamSpdyJob::ReadData(IOBuffer* buf, int buf_len) {
57 if (stream_.get())
58 CHECK(!stream_->IsIdle());
59
60 CHECK(buf);
61 CHECK(buf_len);
62 if (!stream_closed_)
63 CHECK(stream_);
64
65 // If there is data buffered, complete the IO immediately.
66 if (!data_queue_.IsEmpty()) {
67 return data_queue_.Dequeue(buf->data(), buf_len);
68 } else if (stream_closed_) {
69 return closed_stream_status_;
70 }
71 // Read will complete asynchronously and Delegate::OnReadCompleted will be
72 // called upon completion.
73 user_buffer_ = buf;
74 user_buffer_len_ = buf_len;
75 return ERR_IO_PENDING;
76 }
77
78 void BidirectionalStreamSpdyJob::SendData(IOBuffer* data,
79 int length,
80 bool end_stream) {
81 stream_->SendData(data, length,
82 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
83 }
84
85 void BidirectionalStreamSpdyJob::Cancel() {
86 if (!stream_)
87 return;
88 // Cancels the stream and detaches the delegate so it doesn't get called back.
89 stream_->DetachDelegate();
90 DCHECK(!stream_);
91 }
92
93 void BidirectionalStreamSpdyJob::OnRequestHeadersSent() {
94 delegate_->OnRequestHeadersSent();
95 }
96
97 SpdyResponseHeadersStatus BidirectionalStreamSpdyJob::OnResponseHeadersUpdated(
98 const SpdyHeaderBlock& response_headers) {
99 delegate_->OnHeaders(response_headers);
100 return RESPONSE_HEADERS_ARE_COMPLETE;
101 }
102
103 void BidirectionalStreamSpdyJob::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
104 DCHECK(stream_);
105 DCHECK(!stream_closed_);
106
107 if (buffer) {
108 data_queue_.Enqueue(buffer.Pass());
109 if (user_buffer_) {
110 // Handing small chunks of data to the caller creates measurable overhead.
111 // So buffer data in short time-spans and send a single read notification.
112 ScheduleBufferedRead();
113 }
114 }
115 // If |buffer| is null, onClose will be invoked to indicate the end of stream.
116 }
117
118 void BidirectionalStreamSpdyJob::OnDataSent() {
119 delegate_->OnDataSent();
120 }
121
122 void BidirectionalStreamSpdyJob::OnTrailers(const SpdyHeaderBlock& trailers) {
123 delegate_->OnTrailers(trailers);
124 }
125
126 void BidirectionalStreamSpdyJob::OnClose(int status) {
127 DCHECK(stream_);
128
129 stream_closed_ = true;
130 closed_stream_status_ = status;
131 stream_.reset();
132
133 // Complete remaining buffered read.
134 if (status == OK && user_buffer_) {
135 DoBufferedRead();
136 return;
137 }
138
139 delegate_->OnClose(status);
140 }
141
142 void BidirectionalStreamSpdyJob::SendRequestHeaders() {
143 stream_->SetDelegate(this);
144 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
145 CreateSpdyHeadersFromHttpRequest(request_info_, request_info_.extra_headers,
146 stream_->GetProtocolVersion(), true,
147 headers.get());
148 bool end_stream = (request_info_.method == "GET");
149 stream_->SendRequestHeaders(
150 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
151 }
152
153 void BidirectionalStreamSpdyJob::OnStreamInitialized(int rv) {
154 DCHECK_NE(ERR_IO_PENDING, rv);
155 if (rv == OK) {
156 stream_ = stream_request_.ReleaseStream();
157 SendRequestHeaders();
158 return;
159 }
160 delegate_->OnFailed(static_cast<Error>(rv));
161 }
162
163 void BidirectionalStreamSpdyJob::ScheduleBufferedRead() {
164 // If there is already a scheduled DoBufferedRead, don't issue
165 // another one. Mark that we have received more data and return.
166 if (buffered_read_pending_) {
167 more_read_data_pending_ = true;
168 return;
169 }
170
171 more_read_data_pending_ = false;
172 buffered_read_pending_ = true;
173 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask(
174 FROM_HERE, base::Bind(&BidirectionalStreamSpdyJob::DoBufferedRead,
175 weak_factory_.GetWeakPtr()),
176 kBufferTime);
177 }
178
179 void BidirectionalStreamSpdyJob::DoBufferedRead() {
180 buffered_read_pending_ = false;
181 // If the stream errored out, do not complete the read.
182 if (!stream_ && !stream_closed_)
183 return;
184 if (stream_closed_ && closed_stream_status_ != OK)
185 return;
186
187 // When |more_read_data_pending_| is true, it means that more data has arrived
188 // since started waiting. Wait a little longer and continue to buffer.
189 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
190 ScheduleBufferedRead();
191 return;
192 }
193
194 int rv = 0;
195 if (user_buffer_) {
196 rv = ReadData(user_buffer_.get(), user_buffer_len_);
197 DCHECK_NE(ERR_IO_PENDING, rv);
198 user_buffer_ = nullptr;
199 user_buffer_len_ = 0;
200 delegate_->OnReadCompleted(rv);
201
202 // If all data is read, and BidirectionalStreamSpdyJob::onClose is invoked
203 // previously, let the delegate know about the onClose event.
204 if (data_queue_.IsEmpty() && stream_closed_) {
205 DCHECK_EQ(OK, closed_stream_status_);
206 delegate_->OnClose(OK);
207 }
208 }
209 }
210
211 bool BidirectionalStreamSpdyJob::ShouldWaitForMoreBufferedData() const {
212 if (stream_closed_)
213 return false;
214 DCHECK_GT(user_buffer_len_, 0);
215 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_);
216 }
217
218 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698