OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "modules/fetch/DataConsumerTee.h" | 5 #include "modules/fetch/DataConsumerTee.h" |
6 | 6 |
7 #include "core/dom/ActiveDOMObject.h" | 7 #include "core/dom/ActiveDOMObject.h" |
8 #include "core/dom/ExecutionContext.h" | 8 #include "core/dom/ExecutionContext.h" |
9 #include "modules/fetch/DataConsumerHandleUtil.h" | 9 #include "modules/fetch/DataConsumerHandleUtil.h" |
10 #include "modules/fetch/FetchBlobDataConsumerHandle.h" | 10 #include "modules/fetch/FetchBlobDataConsumerHandle.h" |
11 #include "platform/ThreadSafeFunctional.h" | 11 #include "platform/ThreadSafeFunctional.h" |
12 #include "platform/heap/Handle.h" | 12 #include "platform/heap/Handle.h" |
13 #include "public/platform/Platform.h" | 13 #include "public/platform/Platform.h" |
14 #include "public/platform/WebTaskRunner.h" | 14 #include "public/platform/WebTaskRunner.h" |
15 #include "public/platform/WebThread.h" | 15 #include "public/platform/WebThread.h" |
16 #include "public/platform/WebTraceLocation.h" | 16 #include "public/platform/WebTraceLocation.h" |
17 #include "wtf/Deque.h" | 17 #include "wtf/Deque.h" |
18 #include "wtf/Functional.h" | 18 #include "wtf/Functional.h" |
19 #include "wtf/PtrUtil.h" | |
20 #include "wtf/ThreadSafeRefCounted.h" | 19 #include "wtf/ThreadSafeRefCounted.h" |
21 #include "wtf/ThreadingPrimitives.h" | 20 #include "wtf/ThreadingPrimitives.h" |
22 #include "wtf/Vector.h" | 21 #include "wtf/Vector.h" |
23 #include <memory> | |
24 | 22 |
25 namespace blink { | 23 namespace blink { |
26 | 24 |
27 using Result = WebDataConsumerHandle::Result; | 25 using Result = WebDataConsumerHandle::Result; |
28 using Flags = WebDataConsumerHandle::Flags; | 26 using Flags = WebDataConsumerHandle::Flags; |
29 | 27 |
30 namespace { | 28 namespace { |
31 | 29 |
32 // This file contains the "tee" implementation. There are several classes and | 30 // This file contains the "tee" implementation. There are several classes and |
33 // their relationship is complicated, so let me describe here. | 31 // their relationship is complicated, so let me describe here. |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
143 }; | 141 }; |
144 | 142 |
145 static PassRefPtr<DestinationContext> create() { return adoptRef(new Destina
tionContext()); } | 143 static PassRefPtr<DestinationContext> create() { return adoptRef(new Destina
tionContext()); } |
146 | 144 |
147 void enqueue(const char* buffer, size_t size) | 145 void enqueue(const char* buffer, size_t size) |
148 { | 146 { |
149 bool needsNotification = false; | 147 bool needsNotification = false; |
150 { | 148 { |
151 MutexLocker locker(m_mutex); | 149 MutexLocker locker(m_mutex); |
152 needsNotification = m_queue.isEmpty(); | 150 needsNotification = m_queue.isEmpty(); |
153 std::unique_ptr<Vector<char>> data = wrapUnique(new Vector<char>); | 151 OwnPtr<Vector<char>> data = adoptPtr(new Vector<char>); |
154 data->append(buffer, size); | 152 data->append(buffer, size); |
155 m_queue.append(std::move(data)); | 153 m_queue.append(std::move(data)); |
156 } | 154 } |
157 if (needsNotification) | 155 if (needsNotification) |
158 notify(); | 156 notify(); |
159 } | 157 } |
160 | 158 |
161 void setResult(Result r) | 159 void setResult(Result r) |
162 { | 160 { |
163 ASSERT(r != WebDataConsumerHandle::Ok); | 161 ASSERT(r != WebDataConsumerHandle::Ok); |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
204 ASSERT(!m_client); | 202 ASSERT(!m_client); |
205 m_readerThread = Platform::current()->currentThread(); | 203 m_readerThread = Platform::current()->currentThread(); |
206 m_client = client; | 204 m_client = client; |
207 } | 205 } |
208 void detachReader() | 206 void detachReader() |
209 { | 207 { |
210 ASSERT(m_readerThread && m_readerThread->isCurrentThread()); | 208 ASSERT(m_readerThread && m_readerThread->isCurrentThread()); |
211 m_readerThread = nullptr; | 209 m_readerThread = nullptr; |
212 m_client = nullptr; | 210 m_client = nullptr; |
213 } | 211 } |
214 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); } | 212 const OwnPtr<Vector<char>>& top() const { return m_queue.first(); } |
215 bool isEmpty() const { return m_queue.isEmpty(); } | 213 bool isEmpty() const { return m_queue.isEmpty(); } |
216 size_t offset() const { return m_offset; } | 214 size_t offset() const { return m_offset; } |
217 void consume(size_t size) | 215 void consume(size_t size) |
218 { | 216 { |
219 const auto& top = m_queue.first(); | 217 const auto& top = m_queue.first(); |
220 ASSERT(m_offset <= m_offset + size); | 218 ASSERT(m_offset <= m_offset + size); |
221 ASSERT(m_offset + size <= top->size()); | 219 ASSERT(m_offset + size <= top->size()); |
222 if (top->size() <= m_offset + size) { | 220 if (top->size() <= m_offset + size) { |
223 m_offset = 0; | 221 m_offset = 0; |
224 m_queue.removeFirst(); | 222 m_queue.removeFirst(); |
(...skipping 15 matching lines...) Expand all Loading... |
240 | 238 |
241 void detach() | 239 void detach() |
242 { | 240 { |
243 MutexLocker locker(m_mutex); | 241 MutexLocker locker(m_mutex); |
244 ASSERT(!m_client); | 242 ASSERT(!m_client); |
245 ASSERT(!m_readerThread); | 243 ASSERT(!m_readerThread); |
246 m_queue.clear(); | 244 m_queue.clear(); |
247 } | 245 } |
248 | 246 |
249 Result m_result; | 247 Result m_result; |
250 Deque<std::unique_ptr<Vector<char>>> m_queue; | 248 Deque<OwnPtr<Vector<char>>> m_queue; |
251 // Note: Holding a WebThread raw pointer is not generally safe, but we can | 249 // Note: Holding a WebThread raw pointer is not generally safe, but we can |
252 // do that in this case because: | 250 // do that in this case because: |
253 // 1. Destructing a ReaderImpl when the bound thread ends is a user's | 251 // 1. Destructing a ReaderImpl when the bound thread ends is a user's |
254 // responsibility. | 252 // responsibility. |
255 // 2. |m_readerThread| will never be used after the associated reader is | 253 // 2. |m_readerThread| will never be used after the associated reader is |
256 // detached. | 254 // detached. |
257 WebThread* m_readerThread; | 255 WebThread* m_readerThread; |
258 WebDataConsumerHandle::Client* m_client; | 256 WebDataConsumerHandle::Client* m_client; |
259 size_t m_offset; | 257 size_t m_offset; |
260 bool m_isTwoPhaseReadInProgress; | 258 bool m_isTwoPhaseReadInProgress; |
(...skipping 21 matching lines...) Expand all Loading... |
282 } | 280 } |
283 | 281 |
284 Result beginRead(const void** buffer, Flags, size_t* available) override | 282 Result beginRead(const void** buffer, Flags, size_t* available) override |
285 { | 283 { |
286 MutexLocker locker(context()->mutex()); | 284 MutexLocker locker(context()->mutex()); |
287 *available = 0; | 285 *available = 0; |
288 *buffer = nullptr; | 286 *buffer = nullptr; |
289 if (context()->isEmpty()) | 287 if (context()->isEmpty()) |
290 return context()->getResult(); | 288 return context()->getResult(); |
291 | 289 |
292 const std::unique_ptr<Vector<char>>& chunk = context()->top(); | 290 const OwnPtr<Vector<char>>& chunk = context()->top(); |
293 *available = chunk->size() - context()->offset(); | 291 *available = chunk->size() - context()->offset(); |
294 *buffer = chunk->data() + context()->offset(); | 292 *buffer = chunk->data() + context()->offset(); |
295 return WebDataConsumerHandle::Ok; | 293 return WebDataConsumerHandle::Ok; |
296 } | 294 } |
297 | 295 |
298 Result endRead(size_t readSize) override | 296 Result endRead(size_t readSize) override |
299 { | 297 { |
300 MutexLocker locker(context()->mutex()); | 298 MutexLocker locker(context()->mutex()); |
301 if (context()->isEmpty()) | 299 if (context()->isEmpty()) |
302 return WebDataConsumerHandle::UnexpectedError; | 300 return WebDataConsumerHandle::UnexpectedError; |
303 context()->consume(readSize); | 301 context()->consume(readSize); |
304 return WebDataConsumerHandle::Ok; | 302 return WebDataConsumerHandle::Ok; |
305 } | 303 } |
306 | 304 |
307 private: | 305 private: |
308 DestinationContext* context() { return m_contextProxy->context(); } | 306 DestinationContext* context() { return m_contextProxy->context(); } |
309 | 307 |
310 RefPtr<DestinationContext::Proxy> m_contextProxy; | 308 RefPtr<DestinationContext::Proxy> m_contextProxy; |
311 }; | 309 }; |
312 | 310 |
313 class DestinationHandle final : public WebDataConsumerHandle { | 311 class DestinationHandle final : public WebDataConsumerHandle { |
314 public: | 312 public: |
315 static std::unique_ptr<WebDataConsumerHandle> create(PassRefPtr<DestinationC
ontext::Proxy> contextProxy) | 313 static PassOwnPtr<WebDataConsumerHandle> create(PassRefPtr<DestinationContex
t::Proxy> contextProxy) |
316 { | 314 { |
317 return wrapUnique(new DestinationHandle(contextProxy)); | 315 return adoptPtr(new DestinationHandle(contextProxy)); |
318 } | 316 } |
319 | 317 |
320 private: | 318 private: |
321 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co
ntextProxy(contextProxy) { } | 319 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co
ntextProxy(contextProxy) { } |
322 DestinationReader* obtainReaderInternal(Client* client) { return new Destina
tionReader(m_contextProxy, client); } | 320 DestinationReader* obtainReaderInternal(Client* client) { return new Destina
tionReader(m_contextProxy, client); } |
323 const char* debugName() const override { return "DestinationHandle"; } | 321 const char* debugName() const override { return "DestinationHandle"; } |
324 | 322 |
325 RefPtr<DestinationContext::Proxy> m_contextProxy; | 323 RefPtr<DestinationContext::Proxy> m_contextProxy; |
326 }; | 324 }; |
327 | 325 |
328 // Bound to the created thread. | 326 // Bound to the created thread. |
329 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub
lic ActiveDOMObject, public WebDataConsumerHandle::Client { | 327 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub
lic ActiveDOMObject, public WebDataConsumerHandle::Client { |
330 USING_GARBAGE_COLLECTED_MIXIN(SourceContext); | 328 USING_GARBAGE_COLLECTED_MIXIN(SourceContext); |
331 public: | 329 public: |
332 SourceContext( | 330 SourceContext( |
333 PassRefPtr<TeeRootObject> root, | 331 PassRefPtr<TeeRootObject> root, |
334 std::unique_ptr<WebDataConsumerHandle> src, | 332 PassOwnPtr<WebDataConsumerHandle> src, |
335 PassRefPtr<DestinationContext> dest1, | 333 PassRefPtr<DestinationContext> dest1, |
336 PassRefPtr<DestinationContext> dest2, | 334 PassRefPtr<DestinationContext> dest2, |
337 ExecutionContext* executionContext) | 335 ExecutionContext* executionContext) |
338 : ActiveDOMObject(executionContext) | 336 : ActiveDOMObject(executionContext) |
339 , m_root(root) | 337 , m_root(root) |
340 , m_reader(src->obtainReader(this)) | 338 , m_reader(src->obtainReader(this)) |
341 , m_dest1(dest1) | 339 , m_dest1(dest1) |
342 , m_dest2(dest2) | 340 , m_dest2(dest2) |
343 { | 341 { |
344 suspendIfNeeded(); | 342 suspendIfNeeded(); |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
389 m_dest1->setResult(WebDataConsumerHandle::UnexpectedError); | 387 m_dest1->setResult(WebDataConsumerHandle::UnexpectedError); |
390 m_dest2->setResult(WebDataConsumerHandle::UnexpectedError); | 388 m_dest2->setResult(WebDataConsumerHandle::UnexpectedError); |
391 m_root->stop(); | 389 m_root->stop(); |
392 m_root = nullptr; | 390 m_root = nullptr; |
393 m_reader = nullptr; | 391 m_reader = nullptr; |
394 m_dest1 = nullptr; | 392 m_dest1 = nullptr; |
395 m_dest2 = nullptr; | 393 m_dest2 = nullptr; |
396 } | 394 } |
397 | 395 |
398 RefPtr<TeeRootObject> m_root; | 396 RefPtr<TeeRootObject> m_root; |
399 std::unique_ptr<WebDataConsumerHandle::Reader> m_reader; | 397 OwnPtr<WebDataConsumerHandle::Reader> m_reader; |
400 RefPtr<DestinationContext> m_dest1; | 398 RefPtr<DestinationContext> m_dest1; |
401 RefPtr<DestinationContext> m_dest2; | 399 RefPtr<DestinationContext> m_dest2; |
402 }; | 400 }; |
403 | 401 |
404 } // namespace | 402 } // namespace |
405 | 403 |
406 void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr
<WebDataConsumerHandle> src, std::unique_ptr<WebDataConsumerHandle>* dest1, std:
:unique_ptr<WebDataConsumerHandle>* dest2) | 404 void DataConsumerTee::create(ExecutionContext* executionContext, PassOwnPtr<WebD
ataConsumerHandle> src, OwnPtr<WebDataConsumerHandle>* dest1, OwnPtr<WebDataCons
umerHandle>* dest2) |
407 { | 405 { |
408 RefPtr<TeeRootObject> root = TeeRootObject::create(); | 406 RefPtr<TeeRootObject> root = TeeRootObject::create(); |
409 RefPtr<DestinationTracker> tracker = DestinationTracker::create(root); | 407 RefPtr<DestinationTracker> tracker = DestinationTracker::create(root); |
410 RefPtr<DestinationContext> context1 = DestinationContext::create(); | 408 RefPtr<DestinationContext> context1 = DestinationContext::create(); |
411 RefPtr<DestinationContext> context2 = DestinationContext::create(); | 409 RefPtr<DestinationContext> context2 = DestinationContext::create(); |
412 | 410 |
413 root->initialize(new SourceContext(root, std::move(src), context1, context2,
executionContext)); | 411 root->initialize(new SourceContext(root, std::move(src), context1, context2,
executionContext)); |
414 | 412 |
415 *dest1 = DestinationHandle::create(DestinationContext::Proxy::create(context
1, tracker)); | 413 *dest1 = DestinationHandle::create(DestinationContext::Proxy::create(context
1, tracker)); |
416 *dest2 = DestinationHandle::create(DestinationContext::Proxy::create(context
2, tracker)); | 414 *dest2 = DestinationHandle::create(DestinationContext::Proxy::create(context
2, tracker)); |
417 } | 415 } |
418 | 416 |
419 void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr
<FetchDataConsumerHandle> src, std::unique_ptr<FetchDataConsumerHandle>* dest1,
std::unique_ptr<FetchDataConsumerHandle>* dest2) | 417 void DataConsumerTee::create(ExecutionContext* executionContext, PassOwnPtr<Fetc
hDataConsumerHandle> src, OwnPtr<FetchDataConsumerHandle>* dest1, OwnPtr<FetchDa
taConsumerHandle>* dest2) |
420 { | 418 { |
421 RefPtr<BlobDataHandle> blobDataHandle = src->obtainReader(nullptr)->drainAsB
lobDataHandle(FetchDataConsumerHandle::Reader::AllowBlobWithInvalidSize); | 419 RefPtr<BlobDataHandle> blobDataHandle = src->obtainReader(nullptr)->drainAsB
lobDataHandle(FetchDataConsumerHandle::Reader::AllowBlobWithInvalidSize); |
422 if (blobDataHandle) { | 420 if (blobDataHandle) { |
423 *dest1 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); | 421 *dest1 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); |
424 *dest2 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); | 422 *dest2 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); |
425 return; | 423 return; |
426 } | 424 } |
427 | 425 |
428 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2; | 426 OwnPtr<WebDataConsumerHandle> webDest1, webDest2; |
429 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat
aConsumerHandle>>(std::move(src)), &webDest1, &webDest2); | 427 DataConsumerTee::create(executionContext, static_cast<PassOwnPtr<WebDataCons
umerHandle>>(std::move(src)), &webDest1, &webDest2); |
430 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); | 428 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); |
431 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); | 429 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); |
432 return; | 430 return; |
433 } | 431 } |
434 | 432 |
435 } // namespace blink | 433 } // namespace blink |
OLD | NEW |