Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(30)

Side by Side Diff: Source/modules/fetch/BodyStreamBuffer.cpp

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Rebase. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "modules/fetch/BodyStreamBuffer.h" 6 #include "modules/fetch/BodyStreamBuffer.h"
7 7
8 #include "core/dom/DOMArrayBuffer.h" 8 #include "core/dom/DOMArrayBuffer.h"
9 #include "core/dom/ExceptionCode.h" 9 #include "core/dom/ExceptionCode.h"
10 10
11 namespace blink { 11 namespace blink {
12 12
13 namespace { 13 namespace {
14 14
15 class BlobCreator final : public BodyStreamBuffer::Observer {
16 public:
17 BlobCreator(BodyStreamBuffer* buffer, const String& contentType, BodyStreamB uffer::BlobHandleCreatorClient* client)
18 : m_buffer(buffer)
19 , m_client(client)
20 , m_blobData(BlobData::create())
21 {
22 m_blobData->setContentType(contentType);
23 }
24 ~BlobCreator() override { }
25 DEFINE_INLINE_VIRTUAL_TRACE()
26 {
27 visitor->trace(m_buffer);
28 visitor->trace(m_client);
29 BodyStreamBuffer::Observer::trace(visitor);
30 }
31 void onWrite() override
32 {
33 ASSERT(m_buffer);
34 while (RefPtr<DOMArrayBuffer> buf = m_buffer->read()) {
35 m_blobData->appendBytes(buf->data(), buf->byteLength());
36 }
37 }
38 void onClose() override
39 {
40 ASSERT(m_buffer);
41 const long long size = m_blobData->length();
42 m_client->didCreateBlobHandle(BlobDataHandle::create(m_blobData.release( ), size));
43 cleanup();
44 }
45 void onError() override
46 {
47 ASSERT(m_buffer);
48 m_client->didFail(m_buffer->exception());
49 cleanup();
50 }
51 void start()
52 {
53 ASSERT(!m_buffer->isObserverRegistered());
54 m_buffer->registerObserver(this);
55 onWrite();
56 if (m_buffer->hasError()) {
57 return onError();
58 }
59 if (m_buffer->isClosed())
60 return onClose();
61 }
62 void cleanup()
63 {
64 m_buffer->unregisterObserver();
65 m_buffer.clear();
66 m_client.clear();
67 m_blobData.clear();
68 }
69 private:
70 Member<BodyStreamBuffer> m_buffer;
71 Member<BodyStreamBuffer::BlobHandleCreatorClient> m_client;
72 OwnPtr<BlobData> m_blobData;
73 };
74
75 class StreamTeePump : public BodyStreamBuffer::Observer {
76 public:
77 StreamTeePump(BodyStreamBuffer* inBuffer, BodyStreamBuffer* outBuffer1, Body StreamBuffer* outBuffer2)
78 : m_inBuffer(inBuffer)
79 , m_outBuffer1(outBuffer1)
80 , m_outBuffer2(outBuffer2)
81 {
82 }
83 void onWrite() override
84 {
85 while (RefPtr<DOMArrayBuffer> buf = m_inBuffer->read()) {
86 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
87 m_outBuffer1->write(buf);
88 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
89 m_outBuffer2->write(buf);
90 }
91 }
92 void onClose() override
93 {
94 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
95 m_outBuffer1->close();
96 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
97 m_outBuffer2->close();
98 cleanup();
99 }
100 void onError() override
101 {
102 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError())
103 m_outBuffer1->error(m_inBuffer->exception());
104 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError())
105 m_outBuffer2->error(m_inBuffer->exception());
106 cleanup();
107 }
108 DEFINE_INLINE_VIRTUAL_TRACE()
109 {
110 BodyStreamBuffer::Observer::trace(visitor);
111 visitor->trace(m_inBuffer);
112 visitor->trace(m_outBuffer1);
113 visitor->trace(m_outBuffer2);
114 }
115 void start()
116 {
117 m_inBuffer->registerObserver(this);
118 onWrite();
119 if (m_inBuffer->hasError())
120 return onError();
121 if (m_inBuffer->isClosed())
122 return onClose();
123 }
124
125 private:
126 void cleanup()
127 {
128 m_inBuffer->unregisterObserver();
129 m_inBuffer.clear();
130 m_outBuffer1.clear();
131 m_outBuffer2.clear();
132 }
133 Member<BodyStreamBuffer> m_inBuffer;
134 Member<BodyStreamBuffer> m_outBuffer1;
135 Member<BodyStreamBuffer> m_outBuffer2;
136 };
137
138 // WebDataConsumerHandleAdapter is used to migrate incrementally 15 // WebDataConsumerHandleAdapter is used to migrate incrementally
139 // from BodyStreamBuffer to FetchDataConsumerHandle and will be removed 16 // from BodyStreamBuffer to FetchDataConsumerHandle and will be removed
140 // after the migration. 17 // after the migration.
141 class WebDataConsumerHandleAdapter 18 class WebDataConsumerHandleAdapter
142 : public GarbageCollectedFinalized<WebDataConsumerHandleAdapter> 19 : public GarbageCollectedFinalized<WebDataConsumerHandleAdapter>
143 , public WebDataConsumerHandle::Client { 20 , public WebDataConsumerHandle::Client {
144 public: 21 public:
145 WebDataConsumerHandleAdapter(PassOwnPtr<WebDataConsumerHandle> handle, const String& failureMessage) 22 WebDataConsumerHandleAdapter(PassOwnPtr<WebDataConsumerHandle> handle, const String& failureMessage)
146 : m_reader(handle->obtainReader(this)) 23 : m_reader(handle->obtainReader(this))
147 , m_failureMessage(failureMessage) 24 , m_failureMessage(failureMessage)
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after
257 void BodyStreamBuffer::error(DOMException* exception) 134 void BodyStreamBuffer::error(DOMException* exception)
258 { 135 {
259 ASSERT(exception); 136 ASSERT(exception);
260 ASSERT(!m_isClosed); 137 ASSERT(!m_isClosed);
261 ASSERT(!m_exception); 138 ASSERT(!m_exception);
262 m_exception = exception; 139 m_exception = exception;
263 if (m_observer) 140 if (m_observer)
264 m_observer->onError(); 141 m_observer->onError();
265 } 142 }
266 143
267 bool BodyStreamBuffer::readAllAndCreateBlobHandle(const String& contentType, Blo bHandleCreatorClient* client)
268 {
269 if (m_observer)
270 return false;
271 BlobCreator* blobCreator = new BlobCreator(this, contentType, client);
272 blobCreator->start();
273 return true;
274 }
275
276 bool BodyStreamBuffer::startTee(BodyStreamBuffer* out1, BodyStreamBuffer* out2)
277 {
278 if (m_observer)
279 return false;
280 StreamTeePump* teePump = new StreamTeePump(this, out1, out2);
281 teePump->start();
282 return true;
283 }
284
285 bool BodyStreamBuffer::registerObserver(Observer* observer) 144 bool BodyStreamBuffer::registerObserver(Observer* observer)
286 { 145 {
287 if (m_observer) 146 if (m_observer)
288 return false; 147 return false;
289 ASSERT(observer); 148 ASSERT(observer);
290 m_observer = observer; 149 m_observer = observer;
291 return true; 150 return true;
292 } 151 }
293 152
294 void BodyStreamBuffer::unregisterObserver() 153 void BodyStreamBuffer::unregisterObserver()
(...skipping 13 matching lines...) Expand all
308 , m_canceller(canceller) 167 , m_canceller(canceller)
309 { 168 {
310 } 169 }
311 170
312 BodyStreamBuffer* BodyStreamBuffer::create(PassOwnPtr<WebDataConsumerHandle> han dle, const String& failureMessage) 171 BodyStreamBuffer* BodyStreamBuffer::create(PassOwnPtr<WebDataConsumerHandle> han dle, const String& failureMessage)
313 { 172 {
314 return (new WebDataConsumerHandleAdapter(handle, failureMessage))->outputBuf fer(); 173 return (new WebDataConsumerHandleAdapter(handle, failureMessage))->outputBuf fer();
315 } 174 }
316 175
317 } // namespace blink 176 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698