Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/DataConsumerTee.cpp

Issue 2177243002: Use per-frame TaskRunner instead of thread's default in DataConsumerHandle (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@data_consumer_handle_unique_ptr
Patch Set: update Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "modules/fetch/DataConsumerTee.h" 5 #include "modules/fetch/DataConsumerTee.h"
6 6
7 #include "core/dom/ActiveDOMObject.h" 7 #include "core/dom/ActiveDOMObject.h"
8 #include "core/dom/ExecutionContext.h" 8 #include "core/dom/ExecutionContext.h"
9 #include "core/dom/TaskRunnerHelper.h"
9 #include "modules/fetch/DataConsumerHandleUtil.h" 10 #include "modules/fetch/DataConsumerHandleUtil.h"
10 #include "modules/fetch/FetchBlobDataConsumerHandle.h" 11 #include "modules/fetch/FetchBlobDataConsumerHandle.h"
11 #include "platform/CrossThreadFunctional.h" 12 #include "platform/CrossThreadFunctional.h"
12 #include "platform/heap/Handle.h" 13 #include "platform/heap/Handle.h"
13 #include "public/platform/Platform.h" 14 #include "public/platform/Platform.h"
14 #include "public/platform/WebTaskRunner.h" 15 #include "public/platform/WebTaskRunner.h"
15 #include "public/platform/WebThread.h" 16 #include "public/platform/WebThread.h"
16 #include "public/platform/WebTraceLocation.h" 17 #include "public/platform/WebTraceLocation.h"
17 #include "wtf/Deque.h" 18 #include "wtf/Deque.h"
18 #include "wtf/Functional.h" 19 #include "wtf/Functional.h"
(...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after
176 } 177 }
177 178
178 void notify() 179 void notify()
179 { 180 {
180 { 181 {
181 MutexLocker locker(m_mutex); 182 MutexLocker locker(m_mutex);
182 if (!m_client) { 183 if (!m_client) {
183 // No client is registered. 184 // No client is registered.
184 return; 185 return;
185 } 186 }
186 DCHECK(m_readerThread); 187 DCHECK(m_readerTaskRunner);
187 if (!m_readerThread->isCurrentThread()) { 188 if (!m_readerTaskRunner->runsTasksOnCurrentThread()) {
188 m_readerThread->getWebTaskRunner()->postTask(BLINK_FROM_HERE, cr ossThreadBind(&DestinationContext::notify, wrapPassRefPtr(this))); 189 m_readerTaskRunner->postTask(BLINK_FROM_HERE, crossThreadBind(&D estinationContext::notify, wrapPassRefPtr(this)));
189 return; 190 return;
190 } 191 }
191 } 192 }
192 // The reading thread is the current thread. 193 // The reading thread is the current thread.
193 if (m_client) 194 if (m_client)
194 m_client->didGetReadable(); 195 m_client->didGetReadable();
195 } 196 }
196 197
197 Mutex& mutex() { return m_mutex; } 198 Mutex& mutex() { return m_mutex; }
198 199
199 // The following functions don't use lock. They should be protected by the 200 // The following functions don't use lock. They should be protected by the
200 // caller. 201 // caller.
201 void attachReader(WebDataConsumerHandle::Client* client) 202 void attachReader(WebDataConsumerHandle::Client* client, std::unique_ptr<Web TaskRunner> readerTaskRunner)
202 { 203 {
203 DCHECK(!m_readerThread); 204 DCHECK(!m_readerTaskRunner);
204 DCHECK(!m_client); 205 DCHECK(!m_client);
205 m_readerThread = Platform::current()->currentThread(); 206 m_readerTaskRunner = std::move(readerTaskRunner);
206 m_client = client; 207 m_client = client;
207 } 208 }
208 void detachReader() 209 void detachReader()
209 { 210 {
210 DCHECK(m_readerThread && m_readerThread->isCurrentThread()); 211 DCHECK(m_readerTaskRunner && m_readerTaskRunner->runsTasksOnCurrentThrea d());
211 m_readerThread = nullptr; 212 m_readerTaskRunner = nullptr;
212 m_client = nullptr; 213 m_client = nullptr;
213 } 214 }
214 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); } 215 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); }
215 bool isEmpty() const { return m_queue.isEmpty(); } 216 bool isEmpty() const { return m_queue.isEmpty(); }
216 size_t offset() const { return m_offset; } 217 size_t offset() const { return m_offset; }
217 void consume(size_t size) 218 void consume(size_t size)
218 { 219 {
219 const auto& top = m_queue.first(); 220 const auto& top = m_queue.first();
220 DCHECK(m_offset <= m_offset + size); 221 DCHECK(m_offset <= m_offset + size);
221 DCHECK(m_offset + size <= top->size()); 222 DCHECK(m_offset + size <= top->size());
222 if (top->size() <= m_offset + size) { 223 if (top->size() <= m_offset + size) {
223 m_offset = 0; 224 m_offset = 0;
224 m_queue.removeFirst(); 225 m_queue.removeFirst();
225 } else { 226 } else {
226 m_offset += size; 227 m_offset += size;
227 } 228 }
228 } 229 }
229 Result getResult() { return m_result; } 230 Result getResult() { return m_result; }
230 231
231 private: 232 private:
232 DestinationContext() 233 DestinationContext()
233 : m_result(WebDataConsumerHandle::ShouldWait) 234 : m_result(WebDataConsumerHandle::ShouldWait)
234 , m_readerThread(nullptr) 235 , m_readerTaskRunner(nullptr)
235 , m_client(nullptr) 236 , m_client(nullptr)
236 , m_offset(0) 237 , m_offset(0)
237 , m_isTwoPhaseReadInProgress(false) 238 , m_isTwoPhaseReadInProgress(false)
238 { 239 {
239 } 240 }
240 241
241 void detach() 242 void detach()
242 { 243 {
243 MutexLocker locker(m_mutex); 244 MutexLocker locker(m_mutex);
244 DCHECK(!m_client); 245 DCHECK(!m_client);
245 DCHECK(!m_readerThread); 246 DCHECK(!m_readerTaskRunner);
246 m_queue.clear(); 247 m_queue.clear();
247 } 248 }
248 249
249 Result m_result; 250 Result m_result;
250 Deque<std::unique_ptr<Vector<char>>> m_queue; 251 Deque<std::unique_ptr<Vector<char>>> m_queue;
251 // Note: Holding a WebThread raw pointer is not generally safe, but we can 252 std::unique_ptr<WebTaskRunner> m_readerTaskRunner;
252 // do that in this case because:
253 // 1. Destructing a ReaderImpl when the bound thread ends is a user's
254 // responsibility.
255 // 2. |m_readerThread| will never be used after the associated reader is
256 // detached.
257 WebThread* m_readerThread;
258 WebDataConsumerHandle::Client* m_client; 253 WebDataConsumerHandle::Client* m_client;
259 size_t m_offset; 254 size_t m_offset;
260 bool m_isTwoPhaseReadInProgress; 255 bool m_isTwoPhaseReadInProgress;
261 Mutex m_mutex; 256 Mutex m_mutex;
262 }; 257 };
263 258
264 class DestinationReader final : public WebDataConsumerHandle::Reader { 259 class DestinationReader final : public WebDataConsumerHandle::Reader {
265 public: 260 public:
266 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat aConsumerHandle::Client* client) 261 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat aConsumerHandle::Client* client, std::unique_ptr<WebTaskRunner> readerTaskRunner )
267 : m_contextProxy(contextProxy) 262 : m_contextProxy(contextProxy)
268 { 263 {
269 MutexLocker locker(context()->mutex()); 264 MutexLocker locker(context()->mutex());
270 context()->attachReader(client); 265 context()->attachReader(client, readerTaskRunner->clone());
271 if (client) { 266 if (client) {
272 // We need to use crossThreadBind here to retain the context. Note 267 readerTaskRunner->postTask(BLINK_FROM_HERE, bind(&DestinationContext ::notify, wrapPassRefPtr(context())));
273 // |context()| return value is of type DestinationContext*, not
274 // PassRefPtr<DestinationContext>.
275 Platform::current()->currentThread()->getWebTaskRunner()->postTask(B LINK_FROM_HERE, crossThreadBind(&DestinationContext::notify, wrapPassRefPtr(cont ext())));
276 } 268 }
277 } 269 }
278 ~DestinationReader() override 270 ~DestinationReader() override
279 { 271 {
280 MutexLocker locker(context()->mutex()); 272 MutexLocker locker(context()->mutex());
281 context()->detachReader(); 273 context()->detachReader();
282 } 274 }
283 275
284 Result beginRead(const void** buffer, Flags, size_t* available) override 276 Result beginRead(const void** buffer, Flags, size_t* available) override
285 { 277 {
(...skipping 24 matching lines...) Expand all
310 RefPtr<DestinationContext::Proxy> m_contextProxy; 302 RefPtr<DestinationContext::Proxy> m_contextProxy;
311 }; 303 };
312 304
313 class DestinationHandle final : public WebDataConsumerHandle { 305 class DestinationHandle final : public WebDataConsumerHandle {
314 public: 306 public:
315 static std::unique_ptr<WebDataConsumerHandle> create(PassRefPtr<DestinationC ontext::Proxy> contextProxy) 307 static std::unique_ptr<WebDataConsumerHandle> create(PassRefPtr<DestinationC ontext::Proxy> contextProxy)
316 { 308 {
317 return wrapUnique(new DestinationHandle(contextProxy)); 309 return wrapUnique(new DestinationHandle(contextProxy));
318 } 310 }
319 311
320 std::unique_ptr<Reader> obtainReader(Client* client) 312 std::unique_ptr<Reader> obtainReader(Client* client, std::unique_ptr<WebTask Runner> readerTaskRunner)
321 { 313 {
322 return wrapUnique(new DestinationReader(m_contextProxy, client)); 314 return wrapUnique(new DestinationReader(m_contextProxy, client, std::mov e(readerTaskRunner)));
323 } 315 }
324 316
325 private: 317 private:
326 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co ntextProxy(contextProxy) { } 318 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co ntextProxy(contextProxy) { }
327 const char* debugName() const override { return "DestinationHandle"; } 319 const char* debugName() const override { return "DestinationHandle"; }
328 320
329 RefPtr<DestinationContext::Proxy> m_contextProxy; 321 RefPtr<DestinationContext::Proxy> m_contextProxy;
330 }; 322 };
331 323
332 // Bound to the created thread. 324 // Bound to the created thread.
333 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub lic ActiveDOMObject, public WebDataConsumerHandle::Client { 325 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub lic ActiveDOMObject, public WebDataConsumerHandle::Client {
334 USING_GARBAGE_COLLECTED_MIXIN(SourceContext); 326 USING_GARBAGE_COLLECTED_MIXIN(SourceContext);
335 public: 327 public:
336 SourceContext( 328 SourceContext(
337 PassRefPtr<TeeRootObject> root, 329 PassRefPtr<TeeRootObject> root,
338 std::unique_ptr<WebDataConsumerHandle> src, 330 std::unique_ptr<WebDataConsumerHandle> src,
339 PassRefPtr<DestinationContext> dest1, 331 PassRefPtr<DestinationContext> dest1,
340 PassRefPtr<DestinationContext> dest2, 332 PassRefPtr<DestinationContext> dest2,
341 ExecutionContext* executionContext) 333 ExecutionContext* executionContext)
342 : ActiveDOMObject(executionContext) 334 : ActiveDOMObject(executionContext)
343 , m_root(root) 335 , m_root(root)
344 , m_reader(src->obtainReader(this)) 336 , m_reader(src->obtainReader(this, TaskRunnerHelper::getUnthrottledTaskR unner(executionContext)->clone()))
345 , m_dest1(dest1) 337 , m_dest1(dest1)
346 , m_dest2(dest2) 338 , m_dest2(dest2)
347 { 339 {
348 suspendIfNeeded(); 340 suspendIfNeeded();
349 } 341 }
350 ~SourceContext() override 342 ~SourceContext() override
351 { 343 {
352 stopInternal(); 344 stopInternal();
353 } 345 }
354 346
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
415 RefPtr<DestinationContext> context2 = DestinationContext::create(); 407 RefPtr<DestinationContext> context2 = DestinationContext::create();
416 408
417 root->initialize(new SourceContext(root, std::move(src), context1, context2, executionContext)); 409 root->initialize(new SourceContext(root, std::move(src), context1, context2, executionContext));
418 410
419 *dest1 = DestinationHandle::create(DestinationContext::Proxy::create(context 1, tracker)); 411 *dest1 = DestinationHandle::create(DestinationContext::Proxy::create(context 1, tracker));
420 *dest2 = DestinationHandle::create(DestinationContext::Proxy::create(context 2, tracker)); 412 *dest2 = DestinationHandle::create(DestinationContext::Proxy::create(context 2, tracker));
421 } 413 }
422 414
423 void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr <FetchDataConsumerHandle> src, std::unique_ptr<FetchDataConsumerHandle>* dest1, std::unique_ptr<FetchDataConsumerHandle>* dest2) 415 void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr <FetchDataConsumerHandle> src, std::unique_ptr<FetchDataConsumerHandle>* dest1, std::unique_ptr<FetchDataConsumerHandle>* dest2)
424 { 416 {
425 RefPtr<BlobDataHandle> blobDataHandle = src->obtainFetchDataReader(nullptr)- >drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::AllowBlobWithInvalidSize ); 417 std::unique_ptr<WebTaskRunner> readerTaskRunner = TaskRunnerHelper::getUnthr ottledTaskRunner(executionContext)->clone();
418 RefPtr<BlobDataHandle> blobDataHandle = src->obtainFetchDataReader(nullptr, std::move(readerTaskRunner))->drainAsBlobDataHandle(FetchDataConsumerHandle::Rea der::AllowBlobWithInvalidSize);
426 if (blobDataHandle) { 419 if (blobDataHandle) {
427 *dest1 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH andle); 420 *dest1 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH andle);
428 *dest2 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH andle); 421 *dest2 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH andle);
429 return; 422 return;
430 } 423 }
431 424
432 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2; 425 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2;
433 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat aConsumerHandle>>(std::move(src)), &webDest1, &webDest2); 426 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat aConsumerHandle>>(std::move(src)), &webDest1, &webDest2);
434 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1)); 427 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1));
435 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2)); 428 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2));
436 return; 429 return;
437 } 430 }
438 431
439 } // namespace blink 432 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698