| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "modules/fetch/DataConsumerTee.h" | 5 #include "modules/fetch/DataConsumerTee.h" |
| 6 | 6 |
| 7 #include "core/dom/ActiveDOMObject.h" | 7 #include "core/dom/ActiveDOMObject.h" |
| 8 #include "core/dom/ExecutionContext.h" | 8 #include "core/dom/ExecutionContext.h" |
| 9 #include "modules/fetch/DataConsumerHandleUtil.h" | 9 #include "modules/fetch/DataConsumerHandleUtil.h" |
| 10 #include "modules/fetch/FetchBlobDataConsumerHandle.h" | 10 #include "modules/fetch/FetchBlobDataConsumerHandle.h" |
| 11 #include "platform/ThreadSafeFunctional.h" | 11 #include "platform/ThreadSafeFunctional.h" |
| 12 #include "platform/heap/Handle.h" | 12 #include "platform/heap/Handle.h" |
| 13 #include "public/platform/Platform.h" | 13 #include "public/platform/Platform.h" |
| 14 #include "public/platform/WebTaskRunner.h" | 14 #include "public/platform/WebTaskRunner.h" |
| 15 #include "public/platform/WebThread.h" | 15 #include "public/platform/WebThread.h" |
| 16 #include "public/platform/WebTraceLocation.h" | 16 #include "public/platform/WebTraceLocation.h" |
| 17 #include "wtf/Deque.h" | 17 #include "wtf/Deque.h" |
| 18 #include "wtf/Functional.h" | 18 #include "wtf/Functional.h" |
| 19 #include "wtf/PtrUtil.h" | |
| 20 #include "wtf/ThreadSafeRefCounted.h" | 19 #include "wtf/ThreadSafeRefCounted.h" |
| 21 #include "wtf/ThreadingPrimitives.h" | 20 #include "wtf/ThreadingPrimitives.h" |
| 22 #include "wtf/Vector.h" | 21 #include "wtf/Vector.h" |
| 23 #include <memory> | |
| 24 | 22 |
| 25 namespace blink { | 23 namespace blink { |
| 26 | 24 |
| 27 using Result = WebDataConsumerHandle::Result; | 25 using Result = WebDataConsumerHandle::Result; |
| 28 using Flags = WebDataConsumerHandle::Flags; | 26 using Flags = WebDataConsumerHandle::Flags; |
| 29 | 27 |
| 30 namespace { | 28 namespace { |
| 31 | 29 |
| 32 // This file contains the "tee" implementation. There are several classes and | 30 // This file contains the "tee" implementation. There are several classes and |
| 33 // their relationship is complicated, so let me describe here. | 31 // their relationship is complicated, so let me describe here. |
| (...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 143 }; | 141 }; |
| 144 | 142 |
| 145 static PassRefPtr<DestinationContext> create() { return adoptRef(new Destina
tionContext()); } | 143 static PassRefPtr<DestinationContext> create() { return adoptRef(new Destina
tionContext()); } |
| 146 | 144 |
| 147 void enqueue(const char* buffer, size_t size) | 145 void enqueue(const char* buffer, size_t size) |
| 148 { | 146 { |
| 149 bool needsNotification = false; | 147 bool needsNotification = false; |
| 150 { | 148 { |
| 151 MutexLocker locker(m_mutex); | 149 MutexLocker locker(m_mutex); |
| 152 needsNotification = m_queue.isEmpty(); | 150 needsNotification = m_queue.isEmpty(); |
| 153 std::unique_ptr<Vector<char>> data = wrapUnique(new Vector<char>); | 151 OwnPtr<Vector<char>> data = adoptPtr(new Vector<char>); |
| 154 data->append(buffer, size); | 152 data->append(buffer, size); |
| 155 m_queue.append(std::move(data)); | 153 m_queue.append(std::move(data)); |
| 156 } | 154 } |
| 157 if (needsNotification) | 155 if (needsNotification) |
| 158 notify(); | 156 notify(); |
| 159 } | 157 } |
| 160 | 158 |
| 161 void setResult(Result r) | 159 void setResult(Result r) |
| 162 { | 160 { |
| 163 ASSERT(r != WebDataConsumerHandle::Ok); | 161 ASSERT(r != WebDataConsumerHandle::Ok); |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 204 ASSERT(!m_client); | 202 ASSERT(!m_client); |
| 205 m_readerThread = Platform::current()->currentThread(); | 203 m_readerThread = Platform::current()->currentThread(); |
| 206 m_client = client; | 204 m_client = client; |
| 207 } | 205 } |
| 208 void detachReader() | 206 void detachReader() |
| 209 { | 207 { |
| 210 ASSERT(m_readerThread && m_readerThread->isCurrentThread()); | 208 ASSERT(m_readerThread && m_readerThread->isCurrentThread()); |
| 211 m_readerThread = nullptr; | 209 m_readerThread = nullptr; |
| 212 m_client = nullptr; | 210 m_client = nullptr; |
| 213 } | 211 } |
| 214 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); } | 212 const OwnPtr<Vector<char>>& top() const { return m_queue.first(); } |
| 215 bool isEmpty() const { return m_queue.isEmpty(); } | 213 bool isEmpty() const { return m_queue.isEmpty(); } |
| 216 size_t offset() const { return m_offset; } | 214 size_t offset() const { return m_offset; } |
| 217 void consume(size_t size) | 215 void consume(size_t size) |
| 218 { | 216 { |
| 219 const auto& top = m_queue.first(); | 217 const auto& top = m_queue.first(); |
| 220 ASSERT(m_offset <= m_offset + size); | 218 ASSERT(m_offset <= m_offset + size); |
| 221 ASSERT(m_offset + size <= top->size()); | 219 ASSERT(m_offset + size <= top->size()); |
| 222 if (top->size() <= m_offset + size) { | 220 if (top->size() <= m_offset + size) { |
| 223 m_offset = 0; | 221 m_offset = 0; |
| 224 m_queue.removeFirst(); | 222 m_queue.removeFirst(); |
| (...skipping 15 matching lines...) Expand all Loading... |
| 240 | 238 |
| 241 void detach() | 239 void detach() |
| 242 { | 240 { |
| 243 MutexLocker locker(m_mutex); | 241 MutexLocker locker(m_mutex); |
| 244 ASSERT(!m_client); | 242 ASSERT(!m_client); |
| 245 ASSERT(!m_readerThread); | 243 ASSERT(!m_readerThread); |
| 246 m_queue.clear(); | 244 m_queue.clear(); |
| 247 } | 245 } |
| 248 | 246 |
| 249 Result m_result; | 247 Result m_result; |
| 250 Deque<std::unique_ptr<Vector<char>>> m_queue; | 248 Deque<OwnPtr<Vector<char>>> m_queue; |
| 251 // Note: Holding a WebThread raw pointer is not generally safe, but we can | 249 // Note: Holding a WebThread raw pointer is not generally safe, but we can |
| 252 // do that in this case because: | 250 // do that in this case because: |
| 253 // 1. Destructing a ReaderImpl when the bound thread ends is a user's | 251 // 1. Destructing a ReaderImpl when the bound thread ends is a user's |
| 254 // responsibility. | 252 // responsibility. |
| 255 // 2. |m_readerThread| will never be used after the associated reader is | 253 // 2. |m_readerThread| will never be used after the associated reader is |
| 256 // detached. | 254 // detached. |
| 257 WebThread* m_readerThread; | 255 WebThread* m_readerThread; |
| 258 WebDataConsumerHandle::Client* m_client; | 256 WebDataConsumerHandle::Client* m_client; |
| 259 size_t m_offset; | 257 size_t m_offset; |
| 260 bool m_isTwoPhaseReadInProgress; | 258 bool m_isTwoPhaseReadInProgress; |
| (...skipping 21 matching lines...) Expand all Loading... |
| 282 } | 280 } |
| 283 | 281 |
| 284 Result beginRead(const void** buffer, Flags, size_t* available) override | 282 Result beginRead(const void** buffer, Flags, size_t* available) override |
| 285 { | 283 { |
| 286 MutexLocker locker(context()->mutex()); | 284 MutexLocker locker(context()->mutex()); |
| 287 *available = 0; | 285 *available = 0; |
| 288 *buffer = nullptr; | 286 *buffer = nullptr; |
| 289 if (context()->isEmpty()) | 287 if (context()->isEmpty()) |
| 290 return context()->getResult(); | 288 return context()->getResult(); |
| 291 | 289 |
| 292 const std::unique_ptr<Vector<char>>& chunk = context()->top(); | 290 const OwnPtr<Vector<char>>& chunk = context()->top(); |
| 293 *available = chunk->size() - context()->offset(); | 291 *available = chunk->size() - context()->offset(); |
| 294 *buffer = chunk->data() + context()->offset(); | 292 *buffer = chunk->data() + context()->offset(); |
| 295 return WebDataConsumerHandle::Ok; | 293 return WebDataConsumerHandle::Ok; |
| 296 } | 294 } |
| 297 | 295 |
| 298 Result endRead(size_t readSize) override | 296 Result endRead(size_t readSize) override |
| 299 { | 297 { |
| 300 MutexLocker locker(context()->mutex()); | 298 MutexLocker locker(context()->mutex()); |
| 301 if (context()->isEmpty()) | 299 if (context()->isEmpty()) |
| 302 return WebDataConsumerHandle::UnexpectedError; | 300 return WebDataConsumerHandle::UnexpectedError; |
| 303 context()->consume(readSize); | 301 context()->consume(readSize); |
| 304 return WebDataConsumerHandle::Ok; | 302 return WebDataConsumerHandle::Ok; |
| 305 } | 303 } |
| 306 | 304 |
| 307 private: | 305 private: |
| 308 DestinationContext* context() { return m_contextProxy->context(); } | 306 DestinationContext* context() { return m_contextProxy->context(); } |
| 309 | 307 |
| 310 RefPtr<DestinationContext::Proxy> m_contextProxy; | 308 RefPtr<DestinationContext::Proxy> m_contextProxy; |
| 311 }; | 309 }; |
| 312 | 310 |
| 313 class DestinationHandle final : public WebDataConsumerHandle { | 311 class DestinationHandle final : public WebDataConsumerHandle { |
| 314 public: | 312 public: |
| 315 static std::unique_ptr<WebDataConsumerHandle> create(PassRefPtr<DestinationC
ontext::Proxy> contextProxy) | 313 static PassOwnPtr<WebDataConsumerHandle> create(PassRefPtr<DestinationContex
t::Proxy> contextProxy) |
| 316 { | 314 { |
| 317 return wrapUnique(new DestinationHandle(contextProxy)); | 315 return adoptPtr(new DestinationHandle(contextProxy)); |
| 318 } | 316 } |
| 319 | 317 |
| 320 private: | 318 private: |
| 321 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co
ntextProxy(contextProxy) { } | 319 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co
ntextProxy(contextProxy) { } |
| 322 DestinationReader* obtainReaderInternal(Client* client) { return new Destina
tionReader(m_contextProxy, client); } | 320 DestinationReader* obtainReaderInternal(Client* client) { return new Destina
tionReader(m_contextProxy, client); } |
| 323 const char* debugName() const override { return "DestinationHandle"; } | 321 const char* debugName() const override { return "DestinationHandle"; } |
| 324 | 322 |
| 325 RefPtr<DestinationContext::Proxy> m_contextProxy; | 323 RefPtr<DestinationContext::Proxy> m_contextProxy; |
| 326 }; | 324 }; |
| 327 | 325 |
| 328 // Bound to the created thread. | 326 // Bound to the created thread. |
| 329 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub
lic ActiveDOMObject, public WebDataConsumerHandle::Client { | 327 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub
lic ActiveDOMObject, public WebDataConsumerHandle::Client { |
| 330 USING_GARBAGE_COLLECTED_MIXIN(SourceContext); | 328 USING_GARBAGE_COLLECTED_MIXIN(SourceContext); |
| 331 public: | 329 public: |
| 332 SourceContext( | 330 SourceContext( |
| 333 PassRefPtr<TeeRootObject> root, | 331 PassRefPtr<TeeRootObject> root, |
| 334 std::unique_ptr<WebDataConsumerHandle> src, | 332 PassOwnPtr<WebDataConsumerHandle> src, |
| 335 PassRefPtr<DestinationContext> dest1, | 333 PassRefPtr<DestinationContext> dest1, |
| 336 PassRefPtr<DestinationContext> dest2, | 334 PassRefPtr<DestinationContext> dest2, |
| 337 ExecutionContext* executionContext) | 335 ExecutionContext* executionContext) |
| 338 : ActiveDOMObject(executionContext) | 336 : ActiveDOMObject(executionContext) |
| 339 , m_root(root) | 337 , m_root(root) |
| 340 , m_reader(src->obtainReader(this)) | 338 , m_reader(src->obtainReader(this)) |
| 341 , m_dest1(dest1) | 339 , m_dest1(dest1) |
| 342 , m_dest2(dest2) | 340 , m_dest2(dest2) |
| 343 { | 341 { |
| 344 suspendIfNeeded(); | 342 suspendIfNeeded(); |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 389 m_dest1->setResult(WebDataConsumerHandle::UnexpectedError); | 387 m_dest1->setResult(WebDataConsumerHandle::UnexpectedError); |
| 390 m_dest2->setResult(WebDataConsumerHandle::UnexpectedError); | 388 m_dest2->setResult(WebDataConsumerHandle::UnexpectedError); |
| 391 m_root->stop(); | 389 m_root->stop(); |
| 392 m_root = nullptr; | 390 m_root = nullptr; |
| 393 m_reader = nullptr; | 391 m_reader = nullptr; |
| 394 m_dest1 = nullptr; | 392 m_dest1 = nullptr; |
| 395 m_dest2 = nullptr; | 393 m_dest2 = nullptr; |
| 396 } | 394 } |
| 397 | 395 |
| 398 RefPtr<TeeRootObject> m_root; | 396 RefPtr<TeeRootObject> m_root; |
| 399 std::unique_ptr<WebDataConsumerHandle::Reader> m_reader; | 397 OwnPtr<WebDataConsumerHandle::Reader> m_reader; |
| 400 RefPtr<DestinationContext> m_dest1; | 398 RefPtr<DestinationContext> m_dest1; |
| 401 RefPtr<DestinationContext> m_dest2; | 399 RefPtr<DestinationContext> m_dest2; |
| 402 }; | 400 }; |
| 403 | 401 |
| 404 } // namespace | 402 } // namespace |
| 405 | 403 |
| 406 void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr
<WebDataConsumerHandle> src, std::unique_ptr<WebDataConsumerHandle>* dest1, std:
:unique_ptr<WebDataConsumerHandle>* dest2) | 404 void DataConsumerTee::create(ExecutionContext* executionContext, PassOwnPtr<WebD
ataConsumerHandle> src, OwnPtr<WebDataConsumerHandle>* dest1, OwnPtr<WebDataCons
umerHandle>* dest2) |
| 407 { | 405 { |
| 408 RefPtr<TeeRootObject> root = TeeRootObject::create(); | 406 RefPtr<TeeRootObject> root = TeeRootObject::create(); |
| 409 RefPtr<DestinationTracker> tracker = DestinationTracker::create(root); | 407 RefPtr<DestinationTracker> tracker = DestinationTracker::create(root); |
| 410 RefPtr<DestinationContext> context1 = DestinationContext::create(); | 408 RefPtr<DestinationContext> context1 = DestinationContext::create(); |
| 411 RefPtr<DestinationContext> context2 = DestinationContext::create(); | 409 RefPtr<DestinationContext> context2 = DestinationContext::create(); |
| 412 | 410 |
| 413 root->initialize(new SourceContext(root, std::move(src), context1, context2,
executionContext)); | 411 root->initialize(new SourceContext(root, std::move(src), context1, context2,
executionContext)); |
| 414 | 412 |
| 415 *dest1 = DestinationHandle::create(DestinationContext::Proxy::create(context
1, tracker)); | 413 *dest1 = DestinationHandle::create(DestinationContext::Proxy::create(context
1, tracker)); |
| 416 *dest2 = DestinationHandle::create(DestinationContext::Proxy::create(context
2, tracker)); | 414 *dest2 = DestinationHandle::create(DestinationContext::Proxy::create(context
2, tracker)); |
| 417 } | 415 } |
| 418 | 416 |
| 419 void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr
<FetchDataConsumerHandle> src, std::unique_ptr<FetchDataConsumerHandle>* dest1,
std::unique_ptr<FetchDataConsumerHandle>* dest2) | 417 void DataConsumerTee::create(ExecutionContext* executionContext, PassOwnPtr<Fetc
hDataConsumerHandle> src, OwnPtr<FetchDataConsumerHandle>* dest1, OwnPtr<FetchDa
taConsumerHandle>* dest2) |
| 420 { | 418 { |
| 421 RefPtr<BlobDataHandle> blobDataHandle = src->obtainReader(nullptr)->drainAsB
lobDataHandle(FetchDataConsumerHandle::Reader::AllowBlobWithInvalidSize); | 419 RefPtr<BlobDataHandle> blobDataHandle = src->obtainReader(nullptr)->drainAsB
lobDataHandle(FetchDataConsumerHandle::Reader::AllowBlobWithInvalidSize); |
| 422 if (blobDataHandle) { | 420 if (blobDataHandle) { |
| 423 *dest1 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); | 421 *dest1 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); |
| 424 *dest2 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); | 422 *dest2 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); |
| 425 return; | 423 return; |
| 426 } | 424 } |
| 427 | 425 |
| 428 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2; | 426 OwnPtr<WebDataConsumerHandle> webDest1, webDest2; |
| 429 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat
aConsumerHandle>>(std::move(src)), &webDest1, &webDest2); | 427 DataConsumerTee::create(executionContext, static_cast<PassOwnPtr<WebDataCons
umerHandle>>(std::move(src)), &webDest1, &webDest2); |
| 430 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); | 428 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); |
| 431 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); | 429 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); |
| 432 return; | 430 return; |
| 433 } | 431 } |
| 434 | 432 |
| 435 } // namespace blink | 433 } // namespace blink |
| OLD | NEW |