| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "modules/fetch/DataConsumerTee.h" | 5 #include "modules/fetch/DataConsumerTee.h" |
| 6 | 6 |
| 7 #include "core/dom/ActiveDOMObject.h" | 7 #include "core/dom/ActiveDOMObject.h" |
| 8 #include "core/dom/ExecutionContext.h" | 8 #include "core/dom/ExecutionContext.h" |
| 9 #include "core/dom/TaskRunnerHelper.h" |
| 9 #include "modules/fetch/DataConsumerHandleUtil.h" | 10 #include "modules/fetch/DataConsumerHandleUtil.h" |
| 10 #include "modules/fetch/FetchBlobDataConsumerHandle.h" | 11 #include "modules/fetch/FetchBlobDataConsumerHandle.h" |
| 11 #include "platform/CrossThreadFunctional.h" | 12 #include "platform/CrossThreadFunctional.h" |
| 12 #include "platform/heap/Handle.h" | 13 #include "platform/heap/Handle.h" |
| 13 #include "public/platform/Platform.h" | 14 #include "public/platform/Platform.h" |
| 14 #include "public/platform/WebTaskRunner.h" | 15 #include "public/platform/WebTaskRunner.h" |
| 15 #include "public/platform/WebThread.h" | 16 #include "public/platform/WebThread.h" |
| 16 #include "public/platform/WebTraceLocation.h" | 17 #include "public/platform/WebTraceLocation.h" |
| 17 #include "wtf/Deque.h" | 18 #include "wtf/Deque.h" |
| 18 #include "wtf/Functional.h" | 19 #include "wtf/Functional.h" |
| (...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 176 } | 177 } |
| 177 | 178 |
| 178 void notify() | 179 void notify() |
| 179 { | 180 { |
| 180 { | 181 { |
| 181 MutexLocker locker(m_mutex); | 182 MutexLocker locker(m_mutex); |
| 182 if (!m_client) { | 183 if (!m_client) { |
| 183 // No client is registered. | 184 // No client is registered. |
| 184 return; | 185 return; |
| 185 } | 186 } |
| 186 DCHECK(m_readerThread); | 187 if (!m_client->getTaskRunner()->runsTasksOnCurrentThread()) { |
| 187 if (!m_readerThread->isCurrentThread()) { | 188 m_client->getTaskRunner()->postTask(BLINK_FROM_HERE, crossThread
Bind(&DestinationContext::notify, wrapPassRefPtr(this))); |
| 188 m_readerThread->getWebTaskRunner()->postTask(BLINK_FROM_HERE, cr
ossThreadBind(&DestinationContext::notify, wrapPassRefPtr(this))); | |
| 189 return; | 189 return; |
| 190 } | 190 } |
| 191 } | 191 } |
| 192 // The reading thread is the current thread. | 192 // The reading thread is the current thread. |
| 193 if (m_client) | 193 if (m_client) |
| 194 m_client->didGetReadable(); | 194 m_client->didGetReadable(); |
| 195 } | 195 } |
| 196 | 196 |
| 197 Mutex& mutex() { return m_mutex; } | 197 Mutex& mutex() { return m_mutex; } |
| 198 | 198 |
| 199 // The following functions don't use lock. They should be protected by the | 199 // The following functions don't use lock. They should be protected by the |
| 200 // caller. | 200 // caller. |
| 201 void attachReader(WebDataConsumerHandle::Client* client) | 201 void attachReader(WebDataConsumerHandle::Client* client) |
| 202 { | 202 { |
| 203 DCHECK(!m_readerThread); | |
| 204 DCHECK(!m_client); | 203 DCHECK(!m_client); |
| 205 m_readerThread = Platform::current()->currentThread(); | |
| 206 m_client = client; | 204 m_client = client; |
| 207 } | 205 } |
| 208 void detachReader() | 206 void detachReader() |
| 209 { | 207 { |
| 210 DCHECK(m_readerThread && m_readerThread->isCurrentThread()); | 208 DCHECK(!m_client || m_client->getTaskRunner()->runsTasksOnCurrentThread(
)); |
| 211 m_readerThread = nullptr; | |
| 212 m_client = nullptr; | 209 m_client = nullptr; |
| 213 } | 210 } |
| 214 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); } | 211 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); } |
| 215 bool isEmpty() const { return m_queue.isEmpty(); } | 212 bool isEmpty() const { return m_queue.isEmpty(); } |
| 216 size_t offset() const { return m_offset; } | 213 size_t offset() const { return m_offset; } |
| 217 void consume(size_t size) | 214 void consume(size_t size) |
| 218 { | 215 { |
| 219 const auto& top = m_queue.first(); | 216 const auto& top = m_queue.first(); |
| 220 DCHECK(m_offset <= m_offset + size); | 217 DCHECK(m_offset <= m_offset + size); |
| 221 DCHECK(m_offset + size <= top->size()); | 218 DCHECK(m_offset + size <= top->size()); |
| 222 if (top->size() <= m_offset + size) { | 219 if (top->size() <= m_offset + size) { |
| 223 m_offset = 0; | 220 m_offset = 0; |
| 224 m_queue.removeFirst(); | 221 m_queue.removeFirst(); |
| 225 } else { | 222 } else { |
| 226 m_offset += size; | 223 m_offset += size; |
| 227 } | 224 } |
| 228 } | 225 } |
| 229 Result getResult() { return m_result; } | 226 Result getResult() { return m_result; } |
| 230 | 227 |
| 231 private: | 228 private: |
| 232 DestinationContext() | 229 DestinationContext() |
| 233 : m_result(WebDataConsumerHandle::ShouldWait) | 230 : m_result(WebDataConsumerHandle::ShouldWait) |
| 234 , m_readerThread(nullptr) | |
| 235 , m_client(nullptr) | 231 , m_client(nullptr) |
| 236 , m_offset(0) | 232 , m_offset(0) |
| 237 , m_isTwoPhaseReadInProgress(false) | 233 , m_isTwoPhaseReadInProgress(false) |
| 238 { | 234 { |
| 239 } | 235 } |
| 240 | 236 |
| 241 void detach() | 237 void detach() |
| 242 { | 238 { |
| 243 MutexLocker locker(m_mutex); | 239 MutexLocker locker(m_mutex); |
| 244 DCHECK(!m_client); | 240 DCHECK(!m_client); |
| 245 DCHECK(!m_readerThread); | |
| 246 m_queue.clear(); | 241 m_queue.clear(); |
| 247 } | 242 } |
| 248 | 243 |
| 249 Result m_result; | 244 Result m_result; |
| 250 Deque<std::unique_ptr<Vector<char>>> m_queue; | 245 Deque<std::unique_ptr<Vector<char>>> m_queue; |
| 251 // Note: Holding a WebThread raw pointer is not generally safe, but we can | |
| 252 // do that in this case because: | |
| 253 // 1. Destructing a ReaderImpl when the bound thread ends is a user's | |
| 254 // responsibility. | |
| 255 // 2. |m_readerThread| will never be used after the associated reader is | |
| 256 // detached. | |
| 257 WebThread* m_readerThread; | |
| 258 WebDataConsumerHandle::Client* m_client; | 246 WebDataConsumerHandle::Client* m_client; |
| 259 size_t m_offset; | 247 size_t m_offset; |
| 260 bool m_isTwoPhaseReadInProgress; | 248 bool m_isTwoPhaseReadInProgress; |
| 261 Mutex m_mutex; | 249 Mutex m_mutex; |
| 262 }; | 250 }; |
| 263 | 251 |
| 264 class DestinationReader final : public WebDataConsumerHandle::Reader { | 252 class DestinationReader final : public WebDataConsumerHandle::Reader { |
| 265 public: | 253 public: |
| 266 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat
aConsumerHandle::Client* client) | 254 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat
aConsumerHandle::Client* client) |
| 267 : m_contextProxy(contextProxy) | 255 : m_contextProxy(contextProxy) |
| 268 { | 256 { |
| 269 MutexLocker locker(context()->mutex()); | 257 MutexLocker locker(context()->mutex()); |
| 270 context()->attachReader(client); | 258 context()->attachReader(client); |
| 271 if (client) { | 259 if (client) { |
| 272 // We need to use crossThreadBind here to retain the context. Note | 260 client->getTaskRunner()->postTask(BLINK_FROM_HERE, bind(&Destination
Context::notify, wrapPassRefPtr(context()))); |
| 273 // |context()| return value is of type DestinationContext*, not | |
| 274 // PassRefPtr<DestinationContext>. | |
| 275 Platform::current()->currentThread()->getWebTaskRunner()->postTask(B
LINK_FROM_HERE, crossThreadBind(&DestinationContext::notify, wrapPassRefPtr(cont
ext()))); | |
| 276 } | 261 } |
| 277 } | 262 } |
| 278 ~DestinationReader() override | 263 ~DestinationReader() override |
| 279 { | 264 { |
| 280 MutexLocker locker(context()->mutex()); | 265 MutexLocker locker(context()->mutex()); |
| 281 context()->detachReader(); | 266 context()->detachReader(); |
| 282 } | 267 } |
| 283 | 268 |
| 284 Result beginRead(const void** buffer, Flags, size_t* available) override | 269 Result beginRead(const void** buffer, Flags, size_t* available) override |
| 285 { | 270 { |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 366 break; | 351 break; |
| 367 m_dest1->enqueue(static_cast<const char*>(buffer), available); | 352 m_dest1->enqueue(static_cast<const char*>(buffer), available); |
| 368 m_dest2->enqueue(static_cast<const char*>(buffer), available); | 353 m_dest2->enqueue(static_cast<const char*>(buffer), available); |
| 369 m_reader->endRead(available); | 354 m_reader->endRead(available); |
| 370 } | 355 } |
| 371 m_dest1->setResult(r); | 356 m_dest1->setResult(r); |
| 372 m_dest2->setResult(r); | 357 m_dest2->setResult(r); |
| 373 stopInternal(); | 358 stopInternal(); |
| 374 } | 359 } |
| 375 | 360 |
| 361 WebTaskRunner* getTaskRunner() override |
| 362 { |
| 363 return TaskRunnerHelper::getUnthrottledTaskRunner(getExecutionContext())
; |
| 364 } |
| 365 |
| 376 void stop() override | 366 void stop() override |
| 377 { | 367 { |
| 378 stopInternal(); | 368 stopInternal(); |
| 379 ActiveDOMObject::stop(); | 369 ActiveDOMObject::stop(); |
| 380 } | 370 } |
| 381 | 371 |
| 382 DEFINE_INLINE_VIRTUAL_TRACE() | 372 DEFINE_INLINE_VIRTUAL_TRACE() |
| 383 { | 373 { |
| 384 ActiveDOMObject::trace(visitor); | 374 ActiveDOMObject::trace(visitor); |
| 385 } | 375 } |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 430 } | 420 } |
| 431 | 421 |
| 432 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2; | 422 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2; |
| 433 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat
aConsumerHandle>>(std::move(src)), &webDest1, &webDest2); | 423 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat
aConsumerHandle>>(std::move(src)), &webDest1, &webDest2); |
| 434 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); | 424 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); |
| 435 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); | 425 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); |
| 436 return; | 426 return; |
| 437 } | 427 } |
| 438 | 428 |
| 439 } // namespace blink | 429 } // namespace blink |
| OLD | NEW |