Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(260)

Side by Side Diff: Source/modules/fetch/BodyStreamBuffer.cpp

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Reflect comments. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "modules/fetch/BodyStreamBuffer.h" 6 #include "modules/fetch/BodyStreamBuffer.h"
7 7
8 #include "core/dom/DOMArrayBuffer.h" 8 #include "core/dom/DOMArrayBuffer.h"
9 #include "core/dom/ExceptionCode.h" 9 #include "core/dom/ExceptionCode.h"
10 10
11 namespace blink { 11 namespace blink {
12 12
13 namespace {
14
15 class BlobCreator final : public BodyStreamBuffer::Observer {
16 public:
17 BlobCreator(BodyStreamBuffer* buffer, const String& contentType, BodyStreamB uffer::BlobHandleCreatorClient* client)
18 : m_buffer(buffer)
19 , m_client(client)
20 , m_blobData(BlobData::create())
21 {
22 m_blobData->setContentType(contentType);
23 }
24 ~BlobCreator() override { }
25 DEFINE_INLINE_VIRTUAL_TRACE()
26 {
27 visitor->trace(m_buffer);
28 visitor->trace(m_client);
29 BodyStreamBuffer::Observer::trace(visitor);
30 }
31 void onWrite() override
32 {
33 ASSERT(m_buffer);
34 while (RefPtr<DOMArrayBuffer> buf = m_buffer->read()) {
35 m_blobData->appendBytes(buf->data(), buf->byteLength());
36 }
37 }
38 void onClose() override
39 {
40 ASSERT(m_buffer);
41 const long long size = m_blobData->length();
42 m_client->didCreateBlobHandle(BlobDataHandle::create(m_blobData.release( ), size));
43 cleanup();
44 }
45 void onError() override
46 {
47 ASSERT(m_buffer);
48 m_client->didFail(m_buffer->exception());
49 cleanup();
50 }
51 void start()
52 {
53 ASSERT(!m_buffer->isObserverRegistered());
54 m_buffer->registerObserver(this);
55 onWrite();
56 if (m_buffer->hasError()) {
57 return onError();
58 }
59 if (m_buffer->isClosed())
60 return onClose();
61 }
62 void cleanup()
63 {
64 m_buffer->unregisterObserver();
65 m_buffer.clear();
66 m_client.clear();
67 m_blobData.clear();
68 }
69 private:
70 Member<BodyStreamBuffer> m_buffer;
71 Member<BodyStreamBuffer::BlobHandleCreatorClient> m_client;
72 OwnPtr<BlobData> m_blobData;
73 };
74
75 class StreamTeePump : public BodyStreamBuffer::Observer {
76 public:
77 StreamTeePump(BodyStreamBuffer* inBuffer, BodyStreamBuffer* outBuffer1, Body StreamBuffer* outBuffer2)
78 : m_inBuffer(inBuffer)
79 , m_outBuffer1(outBuffer1)
80 , m_outBuffer2(outBuffer2)
81 {
82 }
83 void onWrite() override
84 {
85 while (RefPtr<DOMArrayBuffer> buf = m_inBuffer->read()) {
86 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
87 m_outBuffer1->write(buf);
88 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
89 m_outBuffer2->write(buf);
90 }
91 }
92 void onClose() override
93 {
94 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
95 m_outBuffer1->close();
96 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
97 m_outBuffer2->close();
98 cleanup();
99 }
100 void onError() override
101 {
102 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
103 m_outBuffer1->error(m_inBuffer->exception());
104 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
105 m_outBuffer2->error(m_inBuffer->exception());
106 cleanup();
107 }
108 DEFINE_INLINE_VIRTUAL_TRACE()
109 {
110 BodyStreamBuffer::Observer::trace(visitor);
111 visitor->trace(m_inBuffer);
112 visitor->trace(m_outBuffer1);
113 visitor->trace(m_outBuffer2);
114 }
115 void start()
116 {
117 m_inBuffer->registerObserver(this);
118 onWrite();
119 if (m_inBuffer->hasError())
120 return onError();
121 if (m_inBuffer->isClosed())
122 return onClose();
123 }
124
125 private:
126 void cleanup()
127 {
128 m_inBuffer->unregisterObserver();
129 m_inBuffer.clear();
130 m_outBuffer1.clear();
131 m_outBuffer2.clear();
132 }
133 Member<BodyStreamBuffer> m_inBuffer;
134 Member<BodyStreamBuffer> m_outBuffer1;
135 Member<BodyStreamBuffer> m_outBuffer2;
136 };
137
138 // WebDataConsumerHandleAdapter is used to migrate incrementally
139 // from BodyStreamBuffer to FetchDataConsumerHandle and will be removed
140 // after the migration.
141 class WebDataConsumerHandleAdapter
142 : public GarbageCollectedFinalized<WebDataConsumerHandleAdapter>
143 , public WebDataConsumerHandle::Client {
144 public:
145 WebDataConsumerHandleAdapter(PassOwnPtr<WebDataConsumerHandle> handle, const String& failureMessage)
146 : m_reader(handle->obtainReader(this))
147 , m_failureMessage(failureMessage)
148 , m_outputBuffer(new BodyStreamBuffer(new Canceller(this)))
149 {
150 ASSERT(m_reader);
151 }
152
153 BodyStreamBuffer* outputBuffer() { return m_outputBuffer; }
154
155 DEFINE_INLINE_TRACE()
156 {
157 visitor->trace(m_outputBuffer);
158 }
159
160 private:
161 class Canceller : public BodyStreamBuffer::Canceller {
162 public:
163 explicit Canceller(WebDataConsumerHandleAdapter* source) : m_source(sour ce) { }
164
165 void cancel() override
166 {
167 m_source->close();
168 }
169
170 DEFINE_INLINE_VIRTUAL_TRACE()
171 {
172 BodyStreamBuffer::Canceller::trace(visitor);
173 visitor->trace(m_source);
174 }
175
176 private:
177 Member<WebDataConsumerHandleAdapter> m_source;
178 };
179
180 void didGetReadable() override
181 {
182 while (true) {
183 const void* buffer;
184 size_t available;
185 WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
186 switch (result) {
187 case WebDataConsumerHandle::Ok:
188 m_outputBuffer->write(DOMArrayBuffer::create(buffer, available)) ;
189 m_reader->endRead(available);
190 break;
191
192 case WebDataConsumerHandle::Done:
193 close();
194 return;
195
196 case WebDataConsumerHandle::ShouldWait:
197 return;
198
199 case WebDataConsumerHandle::Busy:
200 case WebDataConsumerHandle::ResourceExhausted:
201 case WebDataConsumerHandle::UnexpectedError:
202 error();
203 return;
204 }
205 }
206 }
207
208 void error()
209 {
210 m_reader.clear();
211 m_outputBuffer->error(DOMException::create(NetworkError, m_failureMessag e));
212 m_outputBuffer.clear();
213 }
214
215 void close()
216 {
217 m_reader.clear();
218 m_outputBuffer->close();
219 m_outputBuffer.clear();
220 }
221
222 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
223 String m_failureMessage;
224
225 Member<BodyStreamBuffer> m_outputBuffer;
226 };
227
228
229 } // namespace
230
231 PassRefPtr<DOMArrayBuffer> BodyStreamBuffer::read()
232 {
233 if (m_queue.isEmpty())
234 return PassRefPtr<DOMArrayBuffer>();
235 return m_queue.takeFirst();
236 }
237
238 void BodyStreamBuffer::write(PassRefPtr<DOMArrayBuffer> chunk)
239 {
240 ASSERT(!m_isClosed);
241 ASSERT(!m_exception);
242 ASSERT(chunk);
243 m_queue.append(chunk);
244 if (m_observer)
245 m_observer->onWrite();
246 }
247
248 void BodyStreamBuffer::close()
249 {
250 ASSERT(!m_isClosed);
251 ASSERT(!m_exception);
252 m_isClosed = true;
253 if (m_observer)
254 m_observer->onClose();
255 }
256
257 void BodyStreamBuffer::error(DOMException* exception)
258 {
259 ASSERT(exception);
260 ASSERT(!m_isClosed);
261 ASSERT(!m_exception);
262 m_exception = exception;
263 if (m_observer)
264 m_observer->onError();
265 }
266
267 bool BodyStreamBuffer::readAllAndCreateBlobHandle(const String& contentType, Blo bHandleCreatorClient* client)
268 {
269 if (m_observer)
270 return false;
271 BlobCreator* blobCreator = new BlobCreator(this, contentType, client);
272 blobCreator->start();
273 return true;
274 }
275
276 bool BodyStreamBuffer::startTee(BodyStreamBuffer* out1, BodyStreamBuffer* out2)
277 {
278 if (m_observer)
279 return false;
280 StreamTeePump* teePump = new StreamTeePump(this, out1, out2);
281 teePump->start();
282 return true;
283 }
284
285 bool BodyStreamBuffer::registerObserver(Observer* observer)
286 {
287 if (m_observer)
288 return false;
289 ASSERT(observer);
290 m_observer = observer;
291 return true;
292 }
293
294 void BodyStreamBuffer::unregisterObserver()
295 {
296 m_observer.clear();
297 }
298
299 DEFINE_TRACE(BodyStreamBuffer)
300 {
301 visitor->trace(m_exception);
302 visitor->trace(m_observer);
303 visitor->trace(m_canceller);
304 }
305
306 BodyStreamBuffer::BodyStreamBuffer(Canceller* canceller)
307 : m_isClosed(false)
308 , m_canceller(canceller)
309 {
310 }
311
312 BodyStreamBuffer* BodyStreamBuffer::create(PassOwnPtr<WebDataConsumerHandle> han dle, const String& failureMessage)
313 {
314 return (new WebDataConsumerHandleAdapter(handle, failureMessage))->outputBuf fer();
315 }
316
317 } // namespace blink 13 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698