| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "modules/fetch/DataConsumerTee.h" | 5 #include "modules/fetch/DataConsumerTee.h" |
| 6 | 6 |
| 7 #include "core/dom/ActiveDOMObject.h" | 7 #include "core/dom/ActiveDOMObject.h" |
| 8 #include "core/dom/ExecutionContext.h" | 8 #include "core/dom/ExecutionContext.h" |
| 9 #include "core/dom/TaskRunnerHelper.h" |
| 9 #include "modules/fetch/DataConsumerHandleUtil.h" | 10 #include "modules/fetch/DataConsumerHandleUtil.h" |
| 10 #include "modules/fetch/FetchBlobDataConsumerHandle.h" | 11 #include "modules/fetch/FetchBlobDataConsumerHandle.h" |
| 11 #include "platform/CrossThreadFunctional.h" | 12 #include "platform/CrossThreadFunctional.h" |
| 12 #include "platform/heap/Handle.h" | 13 #include "platform/heap/Handle.h" |
| 13 #include "public/platform/Platform.h" | 14 #include "public/platform/Platform.h" |
| 14 #include "public/platform/WebTaskRunner.h" | 15 #include "public/platform/WebTaskRunner.h" |
| 15 #include "public/platform/WebThread.h" | 16 #include "public/platform/WebThread.h" |
| 16 #include "public/platform/WebTraceLocation.h" | 17 #include "public/platform/WebTraceLocation.h" |
| 17 #include "wtf/Deque.h" | 18 #include "wtf/Deque.h" |
| 18 #include "wtf/Functional.h" | 19 #include "wtf/Functional.h" |
| (...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 176 } | 177 } |
| 177 | 178 |
| 178 void notify() | 179 void notify() |
| 179 { | 180 { |
| 180 { | 181 { |
| 181 MutexLocker locker(m_mutex); | 182 MutexLocker locker(m_mutex); |
| 182 if (!m_client) { | 183 if (!m_client) { |
| 183 // No client is registered. | 184 // No client is registered. |
| 184 return; | 185 return; |
| 185 } | 186 } |
| 186 DCHECK(m_readerThread); | 187 DCHECK(m_readerTaskRunner); |
| 187 if (!m_readerThread->isCurrentThread()) { | 188 if (!m_readerTaskRunner->runsTasksOnCurrentThread()) { |
| 188 m_readerThread->getWebTaskRunner()->postTask(BLINK_FROM_HERE, cr
ossThreadBind(&DestinationContext::notify, wrapPassRefPtr(this))); | 189 m_readerTaskRunner->postTask(BLINK_FROM_HERE, crossThreadBind(&D
estinationContext::notify, wrapPassRefPtr(this))); |
| 189 return; | 190 return; |
| 190 } | 191 } |
| 191 } | 192 } |
| 192 // The reading thread is the current thread. | 193 // The reading thread is the current thread. |
| 193 if (m_client) | 194 if (m_client) |
| 194 m_client->didGetReadable(); | 195 m_client->didGetReadable(); |
| 195 } | 196 } |
| 196 | 197 |
| 197 Mutex& mutex() { return m_mutex; } | 198 Mutex& mutex() { return m_mutex; } |
| 198 | 199 |
| 199 // The following functions don't use lock. They should be protected by the | 200 // The following functions don't use lock. They should be protected by the |
| 200 // caller. | 201 // caller. |
| 201 void attachReader(WebDataConsumerHandle::Client* client) | 202 void attachReader(WebDataConsumerHandle::Client* client, std::unique_ptr<Web
TaskRunner> readerTaskRunner) |
| 202 { | 203 { |
| 203 DCHECK(!m_readerThread); | 204 DCHECK(!m_readerTaskRunner); |
| 204 DCHECK(!m_client); | 205 DCHECK(!m_client); |
| 205 m_readerThread = Platform::current()->currentThread(); | 206 m_readerTaskRunner = std::move(readerTaskRunner); |
| 206 m_client = client; | 207 m_client = client; |
| 207 } | 208 } |
| 208 void detachReader() | 209 void detachReader() |
| 209 { | 210 { |
| 210 DCHECK(m_readerThread && m_readerThread->isCurrentThread()); | 211 DCHECK(m_readerTaskRunner && m_readerTaskRunner->runsTasksOnCurrentThrea
d()); |
| 211 m_readerThread = nullptr; | 212 m_readerTaskRunner = nullptr; |
| 212 m_client = nullptr; | 213 m_client = nullptr; |
| 213 } | 214 } |
| 214 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); } | 215 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); } |
| 215 bool isEmpty() const { return m_queue.isEmpty(); } | 216 bool isEmpty() const { return m_queue.isEmpty(); } |
| 216 size_t offset() const { return m_offset; } | 217 size_t offset() const { return m_offset; } |
| 217 void consume(size_t size) | 218 void consume(size_t size) |
| 218 { | 219 { |
| 219 const auto& top = m_queue.first(); | 220 const auto& top = m_queue.first(); |
| 220 DCHECK(m_offset <= m_offset + size); | 221 DCHECK(m_offset <= m_offset + size); |
| 221 DCHECK(m_offset + size <= top->size()); | 222 DCHECK(m_offset + size <= top->size()); |
| 222 if (top->size() <= m_offset + size) { | 223 if (top->size() <= m_offset + size) { |
| 223 m_offset = 0; | 224 m_offset = 0; |
| 224 m_queue.removeFirst(); | 225 m_queue.removeFirst(); |
| 225 } else { | 226 } else { |
| 226 m_offset += size; | 227 m_offset += size; |
| 227 } | 228 } |
| 228 } | 229 } |
| 229 Result getResult() { return m_result; } | 230 Result getResult() { return m_result; } |
| 230 | 231 |
| 231 private: | 232 private: |
| 232 DestinationContext() | 233 DestinationContext() |
| 233 : m_result(WebDataConsumerHandle::ShouldWait) | 234 : m_result(WebDataConsumerHandle::ShouldWait) |
| 234 , m_readerThread(nullptr) | 235 , m_readerTaskRunner(nullptr) |
| 235 , m_client(nullptr) | 236 , m_client(nullptr) |
| 236 , m_offset(0) | 237 , m_offset(0) |
| 237 , m_isTwoPhaseReadInProgress(false) | 238 , m_isTwoPhaseReadInProgress(false) |
| 238 { | 239 { |
| 239 } | 240 } |
| 240 | 241 |
| 241 void detach() | 242 void detach() |
| 242 { | 243 { |
| 243 MutexLocker locker(m_mutex); | 244 MutexLocker locker(m_mutex); |
| 244 DCHECK(!m_client); | 245 DCHECK(!m_client); |
| 245 DCHECK(!m_readerThread); | 246 DCHECK(!m_readerTaskRunner); |
| 246 m_queue.clear(); | 247 m_queue.clear(); |
| 247 } | 248 } |
| 248 | 249 |
| 249 Result m_result; | 250 Result m_result; |
| 250 Deque<std::unique_ptr<Vector<char>>> m_queue; | 251 Deque<std::unique_ptr<Vector<char>>> m_queue; |
| 251 // Note: Holding a WebThread raw pointer is not generally safe, but we can | 252 std::unique_ptr<WebTaskRunner> m_readerTaskRunner; |
| 252 // do that in this case because: | |
| 253 // 1. Destructing a ReaderImpl when the bound thread ends is a user's | |
| 254 // responsibility. | |
| 255 // 2. |m_readerThread| will never be used after the associated reader is | |
| 256 // detached. | |
| 257 WebThread* m_readerThread; | |
| 258 WebDataConsumerHandle::Client* m_client; | 253 WebDataConsumerHandle::Client* m_client; |
| 259 size_t m_offset; | 254 size_t m_offset; |
| 260 bool m_isTwoPhaseReadInProgress; | 255 bool m_isTwoPhaseReadInProgress; |
| 261 Mutex m_mutex; | 256 Mutex m_mutex; |
| 262 }; | 257 }; |
| 263 | 258 |
| 264 class DestinationReader final : public WebDataConsumerHandle::Reader { | 259 class DestinationReader final : public WebDataConsumerHandle::Reader { |
| 265 public: | 260 public: |
| 266 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat
aConsumerHandle::Client* client) | 261 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat
aConsumerHandle::Client* client, std::unique_ptr<WebTaskRunner> readerTaskRunner
) |
| 267 : m_contextProxy(contextProxy) | 262 : m_contextProxy(contextProxy) |
| 268 { | 263 { |
| 269 MutexLocker locker(context()->mutex()); | 264 MutexLocker locker(context()->mutex()); |
| 270 context()->attachReader(client); | 265 context()->attachReader(client, readerTaskRunner->clone()); |
| 271 if (client) { | 266 if (client) { |
| 272 // We need to use crossThreadBind here to retain the context. Note | 267 readerTaskRunner->postTask(BLINK_FROM_HERE, bind(&DestinationContext
::notify, wrapPassRefPtr(context()))); |
| 273 // |context()| return value is of type DestinationContext*, not | |
| 274 // PassRefPtr<DestinationContext>. | |
| 275 Platform::current()->currentThread()->getWebTaskRunner()->postTask(B
LINK_FROM_HERE, crossThreadBind(&DestinationContext::notify, wrapPassRefPtr(cont
ext()))); | |
| 276 } | 268 } |
| 277 } | 269 } |
| 278 ~DestinationReader() override | 270 ~DestinationReader() override |
| 279 { | 271 { |
| 280 MutexLocker locker(context()->mutex()); | 272 MutexLocker locker(context()->mutex()); |
| 281 context()->detachReader(); | 273 context()->detachReader(); |
| 282 } | 274 } |
| 283 | 275 |
| 284 Result beginRead(const void** buffer, Flags, size_t* available) override | 276 Result beginRead(const void** buffer, Flags, size_t* available) override |
| 285 { | 277 { |
| (...skipping 24 matching lines...) Expand all Loading... |
| 310 RefPtr<DestinationContext::Proxy> m_contextProxy; | 302 RefPtr<DestinationContext::Proxy> m_contextProxy; |
| 311 }; | 303 }; |
| 312 | 304 |
| 313 class DestinationHandle final : public WebDataConsumerHandle { | 305 class DestinationHandle final : public WebDataConsumerHandle { |
| 314 public: | 306 public: |
| 315 static std::unique_ptr<WebDataConsumerHandle> create(PassRefPtr<DestinationC
ontext::Proxy> contextProxy) | 307 static std::unique_ptr<WebDataConsumerHandle> create(PassRefPtr<DestinationC
ontext::Proxy> contextProxy) |
| 316 { | 308 { |
| 317 return wrapUnique(new DestinationHandle(contextProxy)); | 309 return wrapUnique(new DestinationHandle(contextProxy)); |
| 318 } | 310 } |
| 319 | 311 |
| 320 std::unique_ptr<Reader> obtainReader(Client* client) | 312 std::unique_ptr<Reader> obtainReader(Client* client, std::unique_ptr<WebTask
Runner> readerTaskRunner) |
| 321 { | 313 { |
| 322 return wrapUnique(new DestinationReader(m_contextProxy, client)); | 314 return wrapUnique(new DestinationReader(m_contextProxy, client, std::mov
e(readerTaskRunner))); |
| 323 } | 315 } |
| 324 | 316 |
| 325 private: | 317 private: |
| 326 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co
ntextProxy(contextProxy) { } | 318 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co
ntextProxy(contextProxy) { } |
| 327 const char* debugName() const override { return "DestinationHandle"; } | 319 const char* debugName() const override { return "DestinationHandle"; } |
| 328 | 320 |
| 329 RefPtr<DestinationContext::Proxy> m_contextProxy; | 321 RefPtr<DestinationContext::Proxy> m_contextProxy; |
| 330 }; | 322 }; |
| 331 | 323 |
| 332 // Bound to the created thread. | 324 // Bound to the created thread. |
| 333 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub
lic ActiveDOMObject, public WebDataConsumerHandle::Client { | 325 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub
lic ActiveDOMObject, public WebDataConsumerHandle::Client { |
| 334 USING_GARBAGE_COLLECTED_MIXIN(SourceContext); | 326 USING_GARBAGE_COLLECTED_MIXIN(SourceContext); |
| 335 public: | 327 public: |
| 336 SourceContext( | 328 SourceContext( |
| 337 PassRefPtr<TeeRootObject> root, | 329 PassRefPtr<TeeRootObject> root, |
| 338 std::unique_ptr<WebDataConsumerHandle> src, | 330 std::unique_ptr<WebDataConsumerHandle> src, |
| 339 PassRefPtr<DestinationContext> dest1, | 331 PassRefPtr<DestinationContext> dest1, |
| 340 PassRefPtr<DestinationContext> dest2, | 332 PassRefPtr<DestinationContext> dest2, |
| 341 ExecutionContext* executionContext) | 333 ExecutionContext* executionContext) |
| 342 : ActiveDOMObject(executionContext) | 334 : ActiveDOMObject(executionContext) |
| 343 , m_root(root) | 335 , m_root(root) |
| 344 , m_reader(src->obtainReader(this)) | 336 , m_reader(src->obtainReader(this, TaskRunnerHelper::getUnthrottledTaskR
unner(executionContext)->clone())) |
| 345 , m_dest1(dest1) | 337 , m_dest1(dest1) |
| 346 , m_dest2(dest2) | 338 , m_dest2(dest2) |
| 347 { | 339 { |
| 348 suspendIfNeeded(); | 340 suspendIfNeeded(); |
| 349 } | 341 } |
| 350 ~SourceContext() override | 342 ~SourceContext() override |
| 351 { | 343 { |
| 352 stopInternal(); | 344 stopInternal(); |
| 353 } | 345 } |
| 354 | 346 |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 415 RefPtr<DestinationContext> context2 = DestinationContext::create(); | 407 RefPtr<DestinationContext> context2 = DestinationContext::create(); |
| 416 | 408 |
| 417 root->initialize(new SourceContext(root, std::move(src), context1, context2,
executionContext)); | 409 root->initialize(new SourceContext(root, std::move(src), context1, context2,
executionContext)); |
| 418 | 410 |
| 419 *dest1 = DestinationHandle::create(DestinationContext::Proxy::create(context
1, tracker)); | 411 *dest1 = DestinationHandle::create(DestinationContext::Proxy::create(context
1, tracker)); |
| 420 *dest2 = DestinationHandle::create(DestinationContext::Proxy::create(context
2, tracker)); | 412 *dest2 = DestinationHandle::create(DestinationContext::Proxy::create(context
2, tracker)); |
| 421 } | 413 } |
| 422 | 414 |
| 423 void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr
<FetchDataConsumerHandle> src, std::unique_ptr<FetchDataConsumerHandle>* dest1,
std::unique_ptr<FetchDataConsumerHandle>* dest2) | 415 void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr
<FetchDataConsumerHandle> src, std::unique_ptr<FetchDataConsumerHandle>* dest1,
std::unique_ptr<FetchDataConsumerHandle>* dest2) |
| 424 { | 416 { |
| 425 RefPtr<BlobDataHandle> blobDataHandle = src->obtainFetchDataReader(nullptr)-
>drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::AllowBlobWithInvalidSize
); | 417 std::unique_ptr<WebTaskRunner> readerTaskRunner = TaskRunnerHelper::getUnthr
ottledTaskRunner(executionContext)->clone(); |
| 418 RefPtr<BlobDataHandle> blobDataHandle = src->obtainFetchDataReader(nullptr,
std::move(readerTaskRunner))->drainAsBlobDataHandle(FetchDataConsumerHandle::Rea
der::AllowBlobWithInvalidSize); |
| 426 if (blobDataHandle) { | 419 if (blobDataHandle) { |
| 427 *dest1 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); | 420 *dest1 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); |
| 428 *dest2 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); | 421 *dest2 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH
andle); |
| 429 return; | 422 return; |
| 430 } | 423 } |
| 431 | 424 |
| 432 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2; | 425 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2; |
| 433 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat
aConsumerHandle>>(std::move(src)), &webDest1, &webDest2); | 426 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat
aConsumerHandle>>(std::move(src)), &webDest1, &webDest2); |
| 434 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); | 427 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); |
| 435 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); | 428 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); |
| 436 return; | 429 return; |
| 437 } | 430 } |
| 438 | 431 |
| 439 } // namespace blink | 432 } // namespace blink |
| OLD | NEW |