Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(358)

Side by Side Diff: Source/modules/fetch/FetchBlobDataConsumerHandle.cpp

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Rebase. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "config.h"
6 #include "modules/fetch/FetchBlobDataConsumerHandle.h"
7
8 #include "core/dom/ActiveDOMObject.h"
9 #include "core/dom/CrossThreadTask.h"
10 #include "core/dom/ExecutionContext.h"
11 #include "core/fetch/FetchInitiatorTypeNames.h"
12 #include "core/loader/ThreadableLoaderClient.h"
13 #include "modules/fetch/CompositeDataConsumerHandle.h"
14 #include "modules/fetch/DataConsumerHandleUtil.h"
15 #include "platform/Task.h"
16 #include "platform/blob/BlobRegistry.h"
17 #include "platform/blob/BlobURL.h"
18 #include "platform/network/ResourceRequest.h"
19 #include "public/platform/Platform.h"
20 #include "public/platform/WebTraceLocation.h"
21 #include "wtf/Locker.h"
22 #include "wtf/ThreadingPrimitives.h"
23
24 namespace blink {
25
26 using Result = FetchBlobDataConsumerHandle::Result;
27
28 namespace {
29
30 // CrossThreadHolder<T> provides cross-thread access to |obj| of class T.
31 // CrossThreadHolder<T> can be passed across threads
32 // while |obj|'s methods are called on the thread of |executionContext|
33 // via CrossThreadHolder<T>::postTask().
34 // |obj| is destructed on the thread of |executionContext|
35 // when |executionContext| is stopped or
36 // CrossThreadHolder is destructed (earlier of them).
37 template<typename T>
38 class CrossThreadHolder {
39 public:
40 static PassOwnPtr<CrossThreadHolder<T>> create(ExecutionContext* executionCo ntext, PassOwnPtr<T> obj)
41 {
42 ASSERT(executionContext->isContextThread());
43 return adoptPtr(new CrossThreadHolder(executionContext, obj));
44 }
45
46 // Can be called from any thread.
47 // Executes |task| with |obj| and |executionContext| on the thread of
48 // |executionContext|.
49 // NOTE: |task| might be silently ignored (i.e. not executed) and
50 // destructed (possibly on the calling thread or on the thread of
51 // |executionContext|) when |executionContext| is stopped.
52 void postTask(PassOwnPtr<WTF::Function<void(T*, ExecutionContext*)>> task)
53 {
54 m_peer->postTask(task);
55 }
56
57 ~CrossThreadHolder()
58 {
59 m_peer->clear();
60 }
61
62 private:
63 CrossThreadHolder(ExecutionContext* executionContext, PassOwnPtr<T> obj)
64 : m_peer(Peer::create())
65 {
66 m_peer->initialize(new Bridge(executionContext, m_peer, obj));
67 }
68
69 // Bridge and Peer have a reference cycle:
70 // T <-OwnPtr- |Bridge| ---------RefPtr--------> |Peer| <-(RefPtr)- CTH
71 // | | <-CrossThreadPersistent- | |
72 // CTH: CrossThreadHolder<T>
73 // The reference from Peer to Bridge is protected by |Peer::m_mutex| and
74 // cleared when Peer::clear() is called, i.e.:
75 // [1] when |executionContext| is stopped:
76 // T <-OwnPtr- |Bridge| ---------RefPtr--------> |Peer| <-(RefPtr)- CTH
77 // [2] or when CrossThreadHolder is destructed:
78 // T <-OwnPtr- |Bridge| ---------RefPtr--------> |Peer|
79 // in either case, Bridge is shortly garbage collected.
80
81 class Peer;
82
83 // All methods must be called on |executionContext|'s thread.
84 class Bridge
85 : public GarbageCollectedFinalized<Bridge>
86 , public ActiveDOMObject {
87 WILL_BE_USING_GARBAGE_COLLECTED_MIXIN(Bridge);
88 public:
89 Bridge(ExecutionContext* executionContext, PassRefPtr<Peer> peer, PassOw nPtr<T> obj)
90 : ActiveDOMObject(executionContext)
91 , m_peer(peer)
92 , m_obj(obj)
93 {
94 suspendIfNeeded();
95 }
96
97 DEFINE_INLINE_TRACE()
98 {
99 ActiveDOMObject::trace(visitor);
100 }
101
102 T* getObject() const { return m_obj.get(); }
103
104 private:
105 // ActiveDOMObject
106 void stop() override
107 {
108 ASSERT(executionContext()->isContextThread());
109 m_obj.clear();
110 m_peer->clear();
111 }
112
113 RefPtr<Peer> m_peer;
114 OwnPtr<T> m_obj;
115 };
116
117 class Peer : public ThreadSafeRefCounted<Peer> {
118 public:
119 static PassRefPtr<Peer> create()
120 {
121 return adoptRef(new Peer());
122 }
123
124 // initialize() must be called only just after construction to create a
125 // reference cycle.
126 void initialize(Bridge* bridge)
127 {
128 m_bridge = bridge;
129 }
130
131 void clear()
132 {
133 MutexLocker locker(m_mutex);
134 m_bridge.clear();
135 }
136
137 void postTask(PassOwnPtr<WTF::Function<void(T*, ExecutionContext*)>> tas k)
138 {
139 MutexLocker locker(m_mutex);
140 if (!m_bridge) {
141 // The bridge has already disappeared.
142 return;
143 }
144 m_bridge->executionContext()->postTask(FROM_HERE, createCrossThreadT ask(&Peer::runTask, this, task));
145 }
146
147 private:
148 Peer() = default;
149
150 // Must be called on the thread of |executionContext|.
151 void runTask(PassOwnPtr<WTF::Function<void(T*, ExecutionContext*)>> task )
152 {
153 Bridge* bridge;
154 {
155 MutexLocker locker(m_mutex);
156 bridge = m_bridge.get();
157 if (!bridge) {
158 // The bridge has already disappeared.
159 return;
160 }
161 ASSERT(bridge->executionContext()->isContextThread());
162 }
163
164 // |bridge| can be set to nullptr by another thread after the
165 // mutex is unlocked, but we can still use |loaderContext| here
166 // because the LoaderContext belongs to the current thread
167 // (==the loader thread).
168 (*task)(bridge->getObject(), bridge->executionContext());
169 }
170
171 // |m_brige| is protected by |m_mutex|.
172 CrossThreadPersistent<Bridge> m_bridge;
173 Mutex m_mutex;
174 };
175
176 RefPtr<Peer> m_peer;
177 };
178
179 // LoaderContext is created and destructed on the same thread
180 // (call this thread loader thread).
181 // All methods must be called on the loader thread.
182 // CommonContext and LoaderContext have a reference cycle:
183 // CTH <-----(OwnPtr)----+
184 // | |
185 // (CrossThreadPersistent) |
186 // v |
187 // LC ----(RefPtr)----> CC <-- RC
188 // LC: LoaderContext
189 // CC: CommonContext
190 // RC: ReaderContext
191 // CTH: CrossThreadHolder<LoaderContext>
192 // This cycle is broken when CommonContext::clearLoaderContext() is called
193 // (which clears the reference to CrossThreadHolder),
194 // or ExecutionContext of the loader thread is stopped
195 // (CrossThreadHolder clears the reference to LoaderContext).
196
197 class LoaderContext {
198 public:
199 virtual ~LoaderContext() { }
200 virtual void start(ExecutionContext*) = 0;
201 };
202
203 class CommonContext
204 : public ThreadSafeRefCounted<CommonContext> {
205 public:
206 CommonContext()
207 : m_handle(CompositeDataConsumerHandle::create(createWaitingDataConsumer Handle())) { }
208
209 // initialize() must be called only just after construction to create a
210 // reference cycle.
211 void initialize(ExecutionContext* executionContext, PassOwnPtr<LoaderContext > loaderContext)
212 {
213 m_loaderContextHolder = CrossThreadHolder<LoaderContext>::create(executi onContext, loaderContext);
214 }
215
216 CompositeDataConsumerHandle* handle() { return m_handle.get(); }
217
218 void startLoader()
219 {
220 MutexLocker locker(m_mutex);
221 if (m_loaderContextHolder)
222 m_loaderContextHolder->postTask(threadSafeBind<LoaderContext*, Execu tionContext*>(&LoaderContext::start));
223 }
224
225 void clearLoaderContext()
226 {
227 MutexLocker locker(m_mutex);
228 m_loaderContextHolder.clear();
229 }
230
231 // Must be called on the loader thread.
232 void error()
233 {
234 clearLoaderContext();
235 handle()->update(createUnexpectedErrorDataConsumerHandle());
236 }
237
238 private:
239 OwnPtr<CompositeDataConsumerHandle> m_handle;
240
241 // |m_loaderContextHolder| is protected by |m_mutex|.
242 OwnPtr<CrossThreadHolder<LoaderContext>> m_loaderContextHolder;
243 Mutex m_mutex;
244 };
245
246 // All methods must be called on the loader thread.
247 class BlobLoaderContext final
248 : public LoaderContext
249 , public ThreadableLoaderClient {
250 public:
251 BlobLoaderContext(PassRefPtr<CommonContext> commonContext, PassRefPtr<BlobDa taHandle> blobDataHandle, FetchBlobDataConsumerHandle::LoaderFactory* loaderFact ory)
252 : m_commonContext(commonContext)
253 , m_blobDataHandle(blobDataHandle)
254 , m_loaderFactory(loaderFactory)
255 , m_receivedResponse(false) { }
256
257 ~BlobLoaderContext() override
258 {
259 if (m_loader && !m_receivedResponse)
260 m_commonContext->handle()->update(createUnexpectedErrorDataConsumerH andle());
261 if (m_loader) {
262 m_loader->cancel();
263 m_loader.clear();
264 }
265 }
266
267 void start(ExecutionContext* executionContext) override
268 {
269 ASSERT(executionContext->isContextThread());
270 ASSERT(!m_loader);
271
272 m_loader = createLoader(executionContext, this);
273 if (!m_loader)
274 m_commonContext->error();
275 }
276
277 private:
278 PassRefPtr<ThreadableLoader> createLoader(ExecutionContext* executionContext , ThreadableLoaderClient* client) const
279 {
280 KURL url = BlobURL::createPublicURL(executionContext->securityOrigin());
281 if (url.isEmpty()) {
282 return nullptr;
283 }
284 BlobRegistry::registerPublicBlobURL(executionContext->securityOrigin(), url, m_blobDataHandle);
285
286 ResourceRequest request(url);
287 request.setRequestContext(WebURLRequest::RequestContextInternal);
288 request.setUseStreamOnResponse(true);
289
290 ThreadableLoaderOptions options;
291 options.preflightPolicy = ConsiderPreflight;
292 options.crossOriginRequestPolicy = DenyCrossOriginRequests;
293 options.contentSecurityPolicyEnforcement = DoNotEnforceContentSecurityPo licy;
294 options.initiator = FetchInitiatorTypeNames::internal;
295
296 ResourceLoaderOptions resourceLoaderOptions;
297 resourceLoaderOptions.dataBufferingPolicy = DoNotBufferData;
298
299 return m_loaderFactory->create(*executionContext, client, request, optio ns, resourceLoaderOptions);
300 }
301
302 // ThreadableLoaderClient
303 void didReceiveResponse(unsigned long, const ResourceResponse&, PassOwnPtr<W ebDataConsumerHandle> handle) override
304 {
305 ASSERT(!m_receivedResponse);
306 m_receivedResponse = true;
307 if (!handle) {
308 // Here we assume WebURLLoader must return the response body as
309 // |WebDataConsumerHandle| since we call
310 // request.setUseStreamOnResponse().
311 m_commonContext->handle()->update(createUnexpectedErrorDataConsumerH andle());
312 return;
313 }
314 m_commonContext->handle()->update(handle);
315 }
316 void didFinishLoading(unsigned long, double) override
317 {
318 m_commonContext->clearLoaderContext();
319 }
320 void didFail(const ResourceError&) override
321 {
322 if (m_receivedResponse)
323 m_commonContext->clearLoaderContext();
324 else
325 m_commonContext->error();
326 }
327 void didFailRedirectCheck() override
328 {
329 // We don't expect redirects for Blob loading.
330 ASSERT_NOT_REACHED();
331 }
332
333 RefPtr<CommonContext> m_commonContext;
334 RefPtr<BlobDataHandle> m_blobDataHandle;
335 Persistent<FetchBlobDataConsumerHandle::LoaderFactory> m_loaderFactory;
336 RefPtr<ThreadableLoader> m_loader;
337
338 bool m_receivedResponse;
339 };
340
341 class DefaultLoaderFactory final : public FetchBlobDataConsumerHandle::LoaderFac tory {
342 public:
343 PassRefPtr<ThreadableLoader> create(
344 ExecutionContext& executionContext,
345 ThreadableLoaderClient* client,
346 const ResourceRequest& request,
347 const ThreadableLoaderOptions& options,
348 const ResourceLoaderOptions& resourceLoaderOptions) override
349 {
350 return ThreadableLoader::create(executionContext, client, request, optio ns, resourceLoaderOptions);
351 }
352 };
353
354 } // namespace
355
356 // ReaderContext is referenced from FetchBlobDataConsumerHandle and
357 // ReaderContext::ReaderImpl.
358 // All functions/members must be called/accessed only on the reader thread,
359 // except for constructor, destructor, obtainReader() and |m_commonContext|
360 // accessed from them.
361 class FetchBlobDataConsumerHandle::ReaderContext final : public ThreadSafeRefCou nted<ReaderContext> {
362 public:
363 class ReaderImpl : public FetchDataConsumerHandle::Reader {
364 public:
365 ReaderImpl(Client* client, PassRefPtr<ReaderContext> readerContext, Pass OwnPtr<WebDataConsumerHandle::Reader> reader)
366 : m_readerContext(readerContext)
367 , m_reader(reader)
368 , m_notifier(client) { }
369 ~ReaderImpl() override { }
370
371 Result read(void* data, size_t size, Flags flags, size_t* readSize) over ride
372 {
373 if (m_readerContext->drained())
374 return Done;
375 m_readerContext->ensureStartLoader();
376 Result r = m_reader->read(data, size, flags, readSize);
377 if (r != ShouldWait) {
378 // We read non-empty data, so we cannot use the blob data
379 // handle which represents the whole data.
380 m_readerContext->clearBlobDataHandleForDrain();
381 }
382 return r;
383 }
384
385 Result beginRead(const void** buffer, Flags flags, size_t* available) ov erride
386 {
387 if (m_readerContext->drained())
388 return Done;
389 m_readerContext->ensureStartLoader();
390 Result r = m_reader->beginRead(buffer, flags, available);
391 if (r != ShouldWait) {
392 // We read non-empty data, so we cannot use the blob data
393 // handle which represents the whole data.
394 m_readerContext->clearBlobDataHandleForDrain();
395 }
396 return r;
397 }
398
399 Result endRead(size_t readSize) override
400 {
401 return m_reader->endRead(readSize);
402 }
403
404 PassRefPtr<BlobDataHandle> drainAsBlobDataHandle() override
405 {
406 if (!m_readerContext->m_blobDataHandleForDrain || m_readerContext->m _blobDataHandleForDrain->size() == kuint64max)
407 return nullptr;
408 RefPtr<BlobDataHandle> blobDataHandle = m_readerContext->m_blobDataH andleForDrain;
409 m_readerContext->setDrained();
410 m_readerContext->clearBlobDataHandleForDrain();
411 return blobDataHandle.release();
412 }
413
414 private:
415 RefPtr<ReaderContext> m_readerContext;
416 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
417 NotifyOnReaderCreationHelper m_notifier;
418 };
419
420 ReaderContext(ExecutionContext* executionContext, PassRefPtr<BlobDataHandle> blobDataHandle, FetchBlobDataConsumerHandle::LoaderFactory* loaderFactory)
421 : m_commonContext(adoptRef(new CommonContext()))
422 , m_loaderStarted(false)
423 , m_drained(false)
424 , m_blobDataHandleForDrain(blobDataHandle)
425 {
426 m_commonContext->initialize(executionContext, adoptPtr(new BlobLoaderCon text(m_commonContext, m_blobDataHandleForDrain, loaderFactory)));
427 }
428
429 ~ReaderContext()
430 {
431 m_commonContext->clearLoaderContext();
432 }
433
434 PassOwnPtr<FetchDataConsumerHandle::Reader> obtainReader(WebDataConsumerHand le::Client* client)
435 {
436 return adoptPtr(new ReaderImpl(client, this, m_commonContext->handle()-> obtainReader(client)));
437 }
438
439 private:
440 void ensureStartLoader()
441 {
442 if (m_loaderStarted)
443 return;
444 m_loaderStarted = true;
445 m_commonContext->startLoader();
446 }
447
448 void clearBlobDataHandleForDrain()
449 {
450 m_blobDataHandleForDrain.clear();
451 }
452
453 bool drained() const { return m_drained; }
454 void setDrained() { m_drained = true; }
455
456 RefPtr<CommonContext> m_commonContext;
457
458 bool m_loaderStarted;
459 bool m_drained;
460 RefPtr<BlobDataHandle> m_blobDataHandleForDrain;
461 };
462
463 FetchBlobDataConsumerHandle::FetchBlobDataConsumerHandle(ExecutionContext* execu tionContext, PassRefPtr<BlobDataHandle> blobDataHandle, LoaderFactory* loaderFac tory)
464 : m_readerContext(adoptRef(new ReaderContext(executionContext, blobDataHandl e, loaderFactory)))
465 {
466 }
467
468 FetchBlobDataConsumerHandle::~FetchBlobDataConsumerHandle()
469 {
470 }
471
472 PassOwnPtr<FetchDataConsumerHandle> FetchBlobDataConsumerHandle::create(Executio nContext* executionContext, PassRefPtr<BlobDataHandle> blobDataHandle, LoaderFac tory* loaderFactory)
473 {
474 if (!blobDataHandle)
475 return createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumer Handle());
476
477 return adoptPtr(new FetchBlobDataConsumerHandle(executionContext, blobDataHa ndle, loaderFactory));
478 }
479
480 PassOwnPtr<FetchDataConsumerHandle> FetchBlobDataConsumerHandle::create(Executio nContext* executionContext, PassRefPtr<BlobDataHandle> blobDataHandle)
481 {
482 if (!blobDataHandle)
483 return createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumer Handle());
484
485 return adoptPtr(new FetchBlobDataConsumerHandle(executionContext, blobDataHa ndle, new DefaultLoaderFactory));
486 }
487
488 FetchDataConsumerHandle::Reader* FetchBlobDataConsumerHandle::obtainReaderIntern al(Client* client)
489 {
490 return m_readerContext->obtainReader(client).leakPtr();
491 }
492
493 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698