Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(71)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/DataConsumerTee.cpp

Issue 2177243002: Use per-frame TaskRunner instead of thread's default in DataConsumerHandle (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@data_consumer_handle_unique_ptr
Patch Set: test fix Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "modules/fetch/DataConsumerTee.h" 5 #include "modules/fetch/DataConsumerTee.h"
6 6
7 #include "core/dom/ActiveDOMObject.h" 7 #include "core/dom/ActiveDOMObject.h"
8 #include "core/dom/ExecutionContext.h" 8 #include "core/dom/ExecutionContext.h"
9 #include "core/dom/TaskRunnerHelper.h"
9 #include "modules/fetch/DataConsumerHandleUtil.h" 10 #include "modules/fetch/DataConsumerHandleUtil.h"
10 #include "modules/fetch/FetchBlobDataConsumerHandle.h" 11 #include "modules/fetch/FetchBlobDataConsumerHandle.h"
11 #include "platform/CrossThreadFunctional.h" 12 #include "platform/CrossThreadFunctional.h"
12 #include "platform/heap/Handle.h" 13 #include "platform/heap/Handle.h"
13 #include "public/platform/Platform.h" 14 #include "public/platform/Platform.h"
14 #include "public/platform/WebTaskRunner.h" 15 #include "public/platform/WebTaskRunner.h"
15 #include "public/platform/WebThread.h" 16 #include "public/platform/WebThread.h"
16 #include "public/platform/WebTraceLocation.h" 17 #include "public/platform/WebTraceLocation.h"
17 #include "wtf/Deque.h" 18 #include "wtf/Deque.h"
18 #include "wtf/Functional.h" 19 #include "wtf/Functional.h"
(...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after
176 } 177 }
177 178
178 void notify() 179 void notify()
179 { 180 {
180 { 181 {
181 MutexLocker locker(m_mutex); 182 MutexLocker locker(m_mutex);
182 if (!m_client) { 183 if (!m_client) {
183 // No client is registered. 184 // No client is registered.
184 return; 185 return;
185 } 186 }
186 DCHECK(m_readerThread); 187 DCHECK(m_readerTaskRunner);
187 if (!m_readerThread->isCurrentThread()) { 188 if (!m_readerTaskRunner->runsTasksOnCurrentThread()) {
188 m_readerThread->getWebTaskRunner()->postTask(BLINK_FROM_HERE, cr ossThreadBind(&DestinationContext::notify, wrapPassRefPtr(this))); 189 m_readerTaskRunner->postTask(BLINK_FROM_HERE, crossThreadBind(&D estinationContext::notify, wrapPassRefPtr(this)));
189 return; 190 return;
190 } 191 }
191 } 192 }
192 // The reading thread is the current thread. 193 // The reading thread is the current thread.
193 if (m_client) 194 if (m_client)
194 m_client->didGetReadable(); 195 m_client->didGetReadable();
195 } 196 }
196 197
197 Mutex& mutex() { return m_mutex; } 198 Mutex& mutex() { return m_mutex; }
198 199
199 // The following functions don't use lock. They should be protected by the 200 // The following functions don't use lock. They should be protected by the
200 // caller. 201 // caller.
201 void attachReader(WebDataConsumerHandle::Client* client) 202 void attachReader(WebDataConsumerHandle::Client* client)
202 { 203 {
203 DCHECK(!m_readerThread); 204 DCHECK(!m_readerTaskRunner);
204 DCHECK(!m_client); 205 DCHECK(!m_client);
205 m_readerThread = Platform::current()->currentThread(); 206 m_readerTaskRunner = client->getTaskRunner()->clone();
206 m_client = client; 207 m_client = client;
207 } 208 }
208 void detachReader() 209 void detachReader()
209 { 210 {
210 DCHECK(m_readerThread && m_readerThread->isCurrentThread()); 211 DCHECK(m_readerTaskRunner && m_readerTaskRunner->runsTasksOnCurrentThrea d());
211 m_readerThread = nullptr; 212 m_readerTaskRunner = nullptr;
212 m_client = nullptr; 213 m_client = nullptr;
213 } 214 }
214 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); } 215 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); }
215 bool isEmpty() const { return m_queue.isEmpty(); } 216 bool isEmpty() const { return m_queue.isEmpty(); }
216 size_t offset() const { return m_offset; } 217 size_t offset() const { return m_offset; }
217 void consume(size_t size) 218 void consume(size_t size)
218 { 219 {
219 const auto& top = m_queue.first(); 220 const auto& top = m_queue.first();
220 DCHECK(m_offset <= m_offset + size); 221 DCHECK(m_offset <= m_offset + size);
221 DCHECK(m_offset + size <= top->size()); 222 DCHECK(m_offset + size <= top->size());
222 if (top->size() <= m_offset + size) { 223 if (top->size() <= m_offset + size) {
223 m_offset = 0; 224 m_offset = 0;
224 m_queue.removeFirst(); 225 m_queue.removeFirst();
225 } else { 226 } else {
226 m_offset += size; 227 m_offset += size;
227 } 228 }
228 } 229 }
229 Result getResult() { return m_result; } 230 Result getResult() { return m_result; }
230 231
231 private: 232 private:
232 DestinationContext() 233 DestinationContext()
233 : m_result(WebDataConsumerHandle::ShouldWait) 234 : m_result(WebDataConsumerHandle::ShouldWait)
234 , m_readerThread(nullptr) 235 , m_readerTaskRunner(nullptr)
235 , m_client(nullptr) 236 , m_client(nullptr)
236 , m_offset(0) 237 , m_offset(0)
237 , m_isTwoPhaseReadInProgress(false) 238 , m_isTwoPhaseReadInProgress(false)
238 { 239 {
239 } 240 }
240 241
241 void detach() 242 void detach()
242 { 243 {
243 MutexLocker locker(m_mutex); 244 MutexLocker locker(m_mutex);
244 DCHECK(!m_client); 245 DCHECK(!m_client);
245 DCHECK(!m_readerThread); 246 DCHECK(!m_readerTaskRunner);
246 m_queue.clear(); 247 m_queue.clear();
247 } 248 }
248 249
249 Result m_result; 250 Result m_result;
250 Deque<std::unique_ptr<Vector<char>>> m_queue; 251 Deque<std::unique_ptr<Vector<char>>> m_queue;
251 // Note: Holding a WebThread raw pointer is not generally safe, but we can 252 std::unique_ptr<WebTaskRunner> m_readerTaskRunner;
haraken 2016/07/28 09:19:30 Instead of holding a task runner, can we probably
tzik 2016/07/28 14:45:34 Done.
252 // do that in this case because:
253 // 1. Destructing a ReaderImpl when the bound thread ends is a user's
254 // responsibility.
255 // 2. |m_readerThread| will never be used after the associated reader is
256 // detached.
257 WebThread* m_readerThread;
258 WebDataConsumerHandle::Client* m_client; 253 WebDataConsumerHandle::Client* m_client;
259 size_t m_offset; 254 size_t m_offset;
260 bool m_isTwoPhaseReadInProgress; 255 bool m_isTwoPhaseReadInProgress;
261 Mutex m_mutex; 256 Mutex m_mutex;
262 }; 257 };
263 258
264 class DestinationReader final : public WebDataConsumerHandle::Reader { 259 class DestinationReader final : public WebDataConsumerHandle::Reader {
265 public: 260 public:
266 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat aConsumerHandle::Client* client) 261 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat aConsumerHandle::Client* client)
267 : m_contextProxy(contextProxy) 262 : m_contextProxy(contextProxy)
268 { 263 {
269 MutexLocker locker(context()->mutex()); 264 MutexLocker locker(context()->mutex());
270 context()->attachReader(client); 265 context()->attachReader(client);
271 if (client) { 266 if (client) {
272 // We need to use crossThreadBind here to retain the context. Note 267 client->getTaskRunner()->postTask(BLINK_FROM_HERE, bind(&Destination Context::notify, wrapPassRefPtr(context())));
273 // |context()| return value is of type DestinationContext*, not
274 // PassRefPtr<DestinationContext>.
275 Platform::current()->currentThread()->getWebTaskRunner()->postTask(B LINK_FROM_HERE, crossThreadBind(&DestinationContext::notify, wrapPassRefPtr(cont ext())));
276 } 268 }
277 } 269 }
278 ~DestinationReader() override 270 ~DestinationReader() override
279 { 271 {
280 MutexLocker locker(context()->mutex()); 272 MutexLocker locker(context()->mutex());
281 context()->detachReader(); 273 context()->detachReader();
282 } 274 }
283 275
284 Result beginRead(const void** buffer, Flags, size_t* available) override 276 Result beginRead(const void** buffer, Flags, size_t* available) override
285 { 277 {
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
366 break; 358 break;
367 m_dest1->enqueue(static_cast<const char*>(buffer), available); 359 m_dest1->enqueue(static_cast<const char*>(buffer), available);
368 m_dest2->enqueue(static_cast<const char*>(buffer), available); 360 m_dest2->enqueue(static_cast<const char*>(buffer), available);
369 m_reader->endRead(available); 361 m_reader->endRead(available);
370 } 362 }
371 m_dest1->setResult(r); 363 m_dest1->setResult(r);
372 m_dest2->setResult(r); 364 m_dest2->setResult(r);
373 stopInternal(); 365 stopInternal();
374 } 366 }
375 367
368 WebTaskRunner* getTaskRunner() override
369 {
370 return TaskRunnerHelper::getUnthrottledTaskRunner(getExecutionContext()) ;
371 }
372
376 void stop() override 373 void stop() override
377 { 374 {
378 stopInternal(); 375 stopInternal();
379 ActiveDOMObject::stop(); 376 ActiveDOMObject::stop();
380 } 377 }
381 378
382 DEFINE_INLINE_VIRTUAL_TRACE() 379 DEFINE_INLINE_VIRTUAL_TRACE()
383 { 380 {
384 ActiveDOMObject::trace(visitor); 381 ActiveDOMObject::trace(visitor);
385 } 382 }
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
430 } 427 }
431 428
432 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2; 429 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2;
433 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat aConsumerHandle>>(std::move(src)), &webDest1, &webDest2); 430 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat aConsumerHandle>>(std::move(src)), &webDest1, &webDest2);
434 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); 431 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1));
435 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); 432 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2));
436 return; 433 return;
437 } 434 }
438 435
439 } // namespace blink 436 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698