OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "config.h" | 5 #include "config.h" |
6 #include "modules/fetch/BodyStreamBuffer.h" | 6 #include "modules/fetch/BodyStreamBuffer.h" |
7 | 7 |
8 #include "core/dom/DOMArrayBuffer.h" | 8 #include "core/dom/DOMArrayBuffer.h" |
9 #include "core/dom/ExceptionCode.h" | 9 #include "core/dom/ExceptionCode.h" |
10 | 10 |
11 namespace blink { | 11 namespace blink { |
12 | 12 |
13 namespace { | |
14 | |
15 class BlobCreator final : public BodyStreamBuffer::Observer { | |
16 public: | |
17 BlobCreator(BodyStreamBuffer* buffer, const String& contentType, BodyStreamB
uffer::BlobHandleCreatorClient* client) | |
18 : m_buffer(buffer) | |
19 , m_client(client) | |
20 , m_blobData(BlobData::create()) | |
21 { | |
22 m_blobData->setContentType(contentType); | |
23 } | |
24 ~BlobCreator() override { } | |
25 DEFINE_INLINE_VIRTUAL_TRACE() | |
26 { | |
27 visitor->trace(m_buffer); | |
28 visitor->trace(m_client); | |
29 BodyStreamBuffer::Observer::trace(visitor); | |
30 } | |
31 void onWrite() override | |
32 { | |
33 ASSERT(m_buffer); | |
34 while (RefPtr<DOMArrayBuffer> buf = m_buffer->read()) { | |
35 m_blobData->appendBytes(buf->data(), buf->byteLength()); | |
36 } | |
37 } | |
38 void onClose() override | |
39 { | |
40 ASSERT(m_buffer); | |
41 const long long size = m_blobData->length(); | |
42 m_client->didCreateBlobHandle(BlobDataHandle::create(m_blobData.release(
), size)); | |
43 cleanup(); | |
44 } | |
45 void onError() override | |
46 { | |
47 ASSERT(m_buffer); | |
48 m_client->didFail(m_buffer->exception()); | |
49 cleanup(); | |
50 } | |
51 void start() | |
52 { | |
53 ASSERT(!m_buffer->isObserverRegistered()); | |
54 m_buffer->registerObserver(this); | |
55 onWrite(); | |
56 if (m_buffer->hasError()) { | |
57 return onError(); | |
58 } | |
59 if (m_buffer->isClosed()) | |
60 return onClose(); | |
61 } | |
62 void cleanup() | |
63 { | |
64 m_buffer->unregisterObserver(); | |
65 m_buffer.clear(); | |
66 m_client.clear(); | |
67 m_blobData.clear(); | |
68 } | |
69 private: | |
70 Member<BodyStreamBuffer> m_buffer; | |
71 Member<BodyStreamBuffer::BlobHandleCreatorClient> m_client; | |
72 OwnPtr<BlobData> m_blobData; | |
73 }; | |
74 | |
75 class StreamTeePump : public BodyStreamBuffer::Observer { | |
76 public: | |
77 StreamTeePump(BodyStreamBuffer* inBuffer, BodyStreamBuffer* outBuffer1, Body
StreamBuffer* outBuffer2) | |
78 : m_inBuffer(inBuffer) | |
79 , m_outBuffer1(outBuffer1) | |
80 , m_outBuffer2(outBuffer2) | |
81 { | |
82 } | |
83 void onWrite() override | |
84 { | |
85 while (RefPtr<DOMArrayBuffer> buf = m_inBuffer->read()) { | |
86 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) | |
87 m_outBuffer1->write(buf); | |
88 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) | |
89 m_outBuffer2->write(buf); | |
90 } | |
91 } | |
92 void onClose() override | |
93 { | |
94 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) | |
95 m_outBuffer1->close(); | |
96 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) | |
97 m_outBuffer2->close(); | |
98 cleanup(); | |
99 } | |
100 void onError() override | |
101 { | |
102 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) | |
103 m_outBuffer1->error(m_inBuffer->exception()); | |
104 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) | |
105 m_outBuffer2->error(m_inBuffer->exception()); | |
106 cleanup(); | |
107 } | |
108 DEFINE_INLINE_VIRTUAL_TRACE() | |
109 { | |
110 BodyStreamBuffer::Observer::trace(visitor); | |
111 visitor->trace(m_inBuffer); | |
112 visitor->trace(m_outBuffer1); | |
113 visitor->trace(m_outBuffer2); | |
114 } | |
115 void start() | |
116 { | |
117 m_inBuffer->registerObserver(this); | |
118 onWrite(); | |
119 if (m_inBuffer->hasError()) | |
120 return onError(); | |
121 if (m_inBuffer->isClosed()) | |
122 return onClose(); | |
123 } | |
124 | |
125 private: | |
126 void cleanup() | |
127 { | |
128 m_inBuffer->unregisterObserver(); | |
129 m_inBuffer.clear(); | |
130 m_outBuffer1.clear(); | |
131 m_outBuffer2.clear(); | |
132 } | |
133 Member<BodyStreamBuffer> m_inBuffer; | |
134 Member<BodyStreamBuffer> m_outBuffer1; | |
135 Member<BodyStreamBuffer> m_outBuffer2; | |
136 }; | |
137 | |
138 // WebDataConsumerHandleAdapter is used to migrate incrementally | |
139 // from BodyStreamBuffer to FetchDataConsumerHandle and will be removed | |
140 // after the migration. | |
141 class WebDataConsumerHandleAdapter | |
142 : public GarbageCollectedFinalized<WebDataConsumerHandleAdapter> | |
143 , public WebDataConsumerHandle::Client { | |
144 public: | |
145 WebDataConsumerHandleAdapter(PassOwnPtr<WebDataConsumerHandle> handle, const
String& failureMessage) | |
146 : m_reader(handle->obtainReader(this)) | |
147 , m_failureMessage(failureMessage) | |
148 , m_outputBuffer(new BodyStreamBuffer(new Canceller(this))) | |
149 { | |
150 ASSERT(m_reader); | |
151 } | |
152 | |
153 BodyStreamBuffer* outputBuffer() { return m_outputBuffer; } | |
154 | |
155 DEFINE_INLINE_TRACE() | |
156 { | |
157 visitor->trace(m_outputBuffer); | |
158 } | |
159 | |
160 private: | |
161 class Canceller : public BodyStreamBuffer::Canceller { | |
162 public: | |
163 explicit Canceller(WebDataConsumerHandleAdapter* source) : m_source(sour
ce) { } | |
164 | |
165 void cancel() override | |
166 { | |
167 m_source->close(); | |
168 } | |
169 | |
170 DEFINE_INLINE_VIRTUAL_TRACE() | |
171 { | |
172 BodyStreamBuffer::Canceller::trace(visitor); | |
173 visitor->trace(m_source); | |
174 } | |
175 | |
176 private: | |
177 Member<WebDataConsumerHandleAdapter> m_source; | |
178 }; | |
179 | |
180 void didGetReadable() override | |
181 { | |
182 while (true) { | |
183 const void* buffer; | |
184 size_t available; | |
185 WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer,
WebDataConsumerHandle::FlagNone, &available); | |
186 switch (result) { | |
187 case WebDataConsumerHandle::Ok: | |
188 m_outputBuffer->write(DOMArrayBuffer::create(buffer, available))
; | |
189 m_reader->endRead(available); | |
190 break; | |
191 | |
192 case WebDataConsumerHandle::Done: | |
193 close(); | |
194 return; | |
195 | |
196 case WebDataConsumerHandle::ShouldWait: | |
197 return; | |
198 | |
199 case WebDataConsumerHandle::Busy: | |
200 case WebDataConsumerHandle::ResourceExhausted: | |
201 case WebDataConsumerHandle::UnexpectedError: | |
202 error(); | |
203 return; | |
204 } | |
205 } | |
206 } | |
207 | |
208 void error() | |
209 { | |
210 m_reader.clear(); | |
211 m_outputBuffer->error(DOMException::create(NetworkError, m_failureMessag
e)); | |
212 m_outputBuffer.clear(); | |
213 } | |
214 | |
215 void close() | |
216 { | |
217 m_reader.clear(); | |
218 m_outputBuffer->close(); | |
219 m_outputBuffer.clear(); | |
220 } | |
221 | |
222 OwnPtr<WebDataConsumerHandle::Reader> m_reader; | |
223 String m_failureMessage; | |
224 | |
225 Member<BodyStreamBuffer> m_outputBuffer; | |
226 }; | |
227 | |
228 | |
229 } // namespace | |
230 | |
231 PassRefPtr<DOMArrayBuffer> BodyStreamBuffer::read() | |
232 { | |
233 if (m_queue.isEmpty()) | |
234 return PassRefPtr<DOMArrayBuffer>(); | |
235 return m_queue.takeFirst(); | |
236 } | |
237 | |
238 void BodyStreamBuffer::write(PassRefPtr<DOMArrayBuffer> chunk) | |
239 { | |
240 ASSERT(!m_isClosed); | |
241 ASSERT(!m_exception); | |
242 ASSERT(chunk); | |
243 m_queue.append(chunk); | |
244 if (m_observer) | |
245 m_observer->onWrite(); | |
246 } | |
247 | |
248 void BodyStreamBuffer::close() | |
249 { | |
250 ASSERT(!m_isClosed); | |
251 ASSERT(!m_exception); | |
252 m_isClosed = true; | |
253 if (m_observer) | |
254 m_observer->onClose(); | |
255 } | |
256 | |
257 void BodyStreamBuffer::error(DOMException* exception) | |
258 { | |
259 ASSERT(exception); | |
260 ASSERT(!m_isClosed); | |
261 ASSERT(!m_exception); | |
262 m_exception = exception; | |
263 if (m_observer) | |
264 m_observer->onError(); | |
265 } | |
266 | |
267 bool BodyStreamBuffer::readAllAndCreateBlobHandle(const String& contentType, Blo
bHandleCreatorClient* client) | |
268 { | |
269 if (m_observer) | |
270 return false; | |
271 BlobCreator* blobCreator = new BlobCreator(this, contentType, client); | |
272 blobCreator->start(); | |
273 return true; | |
274 } | |
275 | |
276 bool BodyStreamBuffer::startTee(BodyStreamBuffer* out1, BodyStreamBuffer* out2) | |
277 { | |
278 if (m_observer) | |
279 return false; | |
280 StreamTeePump* teePump = new StreamTeePump(this, out1, out2); | |
281 teePump->start(); | |
282 return true; | |
283 } | |
284 | |
285 bool BodyStreamBuffer::registerObserver(Observer* observer) | |
286 { | |
287 if (m_observer) | |
288 return false; | |
289 ASSERT(observer); | |
290 m_observer = observer; | |
291 return true; | |
292 } | |
293 | |
294 void BodyStreamBuffer::unregisterObserver() | |
295 { | |
296 m_observer.clear(); | |
297 } | |
298 | |
299 DEFINE_TRACE(BodyStreamBuffer) | |
300 { | |
301 visitor->trace(m_exception); | |
302 visitor->trace(m_observer); | |
303 visitor->trace(m_canceller); | |
304 } | |
305 | |
306 BodyStreamBuffer::BodyStreamBuffer(Canceller* canceller) | |
307 : m_isClosed(false) | |
308 , m_canceller(canceller) | |
309 { | |
310 } | |
311 | |
312 BodyStreamBuffer* BodyStreamBuffer::create(PassOwnPtr<WebDataConsumerHandle> han
dle, const String& failureMessage) | |
313 { | |
314 return (new WebDataConsumerHandleAdapter(handle, failureMessage))->outputBuf
fer(); | |
315 } | |
316 | |
317 } // namespace blink | 13 } // namespace blink |
OLD | NEW |