OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "config.h" | 5 #include "config.h" |
6 #include "modules/fetch/BodyStreamBuffer.h" | 6 #include "modules/fetch/BodyStreamBuffer.h" |
7 | 7 |
8 #include "core/dom/DOMArrayBuffer.h" | 8 #include "core/dom/DOMArrayBuffer.h" |
9 #include "core/dom/ExceptionCode.h" | 9 #include "core/dom/ExceptionCode.h" |
10 | 10 |
11 namespace blink { | 11 namespace blink { |
12 | 12 |
13 namespace { | 13 BodyStreamBuffer* BodyStreamBuffer::createEmpty() |
| 14 { |
| 15 return BodyStreamBuffer::create(createFetchDataConsumerHandleFromWebHandle(c
reateDoneDataConsumerHandle())); |
| 16 } |
14 | 17 |
15 class BlobCreator final : public BodyStreamBuffer::Observer { | 18 FetchDataConsumerHandle* BodyStreamBuffer::handle() const |
| 19 { |
| 20 ASSERT(!m_fetchDataLoader); |
| 21 ASSERT(!m_drainingStreamNotificationClient); |
| 22 return m_handle.get(); |
| 23 } |
| 24 |
| 25 PassOwnPtr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle() |
| 26 { |
| 27 ASSERT(!m_fetchDataLoader); |
| 28 ASSERT(!m_drainingStreamNotificationClient); |
| 29 return m_handle.release(); |
| 30 } |
| 31 |
| 32 class ClientWithFinishNotification final : public GarbageCollectedFinalized<Clie
ntWithFinishNotification>, public FetchDataLoader::Client { |
| 33 USING_GARBAGE_COLLECTED_MIXIN(ClientWithFinishNotification); |
16 public: | 34 public: |
17 BlobCreator(BodyStreamBuffer* buffer, const String& contentType, BodyStreamB
uffer::BlobHandleCreatorClient* client) | 35 ClientWithFinishNotification(BodyStreamBuffer* buffer, FetchDataLoader::Clie
nt* client) |
18 : m_buffer(buffer) | 36 : m_buffer(buffer) |
19 , m_client(client) | 37 , m_client(client) |
20 , m_blobData(BlobData::create()) | |
21 { | 38 { |
22 m_blobData->setContentType(contentType); | |
23 } | 39 } |
24 ~BlobCreator() override { } | 40 |
25 DEFINE_INLINE_VIRTUAL_TRACE() | 41 DEFINE_INLINE_VIRTUAL_TRACE() |
26 { | 42 { |
27 visitor->trace(m_buffer); | 43 visitor->trace(m_buffer); |
28 visitor->trace(m_client); | 44 visitor->trace(m_client); |
29 BodyStreamBuffer::Observer::trace(visitor); | 45 FetchDataLoader::Client::trace(visitor); |
30 } | |
31 void onWrite() override | |
32 { | |
33 ASSERT(m_buffer); | |
34 while (RefPtr<DOMArrayBuffer> buf = m_buffer->read()) { | |
35 m_blobData->appendBytes(buf->data(), buf->byteLength()); | |
36 } | |
37 } | |
38 void onClose() override | |
39 { | |
40 ASSERT(m_buffer); | |
41 const long long size = m_blobData->length(); | |
42 m_client->didCreateBlobHandle(BlobDataHandle::create(m_blobData.release(
), size)); | |
43 cleanup(); | |
44 } | |
45 void onError() override | |
46 { | |
47 ASSERT(m_buffer); | |
48 m_client->didFail(m_buffer->exception()); | |
49 cleanup(); | |
50 } | |
51 void start() | |
52 { | |
53 ASSERT(!m_buffer->isObserverRegistered()); | |
54 m_buffer->registerObserver(this); | |
55 onWrite(); | |
56 if (m_buffer->hasError()) { | |
57 return onError(); | |
58 } | |
59 if (m_buffer->isClosed()) | |
60 return onClose(); | |
61 } | |
62 void cleanup() | |
63 { | |
64 m_buffer->unregisterObserver(); | |
65 m_buffer.clear(); | |
66 m_client.clear(); | |
67 m_blobData.clear(); | |
68 } | |
69 private: | |
70 Member<BodyStreamBuffer> m_buffer; | |
71 Member<BodyStreamBuffer::BlobHandleCreatorClient> m_client; | |
72 OwnPtr<BlobData> m_blobData; | |
73 }; | |
74 | |
75 class StreamTeePump : public BodyStreamBuffer::Observer { | |
76 public: | |
77 StreamTeePump(BodyStreamBuffer* inBuffer, BodyStreamBuffer* outBuffer1, Body
StreamBuffer* outBuffer2) | |
78 : m_inBuffer(inBuffer) | |
79 , m_outBuffer1(outBuffer1) | |
80 , m_outBuffer2(outBuffer2) | |
81 { | |
82 } | |
83 void onWrite() override | |
84 { | |
85 while (RefPtr<DOMArrayBuffer> buf = m_inBuffer->read()) { | |
86 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) | |
87 m_outBuffer1->write(buf); | |
88 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) | |
89 m_outBuffer2->write(buf); | |
90 } | |
91 } | |
92 void onClose() override | |
93 { | |
94 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) | |
95 m_outBuffer1->close(); | |
96 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) | |
97 m_outBuffer2->close(); | |
98 cleanup(); | |
99 } | |
100 void onError() override | |
101 { | |
102 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) | |
103 m_outBuffer1->error(m_inBuffer->exception()); | |
104 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) | |
105 m_outBuffer2->error(m_inBuffer->exception()); | |
106 cleanup(); | |
107 } | |
108 DEFINE_INLINE_VIRTUAL_TRACE() | |
109 { | |
110 BodyStreamBuffer::Observer::trace(visitor); | |
111 visitor->trace(m_inBuffer); | |
112 visitor->trace(m_outBuffer1); | |
113 visitor->trace(m_outBuffer2); | |
114 } | |
115 void start() | |
116 { | |
117 m_inBuffer->registerObserver(this); | |
118 onWrite(); | |
119 if (m_inBuffer->hasError()) | |
120 return onError(); | |
121 if (m_inBuffer->isClosed()) | |
122 return onClose(); | |
123 } | 46 } |
124 | 47 |
125 private: | 48 private: |
126 void cleanup() | 49 void didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandle)
override |
127 { | 50 { |
128 m_inBuffer->unregisterObserver(); | 51 if (m_client) |
129 m_inBuffer.clear(); | 52 m_client->didFetchDataLoadedBlobHandle(blobDataHandle); |
130 m_outBuffer1.clear(); | 53 m_buffer->didFetchDataLoadFinished(); |
131 m_outBuffer2.clear(); | |
132 } | 54 } |
133 Member<BodyStreamBuffer> m_inBuffer; | 55 void didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer) o
verride |
134 Member<BodyStreamBuffer> m_outBuffer1; | 56 { |
135 Member<BodyStreamBuffer> m_outBuffer2; | 57 if (m_client) |
| 58 m_client->didFetchDataLoadedArrayBuffer(arrayBuffer); |
| 59 m_buffer->didFetchDataLoadFinished(); |
| 60 } |
| 61 void didFetchDataLoadedString(const String& str) override |
| 62 { |
| 63 if (m_client) |
| 64 m_client->didFetchDataLoadedString(str); |
| 65 m_buffer->didFetchDataLoadFinished(); |
| 66 } |
| 67 void didFetchDataLoadedStream() override |
| 68 { |
| 69 if (m_client) |
| 70 m_client->didFetchDataLoadedStream(); |
| 71 m_buffer->didFetchDataLoadFinished(); |
| 72 } |
| 73 void didFetchDataLoadFailed() override |
| 74 { |
| 75 if (m_client) |
| 76 m_client->didFetchDataLoadFailed(); |
| 77 m_buffer->didFetchDataLoadFinished(); |
| 78 } |
| 79 |
| 80 Member<BodyStreamBuffer> m_buffer; |
| 81 Member<FetchDataLoader::Client> m_client; |
136 }; | 82 }; |
137 | 83 |
138 // WebDataConsumerHandleAdapter is used to migrate incrementally | 84 void BodyStreamBuffer::setDrainingStreamNotificationClient(DrainingStreamNotific
ationClient* client) |
139 // from BodyStreamBuffer to FetchDataConsumerHandle and will be removed | |
140 // after the migration. | |
141 class WebDataConsumerHandleAdapter | |
142 : public GarbageCollectedFinalized<WebDataConsumerHandleAdapter> | |
143 , public WebDataConsumerHandle::Client { | |
144 public: | |
145 WebDataConsumerHandleAdapter(PassOwnPtr<WebDataConsumerHandle> handle, const
String& failureMessage) | |
146 : m_reader(handle->obtainReader(this)) | |
147 , m_failureMessage(failureMessage) | |
148 , m_outputBuffer(new BodyStreamBuffer(new Canceller(this))) | |
149 { | |
150 ASSERT(m_reader); | |
151 } | |
152 | |
153 BodyStreamBuffer* outputBuffer() { return m_outputBuffer; } | |
154 | |
155 DEFINE_INLINE_TRACE() | |
156 { | |
157 visitor->trace(m_outputBuffer); | |
158 } | |
159 | |
160 private: | |
161 class Canceller : public BodyStreamBuffer::Canceller { | |
162 public: | |
163 explicit Canceller(WebDataConsumerHandleAdapter* source) : m_source(sour
ce) { } | |
164 | |
165 void cancel() override | |
166 { | |
167 m_source->close(); | |
168 } | |
169 | |
170 DEFINE_INLINE_VIRTUAL_TRACE() | |
171 { | |
172 BodyStreamBuffer::Canceller::trace(visitor); | |
173 visitor->trace(m_source); | |
174 } | |
175 | |
176 private: | |
177 Member<WebDataConsumerHandleAdapter> m_source; | |
178 }; | |
179 | |
180 void didGetReadable() override | |
181 { | |
182 while (true) { | |
183 const void* buffer; | |
184 size_t available; | |
185 WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer,
WebDataConsumerHandle::FlagNone, &available); | |
186 switch (result) { | |
187 case WebDataConsumerHandle::Ok: | |
188 m_outputBuffer->write(DOMArrayBuffer::create(buffer, available))
; | |
189 m_reader->endRead(available); | |
190 break; | |
191 | |
192 case WebDataConsumerHandle::Done: | |
193 close(); | |
194 return; | |
195 | |
196 case WebDataConsumerHandle::ShouldWait: | |
197 return; | |
198 | |
199 case WebDataConsumerHandle::Busy: | |
200 case WebDataConsumerHandle::ResourceExhausted: | |
201 case WebDataConsumerHandle::UnexpectedError: | |
202 error(); | |
203 return; | |
204 } | |
205 } | |
206 } | |
207 | |
208 void error() | |
209 { | |
210 m_reader.clear(); | |
211 m_outputBuffer->error(DOMException::create(NetworkError, m_failureMessag
e)); | |
212 m_outputBuffer.clear(); | |
213 } | |
214 | |
215 void close() | |
216 { | |
217 m_reader.clear(); | |
218 m_outputBuffer->close(); | |
219 m_outputBuffer.clear(); | |
220 } | |
221 | |
222 OwnPtr<WebDataConsumerHandle::Reader> m_reader; | |
223 String m_failureMessage; | |
224 | |
225 Member<BodyStreamBuffer> m_outputBuffer; | |
226 }; | |
227 | |
228 | |
229 } // namespace | |
230 | |
231 PassRefPtr<DOMArrayBuffer> BodyStreamBuffer::read() | |
232 { | 85 { |
233 if (m_queue.isEmpty()) | 86 ASSERT(!m_fetchDataLoader); |
234 return PassRefPtr<DOMArrayBuffer>(); | 87 ASSERT(!m_drainingStreamNotificationClient); |
235 return m_queue.takeFirst(); | 88 m_drainingStreamNotificationClient = client; |
236 } | 89 } |
237 | 90 |
238 void BodyStreamBuffer::write(PassRefPtr<DOMArrayBuffer> chunk) | 91 void BodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, FetchDataL
oader::Client* client) |
239 { | 92 { |
240 ASSERT(!m_isClosed); | 93 ASSERT(!m_fetchDataLoader); |
241 ASSERT(!m_exception); | 94 m_fetchDataLoader = fetchDataLoader; |
242 ASSERT(chunk); | 95 m_fetchDataLoader->start(m_handle.get(), new ClientWithFinishNotification(th
is, client)); |
243 m_queue.append(chunk); | |
244 if (m_observer) | |
245 m_observer->onWrite(); | |
246 } | 96 } |
247 | 97 |
248 void BodyStreamBuffer::close() | 98 void BodyStreamBuffer::doDrainingStreamNotification() |
249 { | 99 { |
250 ASSERT(!m_isClosed); | 100 ASSERT(!m_fetchDataLoader); |
251 ASSERT(!m_exception); | 101 DrainingStreamNotificationClient* client = m_drainingStreamNotificationClien
t; |
252 m_isClosed = true; | 102 m_drainingStreamNotificationClient.clear(); |
253 if (m_observer) | 103 if (client) |
254 m_observer->onClose(); | 104 client->didFetchDataLoadFinishedFromDrainingStream(); |
255 } | 105 } |
256 | 106 |
257 void BodyStreamBuffer::error(DOMException* exception) | 107 void BodyStreamBuffer::clearDrainingStreamNotification() |
258 { | 108 { |
259 ASSERT(exception); | 109 ASSERT(!m_fetchDataLoader); |
260 ASSERT(!m_isClosed); | 110 m_drainingStreamNotificationClient.clear(); |
261 ASSERT(!m_exception); | |
262 m_exception = exception; | |
263 if (m_observer) | |
264 m_observer->onError(); | |
265 } | 111 } |
266 | 112 |
267 bool BodyStreamBuffer::readAllAndCreateBlobHandle(const String& contentType, Blo
bHandleCreatorClient* client) | 113 void BodyStreamBuffer::didFetchDataLoadFinished() |
268 { | 114 { |
269 if (m_observer) | 115 ASSERT(m_fetchDataLoader); |
270 return false; | 116 m_fetchDataLoader.clear(); |
271 BlobCreator* blobCreator = new BlobCreator(this, contentType, client); | 117 doDrainingStreamNotification(); |
272 blobCreator->start(); | |
273 return true; | |
274 } | 118 } |
275 | 119 |
276 bool BodyStreamBuffer::startTee(BodyStreamBuffer* out1, BodyStreamBuffer* out2) | 120 DrainingBodyStreamBuffer::~DrainingBodyStreamBuffer() |
277 { | 121 { |
278 if (m_observer) | 122 if (m_buffer) |
279 return false; | 123 m_buffer->doDrainingStreamNotification(); |
280 StreamTeePump* teePump = new StreamTeePump(this, out1, out2); | |
281 teePump->start(); | |
282 return true; | |
283 } | 124 } |
284 | 125 |
285 bool BodyStreamBuffer::registerObserver(Observer* observer) | 126 void DrainingBodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, Fe
tchDataLoader::Client* client) |
286 { | 127 { |
287 if (m_observer) | 128 if (!m_buffer) |
288 return false; | 129 return; |
289 ASSERT(observer); | 130 |
290 m_observer = observer; | 131 m_buffer->startLoading(fetchDataLoader, client); |
291 return true; | 132 m_buffer.clear(); |
292 } | 133 } |
293 | 134 |
294 void BodyStreamBuffer::unregisterObserver() | 135 BodyStreamBuffer* DrainingBodyStreamBuffer::leakBuffer() |
295 { | 136 { |
296 m_observer.clear(); | 137 if (!m_buffer) |
| 138 return nullptr; |
| 139 |
| 140 m_buffer->clearDrainingStreamNotification(); |
| 141 BodyStreamBuffer* buffer = m_buffer; |
| 142 m_buffer.clear(); |
| 143 return buffer; |
297 } | 144 } |
298 | 145 |
299 DEFINE_TRACE(BodyStreamBuffer) | 146 PassRefPtr<BlobDataHandle> DrainingBodyStreamBuffer::drainAsBlobDataHandle(Fetch
DataConsumerHandle::Reader::BlobSizePolicy blobSizePolicy) |
300 { | 147 { |
301 visitor->trace(m_exception); | 148 if (!m_buffer) |
302 visitor->trace(m_observer); | 149 return nullptr; |
303 visitor->trace(m_canceller); | 150 |
| 151 RefPtr<BlobDataHandle> blobDataHandle = m_buffer->m_handle->obtainReader(nul
lptr)->drainAsBlobDataHandle(blobSizePolicy); |
| 152 if (!blobDataHandle) |
| 153 return nullptr; |
| 154 m_buffer->doDrainingStreamNotification(); |
| 155 m_buffer.clear(); |
| 156 return blobDataHandle.release(); |
304 } | 157 } |
305 | 158 |
306 BodyStreamBuffer::BodyStreamBuffer(Canceller* canceller) | 159 DrainingBodyStreamBuffer::DrainingBodyStreamBuffer(BodyStreamBuffer* buffer, Bod
yStreamBuffer::DrainingStreamNotificationClient* client) |
307 : m_isClosed(false) | 160 : m_buffer(buffer) |
308 , m_canceller(canceller) | |
309 { | 161 { |
310 } | 162 if (client) |
311 | 163 m_buffer->setDrainingStreamNotificationClient(client); |
312 BodyStreamBuffer* BodyStreamBuffer::create(PassOwnPtr<WebDataConsumerHandle> han
dle, const String& failureMessage) | |
313 { | |
314 return (new WebDataConsumerHandleAdapter(handle, failureMessage))->outputBuf
fer(); | |
315 } | 164 } |
316 | 165 |
317 } // namespace blink | 166 } // namespace blink |
OLD | NEW |