Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(197)

Side by Side Diff: Source/modules/fetch/BodyStreamBuffer.cpp

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Reflect comments. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "modules/fetch/BodyStreamBuffer.h" 6 #include "modules/fetch/BodyStreamBuffer.h"
7 7
8 #include "core/dom/DOMArrayBuffer.h" 8 #include "core/dom/DOMArrayBuffer.h"
9 #include "core/dom/ExceptionCode.h" 9 #include "core/dom/ExceptionCode.h"
10 10
11 namespace blink { 11 namespace blink {
12 12
13 namespace { 13 BodyStreamBuffer* BodyStreamBuffer::createEmpty()
14 {
15 return BodyStreamBuffer::create(createFetchDataConsumerHandleFromWebHandle(c reateDoneDataConsumerHandle()));
16 }
14 17
15 class BlobCreator final : public BodyStreamBuffer::Observer { 18 FetchDataConsumerHandle* BodyStreamBuffer::handle() const
19 {
20 ASSERT(!m_fetchDataLoader);
21 ASSERT(!m_drainingStreamNotificationClient);
22 return m_handle.get();
23 }
24
25 PassOwnPtr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle()
26 {
27 ASSERT(!m_fetchDataLoader);
28 ASSERT(!m_drainingStreamNotificationClient);
29 return m_handle.release();
30 }
31
32 class ClientWithFinishNotification final : public GarbageCollectedFinalized<Clie ntWithFinishNotification>, public FetchDataLoader::Client {
33 USING_GARBAGE_COLLECTED_MIXIN(ClientWithFinishNotification);
16 public: 34 public:
17 BlobCreator(BodyStreamBuffer* buffer, const String& contentType, BodyStreamB uffer::BlobHandleCreatorClient* client) 35 ClientWithFinishNotification(BodyStreamBuffer* buffer, FetchDataLoader::Clie nt* client)
18 : m_buffer(buffer) 36 : m_buffer(buffer)
19 , m_client(client) 37 , m_client(client)
20 , m_blobData(BlobData::create())
21 { 38 {
22 m_blobData->setContentType(contentType);
23 } 39 }
24 ~BlobCreator() override { } 40
25 DEFINE_INLINE_VIRTUAL_TRACE() 41 DEFINE_INLINE_VIRTUAL_TRACE()
26 { 42 {
27 visitor->trace(m_buffer); 43 visitor->trace(m_buffer);
28 visitor->trace(m_client); 44 visitor->trace(m_client);
29 BodyStreamBuffer::Observer::trace(visitor); 45 FetchDataLoader::Client::trace(visitor);
30 }
31 void onWrite() override
32 {
33 ASSERT(m_buffer);
34 while (RefPtr<DOMArrayBuffer> buf = m_buffer->read()) {
35 m_blobData->appendBytes(buf->data(), buf->byteLength());
36 }
37 }
38 void onClose() override
39 {
40 ASSERT(m_buffer);
41 const long long size = m_blobData->length();
42 m_client->didCreateBlobHandle(BlobDataHandle::create(m_blobData.release( ), size));
43 cleanup();
44 }
45 void onError() override
46 {
47 ASSERT(m_buffer);
48 m_client->didFail(m_buffer->exception());
49 cleanup();
50 }
51 void start()
52 {
53 ASSERT(!m_buffer->isObserverRegistered());
54 m_buffer->registerObserver(this);
55 onWrite();
56 if (m_buffer->hasError()) {
57 return onError();
58 }
59 if (m_buffer->isClosed())
60 return onClose();
61 }
62 void cleanup()
63 {
64 m_buffer->unregisterObserver();
65 m_buffer.clear();
66 m_client.clear();
67 m_blobData.clear();
68 } 46 }
yhirano 2015/07/06 03:25:39 +empty line
hiroshige 2015/07/06 05:47:07 Done.
69 private: 47 private:
48 void didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandle) override
49 {
50 if (m_client)
51 m_client->didFetchDataLoadedBlobHandle(blobDataHandle);
52 m_buffer->didFetchDataLoadFinished();
53 }
54 void didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer) o verride
55 {
56 if (m_client)
57 m_client->didFetchDataLoadedArrayBuffer(arrayBuffer);
58 m_buffer->didFetchDataLoadFinished();
59 }
60 void didFetchDataLoadedString(const String& str) override
61 {
62 if (m_client)
63 m_client->didFetchDataLoadedString(str);
64 m_buffer->didFetchDataLoadFinished();
65 }
66 void didFetchDataLoadedStream() override
67 {
68 if (m_client)
69 m_client->didFetchDataLoadedStream();
70 m_buffer->didFetchDataLoadFinished();
71 }
72 void didFetchDataLoadFailed() override
73 {
74 if (m_client)
75 m_client->didFetchDataLoadFailed();
76 m_buffer->didFetchDataLoadFinished();
77 }
78
70 Member<BodyStreamBuffer> m_buffer; 79 Member<BodyStreamBuffer> m_buffer;
71 Member<BodyStreamBuffer::BlobHandleCreatorClient> m_client; 80 Member<FetchDataLoader::Client> m_client;
72 OwnPtr<BlobData> m_blobData;
73 }; 81 };
74 82
75 class StreamTeePump : public BodyStreamBuffer::Observer { 83 void BodyStreamBuffer::setDrainingStreamNotificationClient(DrainingStreamNotific ationClient* client)
76 public:
77 StreamTeePump(BodyStreamBuffer* inBuffer, BodyStreamBuffer* outBuffer1, Body StreamBuffer* outBuffer2)
78 : m_inBuffer(inBuffer)
79 , m_outBuffer1(outBuffer1)
80 , m_outBuffer2(outBuffer2)
81 {
82 }
83 void onWrite() override
84 {
85 while (RefPtr<DOMArrayBuffer> buf = m_inBuffer->read()) {
86 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
87 m_outBuffer1->write(buf);
88 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
89 m_outBuffer2->write(buf);
90 }
91 }
92 void onClose() override
93 {
94 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
95 m_outBuffer1->close();
96 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
97 m_outBuffer2->close();
98 cleanup();
99 }
100 void onError() override
101 {
102 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
103 m_outBuffer1->error(m_inBuffer->exception());
104 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
105 m_outBuffer2->error(m_inBuffer->exception());
106 cleanup();
107 }
108 DEFINE_INLINE_VIRTUAL_TRACE()
109 {
110 BodyStreamBuffer::Observer::trace(visitor);
111 visitor->trace(m_inBuffer);
112 visitor->trace(m_outBuffer1);
113 visitor->trace(m_outBuffer2);
114 }
115 void start()
116 {
117 m_inBuffer->registerObserver(this);
118 onWrite();
119 if (m_inBuffer->hasError())
120 return onError();
121 if (m_inBuffer->isClosed())
122 return onClose();
123 }
124
125 private:
126 void cleanup()
127 {
128 m_inBuffer->unregisterObserver();
129 m_inBuffer.clear();
130 m_outBuffer1.clear();
131 m_outBuffer2.clear();
132 }
133 Member<BodyStreamBuffer> m_inBuffer;
134 Member<BodyStreamBuffer> m_outBuffer1;
135 Member<BodyStreamBuffer> m_outBuffer2;
136 };
137
138 // WebDataConsumerHandleAdapter is used to migrate incrementally
139 // from BodyStreamBuffer to FetchDataConsumerHandle and will be removed
140 // after the migration.
141 class WebDataConsumerHandleAdapter
142 : public GarbageCollectedFinalized<WebDataConsumerHandleAdapter>
143 , public WebDataConsumerHandle::Client {
144 public:
145 WebDataConsumerHandleAdapter(PassOwnPtr<WebDataConsumerHandle> handle, const String& failureMessage)
146 : m_reader(handle->obtainReader(this))
147 , m_failureMessage(failureMessage)
148 , m_outputBuffer(new BodyStreamBuffer(new Canceller(this)))
149 {
150 ASSERT(m_reader);
151 }
152
153 BodyStreamBuffer* outputBuffer() { return m_outputBuffer; }
154
155 DEFINE_INLINE_TRACE()
156 {
157 visitor->trace(m_outputBuffer);
158 }
159
160 private:
161 class Canceller : public BodyStreamBuffer::Canceller {
162 public:
163 explicit Canceller(WebDataConsumerHandleAdapter* source) : m_source(sour ce) { }
164
165 void cancel() override
166 {
167 m_source->close();
168 }
169
170 DEFINE_INLINE_VIRTUAL_TRACE()
171 {
172 BodyStreamBuffer::Canceller::trace(visitor);
173 visitor->trace(m_source);
174 }
175
176 private:
177 Member<WebDataConsumerHandleAdapter> m_source;
178 };
179
180 void didGetReadable() override
181 {
182 while (true) {
183 const void* buffer;
184 size_t available;
185 WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
186 switch (result) {
187 case WebDataConsumerHandle::Ok:
188 m_outputBuffer->write(DOMArrayBuffer::create(buffer, available)) ;
189 m_reader->endRead(available);
190 break;
191
192 case WebDataConsumerHandle::Done:
193 close();
194 return;
195
196 case WebDataConsumerHandle::ShouldWait:
197 return;
198
199 case WebDataConsumerHandle::Busy:
200 case WebDataConsumerHandle::ResourceExhausted:
201 case WebDataConsumerHandle::UnexpectedError:
202 error();
203 return;
204 }
205 }
206 }
207
208 void error()
209 {
210 m_reader.clear();
211 m_outputBuffer->error(DOMException::create(NetworkError, m_failureMessag e));
212 m_outputBuffer.clear();
213 }
214
215 void close()
216 {
217 m_reader.clear();
218 m_outputBuffer->close();
219 m_outputBuffer.clear();
220 }
221
222 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
223 String m_failureMessage;
224
225 Member<BodyStreamBuffer> m_outputBuffer;
226 };
227
228
229 } // namespace
230
231 PassRefPtr<DOMArrayBuffer> BodyStreamBuffer::read()
232 { 84 {
233 if (m_queue.isEmpty()) 85 ASSERT(!m_fetchDataLoader);
234 return PassRefPtr<DOMArrayBuffer>(); 86 ASSERT(!m_drainingStreamNotificationClient);
235 return m_queue.takeFirst(); 87 m_drainingStreamNotificationClient = client;
236 } 88 }
237 89
238 void BodyStreamBuffer::write(PassRefPtr<DOMArrayBuffer> chunk) 90 void BodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, FetchDataL oader::Client* client)
239 { 91 {
240 ASSERT(!m_isClosed); 92 ASSERT(!m_fetchDataLoader);
241 ASSERT(!m_exception); 93 m_fetchDataLoader = fetchDataLoader;
242 ASSERT(chunk); 94 m_fetchDataLoader->start(m_handle.get(), new ClientWithFinishNotification(th is, client));
243 m_queue.append(chunk);
244 if (m_observer)
245 m_observer->onWrite();
246 } 95 }
247 96
248 void BodyStreamBuffer::close() 97 void BodyStreamBuffer::doDrainingStreamNotification()
249 { 98 {
250 ASSERT(!m_isClosed); 99 ASSERT(!m_fetchDataLoader);
251 ASSERT(!m_exception); 100 DrainingStreamNotificationClient* client = m_drainingStreamNotificationClien t;
252 m_isClosed = true; 101 m_drainingStreamNotificationClient.clear();
253 if (m_observer) 102 if (client)
254 m_observer->onClose(); 103 client->didFetchDataLoadFinishedFromDrainingStream();
255 } 104 }
256 105
257 void BodyStreamBuffer::error(DOMException* exception) 106 void BodyStreamBuffer::clearDrainingStreamNotification()
258 { 107 {
259 ASSERT(exception); 108 ASSERT(!m_fetchDataLoader);
260 ASSERT(!m_isClosed); 109 m_drainingStreamNotificationClient.clear();
261 ASSERT(!m_exception);
262 m_exception = exception;
263 if (m_observer)
264 m_observer->onError();
265 } 110 }
266 111
267 bool BodyStreamBuffer::readAllAndCreateBlobHandle(const String& contentType, Blo bHandleCreatorClient* client) 112 void BodyStreamBuffer::didFetchDataLoadFinished()
268 { 113 {
269 if (m_observer) 114 ASSERT(m_fetchDataLoader);
270 return false; 115 m_fetchDataLoader.clear();
271 BlobCreator* blobCreator = new BlobCreator(this, contentType, client); 116 doDrainingStreamNotification();
272 blobCreator->start();
273 return true;
274 } 117 }
275 118
276 bool BodyStreamBuffer::startTee(BodyStreamBuffer* out1, BodyStreamBuffer* out2) 119 DrainingBodyStreamBuffer::~DrainingBodyStreamBuffer()
277 { 120 {
278 if (m_observer) 121 if (m_buffer)
279 return false; 122 m_buffer->doDrainingStreamNotification();
280 StreamTeePump* teePump = new StreamTeePump(this, out1, out2);
281 teePump->start();
282 return true;
283 } 123 }
284 124
285 bool BodyStreamBuffer::registerObserver(Observer* observer) 125 void DrainingBodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, Fe tchDataLoader::Client* client)
286 { 126 {
287 if (m_observer) 127 if (!m_buffer)
288 return false; 128 return;
289 ASSERT(observer); 129
290 m_observer = observer; 130 m_buffer->startLoading(fetchDataLoader, client);
291 return true; 131 m_buffer.clear();
292 } 132 }
293 133
294 void BodyStreamBuffer::unregisterObserver() 134 BodyStreamBuffer* DrainingBodyStreamBuffer::leakBuffer()
295 { 135 {
296 m_observer.clear(); 136 if (!m_buffer)
137 return nullptr;
138
139 m_buffer->clearDrainingStreamNotification();
140 BodyStreamBuffer* buffer = m_buffer;
141 m_buffer.clear();
142 return buffer;
297 } 143 }
298 144
299 DEFINE_TRACE(BodyStreamBuffer) 145 PassRefPtr<BlobDataHandle> DrainingBodyStreamBuffer::drainAsBlobDataHandle(Fetch DataConsumerHandle::Reader::BlobSizePolicy blobSizePolicy)
300 { 146 {
301 visitor->trace(m_exception); 147 if (!m_buffer)
302 visitor->trace(m_observer); 148 return nullptr;
303 visitor->trace(m_canceller); 149
150 RefPtr<BlobDataHandle> blobDataHandle = m_buffer->m_handle->obtainReader(nul lptr)->drainAsBlobDataHandle(blobSizePolicy);
151 if (!blobDataHandle)
152 return nullptr;
153 m_buffer->doDrainingStreamNotification();
154 m_buffer.clear();
155 return blobDataHandle.release();
304 } 156 }
305 157
306 BodyStreamBuffer::BodyStreamBuffer(Canceller* canceller) 158 DrainingBodyStreamBuffer::DrainingBodyStreamBuffer(BodyStreamBuffer* buffer, Bod yStreamBuffer::DrainingStreamNotificationClient* client)
307 : m_isClosed(false) 159 : m_buffer(buffer)
308 , m_canceller(canceller)
309 { 160 {
310 } 161 if (client)
311 162 m_buffer->setDrainingStreamNotificationClient(client);
312 BodyStreamBuffer* BodyStreamBuffer::create(PassOwnPtr<WebDataConsumerHandle> han dle, const String& failureMessage)
313 {
314 return (new WebDataConsumerHandleAdapter(handle, failureMessage))->outputBuf fer();
315 } 163 }
316 164
317 } // namespace blink 165 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698