Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "modules/fetch/DataConsumerTee.h" | 5 #include "modules/fetch/DataConsumerTee.h" |
| 6 | 6 |
| 7 #include "core/dom/ActiveDOMObject.h" | 7 #include "core/dom/ActiveDOMObject.h" |
| 8 #include "core/dom/ExecutionContext.h" | 8 #include "core/dom/ExecutionContext.h" |
| 9 #include "core/dom/TaskRunnerHelper.h" | |
| 9 #include "modules/fetch/DataConsumerHandleUtil.h" | 10 #include "modules/fetch/DataConsumerHandleUtil.h" |
| 10 #include "modules/fetch/FetchBlobDataConsumerHandle.h" | 11 #include "modules/fetch/FetchBlobDataConsumerHandle.h" |
| 11 #include "platform/CrossThreadFunctional.h" | 12 #include "platform/CrossThreadFunctional.h" |
| 12 #include "platform/heap/Handle.h" | 13 #include "platform/heap/Handle.h" |
| 13 #include "public/platform/Platform.h" | 14 #include "public/platform/Platform.h" |
| 14 #include "public/platform/WebTaskRunner.h" | 15 #include "public/platform/WebTaskRunner.h" |
| 15 #include "public/platform/WebThread.h" | 16 #include "public/platform/WebThread.h" |
| 16 #include "public/platform/WebTraceLocation.h" | 17 #include "public/platform/WebTraceLocation.h" |
| 17 #include "wtf/Deque.h" | 18 #include "wtf/Deque.h" |
| 18 #include "wtf/Functional.h" | 19 #include "wtf/Functional.h" |
| (...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 176 } | 177 } |
| 177 | 178 |
| 178 void notify() | 179 void notify() |
| 179 { | 180 { |
| 180 { | 181 { |
| 181 MutexLocker locker(m_mutex); | 182 MutexLocker locker(m_mutex); |
| 182 if (!m_client) { | 183 if (!m_client) { |
| 183 // No client is registered. | 184 // No client is registered. |
| 184 return; | 185 return; |
| 185 } | 186 } |
| 186 DCHECK(m_readerThread); | 187 DCHECK(m_readerTaskRunner); |
| 187 if (!m_readerThread->isCurrentThread()) { | 188 if (!m_readerTaskRunner->runsTasksOnCurrentThread()) { |
| 188 m_readerThread->getWebTaskRunner()->postTask(BLINK_FROM_HERE, cr ossThreadBind(&DestinationContext::notify, wrapPassRefPtr(this))); | 189 m_readerTaskRunner->postTask(BLINK_FROM_HERE, crossThreadBind(&D estinationContext::notify, wrapPassRefPtr(this))); |
| 189 return; | 190 return; |
| 190 } | 191 } |
| 191 } | 192 } |
| 192 // The reading thread is the current thread. | 193 // The reading thread is the current thread. |
| 193 if (m_client) | 194 if (m_client) |
| 194 m_client->didGetReadable(); | 195 m_client->didGetReadable(); |
| 195 } | 196 } |
| 196 | 197 |
| 197 Mutex& mutex() { return m_mutex; } | 198 Mutex& mutex() { return m_mutex; } |
| 198 | 199 |
| 199 // The following functions don't use lock. They should be protected by the | 200 // The following functions don't use lock. They should be protected by the |
| 200 // caller. | 201 // caller. |
| 201 void attachReader(WebDataConsumerHandle::Client* client) | 202 void attachReader(WebDataConsumerHandle::Client* client) |
| 202 { | 203 { |
| 203 DCHECK(!m_readerThread); | 204 DCHECK(!m_readerTaskRunner); |
| 204 DCHECK(!m_client); | 205 DCHECK(!m_client); |
| 205 m_readerThread = Platform::current()->currentThread(); | 206 m_readerTaskRunner = client->getTaskRunner()->clone(); |
| 206 m_client = client; | 207 m_client = client; |
| 207 } | 208 } |
| 208 void detachReader() | 209 void detachReader() |
| 209 { | 210 { |
| 210 DCHECK(m_readerThread && m_readerThread->isCurrentThread()); | 211 DCHECK(m_readerTaskRunner && m_readerTaskRunner->runsTasksOnCurrentThrea d()); |
| 211 m_readerThread = nullptr; | 212 m_readerTaskRunner = nullptr; |
| 212 m_client = nullptr; | 213 m_client = nullptr; |
| 213 } | 214 } |
| 214 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); } | 215 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); } |
| 215 bool isEmpty() const { return m_queue.isEmpty(); } | 216 bool isEmpty() const { return m_queue.isEmpty(); } |
| 216 size_t offset() const { return m_offset; } | 217 size_t offset() const { return m_offset; } |
| 217 void consume(size_t size) | 218 void consume(size_t size) |
| 218 { | 219 { |
| 219 const auto& top = m_queue.first(); | 220 const auto& top = m_queue.first(); |
| 220 DCHECK(m_offset <= m_offset + size); | 221 DCHECK(m_offset <= m_offset + size); |
| 221 DCHECK(m_offset + size <= top->size()); | 222 DCHECK(m_offset + size <= top->size()); |
| 222 if (top->size() <= m_offset + size) { | 223 if (top->size() <= m_offset + size) { |
| 223 m_offset = 0; | 224 m_offset = 0; |
| 224 m_queue.removeFirst(); | 225 m_queue.removeFirst(); |
| 225 } else { | 226 } else { |
| 226 m_offset += size; | 227 m_offset += size; |
| 227 } | 228 } |
| 228 } | 229 } |
| 229 Result getResult() { return m_result; } | 230 Result getResult() { return m_result; } |
| 230 | 231 |
| 231 private: | 232 private: |
| 232 DestinationContext() | 233 DestinationContext() |
| 233 : m_result(WebDataConsumerHandle::ShouldWait) | 234 : m_result(WebDataConsumerHandle::ShouldWait) |
| 234 , m_readerThread(nullptr) | 235 , m_readerTaskRunner(nullptr) |
| 235 , m_client(nullptr) | 236 , m_client(nullptr) |
| 236 , m_offset(0) | 237 , m_offset(0) |
| 237 , m_isTwoPhaseReadInProgress(false) | 238 , m_isTwoPhaseReadInProgress(false) |
| 238 { | 239 { |
| 239 } | 240 } |
| 240 | 241 |
| 241 void detach() | 242 void detach() |
| 242 { | 243 { |
| 243 MutexLocker locker(m_mutex); | 244 MutexLocker locker(m_mutex); |
| 244 DCHECK(!m_client); | 245 DCHECK(!m_client); |
| 245 DCHECK(!m_readerThread); | 246 DCHECK(!m_readerTaskRunner); |
| 246 m_queue.clear(); | 247 m_queue.clear(); |
| 247 } | 248 } |
| 248 | 249 |
| 249 Result m_result; | 250 Result m_result; |
| 250 Deque<std::unique_ptr<Vector<char>>> m_queue; | 251 Deque<std::unique_ptr<Vector<char>>> m_queue; |
| 251 // Note: Holding a WebThread raw pointer is not generally safe, but we can | 252 std::unique_ptr<WebTaskRunner> m_readerTaskRunner; |
|
haraken
2016/07/28 09:19:30
Instead of holding a task runner, can we probably
tzik
2016/07/28 14:45:34
Done.
| |
| 252 // do that in this case because: | |
| 253 // 1. Destructing a ReaderImpl when the bound thread ends is a user's | |
| 254 // responsibility. | |
| 255 // 2. |m_readerThread| will never be used after the associated reader is | |
| 256 // detached. | |
| 257 WebThread* m_readerThread; | |
| 258 WebDataConsumerHandle::Client* m_client; | 253 WebDataConsumerHandle::Client* m_client; |
| 259 size_t m_offset; | 254 size_t m_offset; |
| 260 bool m_isTwoPhaseReadInProgress; | 255 bool m_isTwoPhaseReadInProgress; |
| 261 Mutex m_mutex; | 256 Mutex m_mutex; |
| 262 }; | 257 }; |
| 263 | 258 |
| 264 class DestinationReader final : public WebDataConsumerHandle::Reader { | 259 class DestinationReader final : public WebDataConsumerHandle::Reader { |
| 265 public: | 260 public: |
| 266 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat aConsumerHandle::Client* client) | 261 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat aConsumerHandle::Client* client) |
| 267 : m_contextProxy(contextProxy) | 262 : m_contextProxy(contextProxy) |
| 268 { | 263 { |
| 269 MutexLocker locker(context()->mutex()); | 264 MutexLocker locker(context()->mutex()); |
| 270 context()->attachReader(client); | 265 context()->attachReader(client); |
| 271 if (client) { | 266 if (client) { |
| 272 // We need to use crossThreadBind here to retain the context. Note | 267 client->getTaskRunner()->postTask(BLINK_FROM_HERE, bind(&Destination Context::notify, wrapPassRefPtr(context()))); |
| 273 // |context()| return value is of type DestinationContext*, not | |
| 274 // PassRefPtr<DestinationContext>. | |
| 275 Platform::current()->currentThread()->getWebTaskRunner()->postTask(B LINK_FROM_HERE, crossThreadBind(&DestinationContext::notify, wrapPassRefPtr(cont ext()))); | |
| 276 } | 268 } |
| 277 } | 269 } |
| 278 ~DestinationReader() override | 270 ~DestinationReader() override |
| 279 { | 271 { |
| 280 MutexLocker locker(context()->mutex()); | 272 MutexLocker locker(context()->mutex()); |
| 281 context()->detachReader(); | 273 context()->detachReader(); |
| 282 } | 274 } |
| 283 | 275 |
| 284 Result beginRead(const void** buffer, Flags, size_t* available) override | 276 Result beginRead(const void** buffer, Flags, size_t* available) override |
| 285 { | 277 { |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 366 break; | 358 break; |
| 367 m_dest1->enqueue(static_cast<const char*>(buffer), available); | 359 m_dest1->enqueue(static_cast<const char*>(buffer), available); |
| 368 m_dest2->enqueue(static_cast<const char*>(buffer), available); | 360 m_dest2->enqueue(static_cast<const char*>(buffer), available); |
| 369 m_reader->endRead(available); | 361 m_reader->endRead(available); |
| 370 } | 362 } |
| 371 m_dest1->setResult(r); | 363 m_dest1->setResult(r); |
| 372 m_dest2->setResult(r); | 364 m_dest2->setResult(r); |
| 373 stopInternal(); | 365 stopInternal(); |
| 374 } | 366 } |
| 375 | 367 |
| 368 WebTaskRunner* getTaskRunner() override | |
| 369 { | |
| 370 return TaskRunnerHelper::getUnthrottledTaskRunner(getExecutionContext()) ; | |
| 371 } | |
| 372 | |
| 376 void stop() override | 373 void stop() override |
| 377 { | 374 { |
| 378 stopInternal(); | 375 stopInternal(); |
| 379 ActiveDOMObject::stop(); | 376 ActiveDOMObject::stop(); |
| 380 } | 377 } |
| 381 | 378 |
| 382 DEFINE_INLINE_VIRTUAL_TRACE() | 379 DEFINE_INLINE_VIRTUAL_TRACE() |
| 383 { | 380 { |
| 384 ActiveDOMObject::trace(visitor); | 381 ActiveDOMObject::trace(visitor); |
| 385 } | 382 } |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 430 } | 427 } |
| 431 | 428 |
| 432 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2; | 429 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2; |
| 433 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat aConsumerHandle>>(std::move(src)), &webDest1, &webDest2); | 430 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat aConsumerHandle>>(std::move(src)), &webDest1, &webDest2); |
| 434 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); | 431 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); |
| 435 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); | 432 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); |
| 436 return; | 433 return; |
| 437 } | 434 } |
| 438 | 435 |
| 439 } // namespace blink | 436 } // namespace blink |
| OLD | NEW |