OLD | NEW |
---|---|
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "config.h" | 5 #include "config.h" |
6 #include "modules/fetch/BodyStreamBuffer.h" | 6 #include "modules/fetch/BodyStreamBuffer.h" |
7 | 7 |
8 #include "core/dom/DOMArrayBuffer.h" | 8 #include "core/dom/DOMArrayBuffer.h" |
9 #include "core/dom/ExceptionCode.h" | 9 #include "core/dom/ExceptionCode.h" |
10 | 10 |
11 namespace blink { | 11 namespace blink { |
12 | 12 |
13 namespace { | 13 BodyStreamBuffer* BodyStreamBuffer::createEmpty() |
14 { | |
15 return BodyStreamBuffer::create(createFetchDataConsumerHandleFromWebHandle(c reateDoneDataConsumerHandle())); | |
16 } | |
14 | 17 |
15 class BlobCreator final : public BodyStreamBuffer::Observer { | 18 FetchDataConsumerHandle* BodyStreamBuffer::handle() const |
19 { | |
20 ASSERT(!m_fetchDataLoader); | |
21 ASSERT(!m_drainingStreamNotificationClient); | |
22 return m_handle.get(); | |
23 } | |
24 | |
25 PassOwnPtr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle() | |
26 { | |
27 ASSERT(!m_fetchDataLoader); | |
28 ASSERT(!m_drainingStreamNotificationClient); | |
29 return m_handle.release(); | |
30 } | |
31 | |
32 class ClientWithFinishNotification final : public GarbageCollectedFinalized<Clie ntWithFinishNotification>, public FetchDataLoader::Client { | |
33 USING_GARBAGE_COLLECTED_MIXIN(ClientWithFinishNotification); | |
16 public: | 34 public: |
17 BlobCreator(BodyStreamBuffer* buffer, const String& contentType, BodyStreamB uffer::BlobHandleCreatorClient* client) | 35 ClientWithFinishNotification(BodyStreamBuffer* buffer, FetchDataLoader::Clie nt* client) |
18 : m_buffer(buffer) | 36 : m_buffer(buffer) |
19 , m_client(client) | 37 , m_client(client) |
20 , m_blobData(BlobData::create()) | |
21 { | 38 { |
22 m_blobData->setContentType(contentType); | |
23 } | 39 } |
24 ~BlobCreator() override { } | 40 |
25 DEFINE_INLINE_VIRTUAL_TRACE() | 41 DEFINE_INLINE_VIRTUAL_TRACE() |
26 { | 42 { |
27 visitor->trace(m_buffer); | 43 visitor->trace(m_buffer); |
28 visitor->trace(m_client); | 44 visitor->trace(m_client); |
29 BodyStreamBuffer::Observer::trace(visitor); | 45 FetchDataLoader::Client::trace(visitor); |
30 } | |
31 void onWrite() override | |
32 { | |
33 ASSERT(m_buffer); | |
34 while (RefPtr<DOMArrayBuffer> buf = m_buffer->read()) { | |
35 m_blobData->appendBytes(buf->data(), buf->byteLength()); | |
36 } | |
37 } | |
38 void onClose() override | |
39 { | |
40 ASSERT(m_buffer); | |
41 const long long size = m_blobData->length(); | |
42 m_client->didCreateBlobHandle(BlobDataHandle::create(m_blobData.release( ), size)); | |
43 cleanup(); | |
44 } | |
45 void onError() override | |
46 { | |
47 ASSERT(m_buffer); | |
48 m_client->didFail(m_buffer->exception()); | |
49 cleanup(); | |
50 } | |
51 void start() | |
52 { | |
53 ASSERT(!m_buffer->isObserverRegistered()); | |
54 m_buffer->registerObserver(this); | |
55 onWrite(); | |
56 if (m_buffer->hasError()) { | |
57 return onError(); | |
58 } | |
59 if (m_buffer->isClosed()) | |
60 return onClose(); | |
61 } | |
62 void cleanup() | |
63 { | |
64 m_buffer->unregisterObserver(); | |
65 m_buffer.clear(); | |
66 m_client.clear(); | |
67 m_blobData.clear(); | |
68 } | 46 } |
yhirano
2015/07/06 03:25:39
+empty line
hiroshige
2015/07/06 05:47:07
Done.
| |
69 private: | 47 private: |
48 void didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandle) override | |
49 { | |
50 if (m_client) | |
51 m_client->didFetchDataLoadedBlobHandle(blobDataHandle); | |
52 m_buffer->didFetchDataLoadFinished(); | |
53 } | |
54 void didFetchDataLoadedArrayBuffer(PassRefPtr<DOMArrayBuffer> arrayBuffer) o verride | |
55 { | |
56 if (m_client) | |
57 m_client->didFetchDataLoadedArrayBuffer(arrayBuffer); | |
58 m_buffer->didFetchDataLoadFinished(); | |
59 } | |
60 void didFetchDataLoadedString(const String& str) override | |
61 { | |
62 if (m_client) | |
63 m_client->didFetchDataLoadedString(str); | |
64 m_buffer->didFetchDataLoadFinished(); | |
65 } | |
66 void didFetchDataLoadedStream() override | |
67 { | |
68 if (m_client) | |
69 m_client->didFetchDataLoadedStream(); | |
70 m_buffer->didFetchDataLoadFinished(); | |
71 } | |
72 void didFetchDataLoadFailed() override | |
73 { | |
74 if (m_client) | |
75 m_client->didFetchDataLoadFailed(); | |
76 m_buffer->didFetchDataLoadFinished(); | |
77 } | |
78 | |
70 Member<BodyStreamBuffer> m_buffer; | 79 Member<BodyStreamBuffer> m_buffer; |
71 Member<BodyStreamBuffer::BlobHandleCreatorClient> m_client; | 80 Member<FetchDataLoader::Client> m_client; |
72 OwnPtr<BlobData> m_blobData; | |
73 }; | 81 }; |
74 | 82 |
75 class StreamTeePump : public BodyStreamBuffer::Observer { | 83 void BodyStreamBuffer::setDrainingStreamNotificationClient(DrainingStreamNotific ationClient* client) |
76 public: | |
77 StreamTeePump(BodyStreamBuffer* inBuffer, BodyStreamBuffer* outBuffer1, Body StreamBuffer* outBuffer2) | |
78 : m_inBuffer(inBuffer) | |
79 , m_outBuffer1(outBuffer1) | |
80 , m_outBuffer2(outBuffer2) | |
81 { | |
82 } | |
83 void onWrite() override | |
84 { | |
85 while (RefPtr<DOMArrayBuffer> buf = m_inBuffer->read()) { | |
86 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) | |
87 m_outBuffer1->write(buf); | |
88 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) | |
89 m_outBuffer2->write(buf); | |
90 } | |
91 } | |
92 void onClose() override | |
93 { | |
94 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) | |
95 m_outBuffer1->close(); | |
96 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) | |
97 m_outBuffer2->close(); | |
98 cleanup(); | |
99 } | |
100 void onError() override | |
101 { | |
102 if (!m_outBuffer1->isClosed() && !m_outBuffer1->hasError()) | |
103 m_outBuffer1->error(m_inBuffer->exception()); | |
104 if (!m_outBuffer2->isClosed() && !m_outBuffer2->hasError()) | |
105 m_outBuffer2->error(m_inBuffer->exception()); | |
106 cleanup(); | |
107 } | |
108 DEFINE_INLINE_VIRTUAL_TRACE() | |
109 { | |
110 BodyStreamBuffer::Observer::trace(visitor); | |
111 visitor->trace(m_inBuffer); | |
112 visitor->trace(m_outBuffer1); | |
113 visitor->trace(m_outBuffer2); | |
114 } | |
115 void start() | |
116 { | |
117 m_inBuffer->registerObserver(this); | |
118 onWrite(); | |
119 if (m_inBuffer->hasError()) | |
120 return onError(); | |
121 if (m_inBuffer->isClosed()) | |
122 return onClose(); | |
123 } | |
124 | |
125 private: | |
126 void cleanup() | |
127 { | |
128 m_inBuffer->unregisterObserver(); | |
129 m_inBuffer.clear(); | |
130 m_outBuffer1.clear(); | |
131 m_outBuffer2.clear(); | |
132 } | |
133 Member<BodyStreamBuffer> m_inBuffer; | |
134 Member<BodyStreamBuffer> m_outBuffer1; | |
135 Member<BodyStreamBuffer> m_outBuffer2; | |
136 }; | |
137 | |
138 // WebDataConsumerHandleAdapter is used to migrate incrementally | |
139 // from BodyStreamBuffer to FetchDataConsumerHandle and will be removed | |
140 // after the migration. | |
141 class WebDataConsumerHandleAdapter | |
142 : public GarbageCollectedFinalized<WebDataConsumerHandleAdapter> | |
143 , public WebDataConsumerHandle::Client { | |
144 public: | |
145 WebDataConsumerHandleAdapter(PassOwnPtr<WebDataConsumerHandle> handle, const String& failureMessage) | |
146 : m_reader(handle->obtainReader(this)) | |
147 , m_failureMessage(failureMessage) | |
148 , m_outputBuffer(new BodyStreamBuffer(new Canceller(this))) | |
149 { | |
150 ASSERT(m_reader); | |
151 } | |
152 | |
153 BodyStreamBuffer* outputBuffer() { return m_outputBuffer; } | |
154 | |
155 DEFINE_INLINE_TRACE() | |
156 { | |
157 visitor->trace(m_outputBuffer); | |
158 } | |
159 | |
160 private: | |
161 class Canceller : public BodyStreamBuffer::Canceller { | |
162 public: | |
163 explicit Canceller(WebDataConsumerHandleAdapter* source) : m_source(sour ce) { } | |
164 | |
165 void cancel() override | |
166 { | |
167 m_source->close(); | |
168 } | |
169 | |
170 DEFINE_INLINE_VIRTUAL_TRACE() | |
171 { | |
172 BodyStreamBuffer::Canceller::trace(visitor); | |
173 visitor->trace(m_source); | |
174 } | |
175 | |
176 private: | |
177 Member<WebDataConsumerHandleAdapter> m_source; | |
178 }; | |
179 | |
180 void didGetReadable() override | |
181 { | |
182 while (true) { | |
183 const void* buffer; | |
184 size_t available; | |
185 WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available); | |
186 switch (result) { | |
187 case WebDataConsumerHandle::Ok: | |
188 m_outputBuffer->write(DOMArrayBuffer::create(buffer, available)) ; | |
189 m_reader->endRead(available); | |
190 break; | |
191 | |
192 case WebDataConsumerHandle::Done: | |
193 close(); | |
194 return; | |
195 | |
196 case WebDataConsumerHandle::ShouldWait: | |
197 return; | |
198 | |
199 case WebDataConsumerHandle::Busy: | |
200 case WebDataConsumerHandle::ResourceExhausted: | |
201 case WebDataConsumerHandle::UnexpectedError: | |
202 error(); | |
203 return; | |
204 } | |
205 } | |
206 } | |
207 | |
208 void error() | |
209 { | |
210 m_reader.clear(); | |
211 m_outputBuffer->error(DOMException::create(NetworkError, m_failureMessag e)); | |
212 m_outputBuffer.clear(); | |
213 } | |
214 | |
215 void close() | |
216 { | |
217 m_reader.clear(); | |
218 m_outputBuffer->close(); | |
219 m_outputBuffer.clear(); | |
220 } | |
221 | |
222 OwnPtr<WebDataConsumerHandle::Reader> m_reader; | |
223 String m_failureMessage; | |
224 | |
225 Member<BodyStreamBuffer> m_outputBuffer; | |
226 }; | |
227 | |
228 | |
229 } // namespace | |
230 | |
231 PassRefPtr<DOMArrayBuffer> BodyStreamBuffer::read() | |
232 { | 84 { |
233 if (m_queue.isEmpty()) | 85 ASSERT(!m_fetchDataLoader); |
234 return PassRefPtr<DOMArrayBuffer>(); | 86 ASSERT(!m_drainingStreamNotificationClient); |
235 return m_queue.takeFirst(); | 87 m_drainingStreamNotificationClient = client; |
236 } | 88 } |
237 | 89 |
238 void BodyStreamBuffer::write(PassRefPtr<DOMArrayBuffer> chunk) | 90 void BodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, FetchDataL oader::Client* client) |
239 { | 91 { |
240 ASSERT(!m_isClosed); | 92 ASSERT(!m_fetchDataLoader); |
241 ASSERT(!m_exception); | 93 m_fetchDataLoader = fetchDataLoader; |
242 ASSERT(chunk); | 94 m_fetchDataLoader->start(m_handle.get(), new ClientWithFinishNotification(th is, client)); |
243 m_queue.append(chunk); | |
244 if (m_observer) | |
245 m_observer->onWrite(); | |
246 } | 95 } |
247 | 96 |
248 void BodyStreamBuffer::close() | 97 void BodyStreamBuffer::doDrainingStreamNotification() |
249 { | 98 { |
250 ASSERT(!m_isClosed); | 99 ASSERT(!m_fetchDataLoader); |
251 ASSERT(!m_exception); | 100 DrainingStreamNotificationClient* client = m_drainingStreamNotificationClien t; |
252 m_isClosed = true; | 101 m_drainingStreamNotificationClient.clear(); |
253 if (m_observer) | 102 if (client) |
254 m_observer->onClose(); | 103 client->didFetchDataLoadFinishedFromDrainingStream(); |
255 } | 104 } |
256 | 105 |
257 void BodyStreamBuffer::error(DOMException* exception) | 106 void BodyStreamBuffer::clearDrainingStreamNotification() |
258 { | 107 { |
259 ASSERT(exception); | 108 ASSERT(!m_fetchDataLoader); |
260 ASSERT(!m_isClosed); | 109 m_drainingStreamNotificationClient.clear(); |
261 ASSERT(!m_exception); | |
262 m_exception = exception; | |
263 if (m_observer) | |
264 m_observer->onError(); | |
265 } | 110 } |
266 | 111 |
267 bool BodyStreamBuffer::readAllAndCreateBlobHandle(const String& contentType, Blo bHandleCreatorClient* client) | 112 void BodyStreamBuffer::didFetchDataLoadFinished() |
268 { | 113 { |
269 if (m_observer) | 114 ASSERT(m_fetchDataLoader); |
270 return false; | 115 m_fetchDataLoader.clear(); |
271 BlobCreator* blobCreator = new BlobCreator(this, contentType, client); | 116 doDrainingStreamNotification(); |
272 blobCreator->start(); | |
273 return true; | |
274 } | 117 } |
275 | 118 |
276 bool BodyStreamBuffer::startTee(BodyStreamBuffer* out1, BodyStreamBuffer* out2) | 119 DrainingBodyStreamBuffer::~DrainingBodyStreamBuffer() |
277 { | 120 { |
278 if (m_observer) | 121 if (m_buffer) |
279 return false; | 122 m_buffer->doDrainingStreamNotification(); |
280 StreamTeePump* teePump = new StreamTeePump(this, out1, out2); | |
281 teePump->start(); | |
282 return true; | |
283 } | 123 } |
284 | 124 |
285 bool BodyStreamBuffer::registerObserver(Observer* observer) | 125 void DrainingBodyStreamBuffer::startLoading(FetchDataLoader* fetchDataLoader, Fe tchDataLoader::Client* client) |
286 { | 126 { |
287 if (m_observer) | 127 if (!m_buffer) |
288 return false; | 128 return; |
289 ASSERT(observer); | 129 |
290 m_observer = observer; | 130 m_buffer->startLoading(fetchDataLoader, client); |
291 return true; | 131 m_buffer.clear(); |
292 } | 132 } |
293 | 133 |
294 void BodyStreamBuffer::unregisterObserver() | 134 BodyStreamBuffer* DrainingBodyStreamBuffer::leakBuffer() |
295 { | 135 { |
296 m_observer.clear(); | 136 if (!m_buffer) |
137 return nullptr; | |
138 | |
139 m_buffer->clearDrainingStreamNotification(); | |
140 BodyStreamBuffer* buffer = m_buffer; | |
141 m_buffer.clear(); | |
142 return buffer; | |
297 } | 143 } |
298 | 144 |
299 DEFINE_TRACE(BodyStreamBuffer) | 145 PassRefPtr<BlobDataHandle> DrainingBodyStreamBuffer::drainAsBlobDataHandle(Fetch DataConsumerHandle::Reader::BlobSizePolicy blobSizePolicy) |
300 { | 146 { |
301 visitor->trace(m_exception); | 147 if (!m_buffer) |
302 visitor->trace(m_observer); | 148 return nullptr; |
303 visitor->trace(m_canceller); | 149 |
150 RefPtr<BlobDataHandle> blobDataHandle = m_buffer->m_handle->obtainReader(nul lptr)->drainAsBlobDataHandle(blobSizePolicy); | |
151 if (!blobDataHandle) | |
152 return nullptr; | |
153 m_buffer->doDrainingStreamNotification(); | |
154 m_buffer.clear(); | |
155 return blobDataHandle.release(); | |
304 } | 156 } |
305 | 157 |
306 BodyStreamBuffer::BodyStreamBuffer(Canceller* canceller) | 158 DrainingBodyStreamBuffer::DrainingBodyStreamBuffer(BodyStreamBuffer* buffer, Bod yStreamBuffer::DrainingStreamNotificationClient* client) |
307 : m_isClosed(false) | 159 : m_buffer(buffer) |
308 , m_canceller(canceller) | |
309 { | 160 { |
310 } | 161 if (client) |
311 | 162 m_buffer->setDrainingStreamNotificationClient(client); |
312 BodyStreamBuffer* BodyStreamBuffer::create(PassOwnPtr<WebDataConsumerHandle> han dle, const String& failureMessage) | |
313 { | |
314 return (new WebDataConsumerHandleAdapter(handle, failureMessage))->outputBuf fer(); | |
315 } | 163 } |
316 | 164 |
317 } // namespace blink | 165 } // namespace blink |
OLD | NEW |