Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(41)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/DataConsumerTee.cpp

Issue 2277143002: Use BytesConsumer in BodyStreamBuffer (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@bytes-consumer-tee
Patch Set: rebase Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "modules/fetch/DataConsumerTee.h"
6
7 #include "core/dom/ActiveDOMObject.h"
8 #include "core/dom/ExecutionContext.h"
9 #include "modules/fetch/DataConsumerHandleUtil.h"
10 #include "modules/fetch/FetchBlobDataConsumerHandle.h"
11 #include "platform/CrossThreadFunctional.h"
12 #include "platform/heap/Handle.h"
13 #include "public/platform/Platform.h"
14 #include "public/platform/WebTaskRunner.h"
15 #include "public/platform/WebThread.h"
16 #include "public/platform/WebTraceLocation.h"
17 #include "wtf/Deque.h"
18 #include "wtf/Functional.h"
19 #include "wtf/PtrUtil.h"
20 #include "wtf/ThreadSafeRefCounted.h"
21 #include "wtf/ThreadingPrimitives.h"
22 #include "wtf/Vector.h"
23 #include <memory>
24
25 namespace blink {
26
27 using Result = WebDataConsumerHandle::Result;
28 using Flags = WebDataConsumerHandle::Flags;
29
30 namespace {
31
32 // This file contains the "tee" implementation. There are several classes and
33 // their relationship is complicated, so let me describe here.
34 //
35 // Tee::create function creates two DestinationHandles (destinations) from one
36 // WebDataConsumerHandle (source). In fact, it uses a reader of the source
37 // handle.
38 //
39 // SourceContext reads data from the source reader and enques it to two
40 // destination contexts. Destination readers read data from its associated
41 // contexts. Here is an object graph.
42 //
43 // R: the root object
44 // SR: the source reader
45 // SC: the SourceContext
46 // DCn: nth DestinationContext
47 // DRn: nth DestinationReader
48 // DHn: nth DestinationHandle
49 // ---------
50 // (normal)
51 // ---> DC1 <--- DR1 / DH1
52 // |
53 // |
54 // SR <--SC <-> R
55 // |
56 // |
57 // ---> DC2 <--- DR2 / DH2
58 //
59 // ---------
60 //
61 // The root object (R) refers to the SourceContext, and is referred by many
62 // objects including the SourceContext. As the root object is a
63 // ThreadSafeRefCounted that reference cycle keeps the entire pipe alive.
64 // The root object only has "stop" function that breaks the reference cycle.
65 // It will be called when:
66 // - The source context finishes reading,
67 // - The source context gets errored while reading,
68 // - The execution context associated with the source context is stopped or
69 // - All destination handles and readers are gone.
70 //
71 // ---------
72 // (stopped)
73 // ---> DC1 <--- DR1 / DH1
74 // |
75 // |
76 // SR <--SC --> R
77 // |
78 // |
79 // ---> DC2 <--- DR2 / DH2
80 //
81 // -------
82 // When |stop| is called, no one has a strong reference to the source context
83 // and it will be collected.
84 //
85
86 class SourceContext;
87
88 class TeeRootObject final : public ThreadSafeRefCounted<TeeRootObject> {
89 public:
90 static PassRefPtr<TeeRootObject> create() { return adoptRef(new TeeRootObjec t()); }
91
92 void initialize(SourceContext* sourceContext)
93 {
94 m_sourceContext = sourceContext;
95 }
96
97 // This function can be called from any thread.
98 void stop()
99 {
100 m_sourceContext = nullptr;
101 }
102
103 private:
104 TeeRootObject() = default;
105
106 CrossThreadPersistent<SourceContext> m_sourceContext;
107 };
108
109 class DestinationTracker final : public ThreadSafeRefCounted<DestinationTracker> {
110 public:
111 static PassRefPtr<DestinationTracker> create(PassRefPtr<TeeRootObject> root) { return adoptRef(new DestinationTracker(std::move(root))); }
112 ~DestinationTracker()
113 {
114 m_root->stop();
115 }
116
117 private:
118 explicit DestinationTracker(PassRefPtr<TeeRootObject> root) : m_root(root) { }
119
120 RefPtr<TeeRootObject> m_root;
121 };
122
123 class DestinationContext final : public ThreadSafeRefCounted<DestinationContext> {
124 public:
125 class Proxy : public ThreadSafeRefCounted<Proxy> {
126 public:
127 static PassRefPtr<Proxy> create(PassRefPtr<DestinationContext> context, PassRefPtr<DestinationTracker> tracker)
128 {
129 return adoptRef(new Proxy(std::move(context), std::move(tracker)));
130 }
131 ~Proxy()
132 {
133 m_context->detach();
134 }
135
136 DestinationContext* context() { return m_context.get(); }
137
138 private:
139 Proxy(PassRefPtr<DestinationContext> context, PassRefPtr<DestinationTrac ker> tracker) : m_context(context), m_tracker(tracker) { }
140
141 RefPtr<DestinationContext> m_context;
142 RefPtr<DestinationTracker> m_tracker;
143 };
144
145 static PassRefPtr<DestinationContext> create() { return adoptRef(new Destina tionContext()); }
146
147 void enqueue(const char* buffer, size_t size)
148 {
149 bool needsNotification = false;
150 {
151 MutexLocker locker(m_mutex);
152 needsNotification = m_queue.isEmpty();
153 std::unique_ptr<Vector<char>> data = wrapUnique(new Vector<char>);
154 data->append(buffer, size);
155 m_queue.append(std::move(data));
156 }
157 if (needsNotification)
158 notify();
159 }
160
161 void setResult(Result r)
162 {
163 DCHECK(r != WebDataConsumerHandle::Ok);
164 DCHECK(r != WebDataConsumerHandle::ShouldWait);
165 {
166 MutexLocker locker(m_mutex);
167 if (m_result != WebDataConsumerHandle::ShouldWait) {
168 // The result was already set.
169 return;
170 }
171 m_result = r;
172 if (r != WebDataConsumerHandle::Done && !m_isTwoPhaseReadInProgress)
173 m_queue.clear();
174 }
175 notify();
176 }
177
178 void notify()
179 {
180 {
181 MutexLocker locker(m_mutex);
182 if (!m_client) {
183 // No client is registered.
184 return;
185 }
186 DCHECK(m_readerThread);
187 if (!m_readerThread->isCurrentThread()) {
188 m_readerThread->getWebTaskRunner()->postTask(BLINK_FROM_HERE, cr ossThreadBind(&DestinationContext::notify, wrapPassRefPtr(this)));
189 return;
190 }
191 }
192 // The reading thread is the current thread.
193 if (m_client)
194 m_client->didGetReadable();
195 }
196
197 Mutex& mutex() { return m_mutex; }
198
199 // The following functions don't use lock. They should be protected by the
200 // caller.
201 void attachReader(WebDataConsumerHandle::Client* client)
202 {
203 DCHECK(!m_readerThread);
204 DCHECK(!m_client);
205 m_readerThread = Platform::current()->currentThread();
206 m_client = client;
207 }
208 void detachReader()
209 {
210 DCHECK(m_readerThread && m_readerThread->isCurrentThread());
211 m_readerThread = nullptr;
212 m_client = nullptr;
213 }
214 const std::unique_ptr<Vector<char>>& top() const { return m_queue.first(); }
215 bool isEmpty() const { return m_queue.isEmpty(); }
216 size_t offset() const { return m_offset; }
217 void consume(size_t size)
218 {
219 const auto& top = m_queue.first();
220 DCHECK(m_offset <= m_offset + size);
221 DCHECK(m_offset + size <= top->size());
222 if (top->size() <= m_offset + size) {
223 m_offset = 0;
224 m_queue.removeFirst();
225 } else {
226 m_offset += size;
227 }
228 }
229 Result getResult() { return m_result; }
230
231 private:
232 DestinationContext()
233 : m_result(WebDataConsumerHandle::ShouldWait)
234 , m_readerThread(nullptr)
235 , m_client(nullptr)
236 , m_offset(0)
237 , m_isTwoPhaseReadInProgress(false)
238 {
239 }
240
241 void detach()
242 {
243 MutexLocker locker(m_mutex);
244 DCHECK(!m_client);
245 DCHECK(!m_readerThread);
246 m_queue.clear();
247 }
248
249 Result m_result;
250 Deque<std::unique_ptr<Vector<char>>> m_queue;
251 // Note: Holding a WebThread raw pointer is not generally safe, but we can
252 // do that in this case because:
253 // 1. Destructing a ReaderImpl when the bound thread ends is a user's
254 // responsibility.
255 // 2. |m_readerThread| will never be used after the associated reader is
256 // detached.
257 WebThread* m_readerThread;
258 WebDataConsumerHandle::Client* m_client;
259 size_t m_offset;
260 bool m_isTwoPhaseReadInProgress;
261 Mutex m_mutex;
262 };
263
264 class DestinationReader final : public WebDataConsumerHandle::Reader {
265 public:
266 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat aConsumerHandle::Client* client)
267 : m_contextProxy(contextProxy)
268 {
269 MutexLocker locker(context()->mutex());
270 context()->attachReader(client);
271 if (client) {
272 // We need to use crossThreadBind here to retain the context. Note
273 // |context()| return value is of type DestinationContext*, not
274 // PassRefPtr<DestinationContext>.
275 Platform::current()->currentThread()->getWebTaskRunner()->postTask(B LINK_FROM_HERE, crossThreadBind(&DestinationContext::notify, wrapPassRefPtr(cont ext())));
276 }
277 }
278 ~DestinationReader() override
279 {
280 MutexLocker locker(context()->mutex());
281 context()->detachReader();
282 }
283
284 Result beginRead(const void** buffer, Flags, size_t* available) override
285 {
286 MutexLocker locker(context()->mutex());
287 *available = 0;
288 *buffer = nullptr;
289 if (context()->isEmpty())
290 return context()->getResult();
291
292 const std::unique_ptr<Vector<char>>& chunk = context()->top();
293 *available = chunk->size() - context()->offset();
294 *buffer = chunk->data() + context()->offset();
295 return WebDataConsumerHandle::Ok;
296 }
297
298 Result endRead(size_t readSize) override
299 {
300 MutexLocker locker(context()->mutex());
301 if (context()->isEmpty())
302 return WebDataConsumerHandle::UnexpectedError;
303 context()->consume(readSize);
304 return WebDataConsumerHandle::Ok;
305 }
306
307 private:
308 DestinationContext* context() { return m_contextProxy->context(); }
309
310 RefPtr<DestinationContext::Proxy> m_contextProxy;
311 };
312
313 class DestinationHandle final : public WebDataConsumerHandle {
314 public:
315 static std::unique_ptr<WebDataConsumerHandle> create(PassRefPtr<DestinationC ontext::Proxy> contextProxy)
316 {
317 return wrapUnique(new DestinationHandle(std::move(contextProxy)));
318 }
319
320 std::unique_ptr<Reader> obtainReader(Client* client)
321 {
322 return wrapUnique(new DestinationReader(m_contextProxy, client));
323 }
324
325 private:
326 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co ntextProxy(contextProxy) { }
327 const char* debugName() const override { return "DestinationHandle"; }
328
329 RefPtr<DestinationContext::Proxy> m_contextProxy;
330 };
331
332 // Bound to the created thread.
333 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub lic ActiveDOMObject, public WebDataConsumerHandle::Client {
334 USING_GARBAGE_COLLECTED_MIXIN(SourceContext);
335 public:
336 SourceContext(
337 PassRefPtr<TeeRootObject> root,
338 std::unique_ptr<WebDataConsumerHandle> src,
339 PassRefPtr<DestinationContext> dest1,
340 PassRefPtr<DestinationContext> dest2,
341 ExecutionContext* executionContext)
342 : ActiveDOMObject(executionContext)
343 , m_root(root)
344 , m_reader(src->obtainReader(this))
345 , m_dest1(dest1)
346 , m_dest2(dest2)
347 {
348 suspendIfNeeded();
349 }
350 ~SourceContext() override
351 {
352 stopInternal();
353 }
354
355 void didGetReadable() override
356 {
357 DCHECK(m_reader);
358 Result r = WebDataConsumerHandle::Ok;
359 while (true) {
360 const void* buffer = nullptr;
361 size_t available = 0;
362 r = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &a vailable);
363 if (r == WebDataConsumerHandle::ShouldWait)
364 return;
365 if (r != WebDataConsumerHandle::Ok)
366 break;
367 m_dest1->enqueue(static_cast<const char*>(buffer), available);
368 m_dest2->enqueue(static_cast<const char*>(buffer), available);
369 m_reader->endRead(available);
370 }
371 m_dest1->setResult(r);
372 m_dest2->setResult(r);
373 stopInternal();
374 }
375
376 void stop() override
377 {
378 stopInternal();
379 ActiveDOMObject::stop();
380 }
381
382 DEFINE_INLINE_VIRTUAL_TRACE()
383 {
384 ActiveDOMObject::trace(visitor);
385 }
386
387 private:
388 void stopInternal()
389 {
390 if (!m_root)
391 return;
392 // When we already set a result, this result setting will be ignored.
393 m_dest1->setResult(WebDataConsumerHandle::UnexpectedError);
394 m_dest2->setResult(WebDataConsumerHandle::UnexpectedError);
395 m_root->stop();
396 m_root = nullptr;
397 m_reader = nullptr;
398 m_dest1 = nullptr;
399 m_dest2 = nullptr;
400 }
401
402 RefPtr<TeeRootObject> m_root;
403 std::unique_ptr<WebDataConsumerHandle::Reader> m_reader;
404 RefPtr<DestinationContext> m_dest1;
405 RefPtr<DestinationContext> m_dest2;
406 };
407
408 } // namespace
409
410 void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr <WebDataConsumerHandle> src, std::unique_ptr<WebDataConsumerHandle>* dest1, std: :unique_ptr<WebDataConsumerHandle>* dest2)
411 {
412 RefPtr<TeeRootObject> root = TeeRootObject::create();
413 RefPtr<DestinationTracker> tracker = DestinationTracker::create(root);
414 RefPtr<DestinationContext> context1 = DestinationContext::create();
415 RefPtr<DestinationContext> context2 = DestinationContext::create();
416
417 root->initialize(new SourceContext(root, std::move(src), context1, context2, executionContext));
418
419 *dest1 = DestinationHandle::create(DestinationContext::Proxy::create(context 1, tracker));
420 *dest2 = DestinationHandle::create(DestinationContext::Proxy::create(context 2, tracker));
421 }
422
423 void DataConsumerTee::create(ExecutionContext* executionContext, std::unique_ptr <FetchDataConsumerHandle> src, std::unique_ptr<FetchDataConsumerHandle>* dest1, std::unique_ptr<FetchDataConsumerHandle>* dest2)
424 {
425 RefPtr<BlobDataHandle> blobDataHandle = src->obtainFetchDataReader(nullptr)- >drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::AllowBlobWithInvalidSize );
426 if (blobDataHandle) {
427 *dest1 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH andle);
428 *dest2 = FetchBlobDataConsumerHandle::create(executionContext, blobDataH andle);
429 return;
430 }
431
432 std::unique_ptr<WebDataConsumerHandle> webDest1, webDest2;
433 DataConsumerTee::create(executionContext, static_cast<std::unique_ptr<WebDat aConsumerHandle>>(std::move(src)), &webDest1, &webDest2);
434 *dest1 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest1));
435 *dest2 = createFetchDataConsumerHandleFromWebHandle(std::move(webDest2));
436 return;
437 }
438
439 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698