Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(910)

Side by Side Diff: Source/modules/fetch/DataConsumerTee.cpp

Issue 1195563002: Introduce DataConsumerHandleTee (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/modules/fetch/DataConsumerTee.h ('k') | Source/modules/fetch/DataConsumerTeeTest.cpp » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "config.h"
6 #include "modules/fetch/DataConsumerTee.h"
7
8 #include "core/dom/ActiveDOMObject.h"
9 #include "core/dom/ExecutionContext.h"
10 #include "platform/Task.h"
11 #include "platform/ThreadSafeFunctional.h"
12 #include "platform/heap/Handle.h"
13 #include "public/platform/Platform.h"
14 #include "public/platform/WebThread.h"
15 #include "public/platform/WebTraceLocation.h"
16 #include "wtf/Deque.h"
17 #include "wtf/Functional.h"
18 #include "wtf/ThreadSafeRefCounted.h"
19 #include "wtf/ThreadingPrimitives.h"
20 #include "wtf/Vector.h"
21
22 namespace blink {
23
24 using Result = WebDataConsumerHandle::Result;
25 using Flags = WebDataConsumerHandle::Flags;
26
27 namespace {
28
29 // This file contains the "tee" implementation. There are several classes and
30 // their relationship is complicated, so let me describe here.
31 //
32 // Tee::create function creates two DestinationHandles (destinations) from one
33 // WebDataConsumerHandle (source). In fact, it uses a reader of the source
34 // handle.
35 //
36 // SourceContext reads data from the source reader and enques it to two
37 // destination contexts. Destination readers read data from its associated
38 // contexts. Here is an object graph.
39 //
40 // R: the root object
41 // SR: the source reader
42 // SC: the SourceContext
43 // DCn: nth DestinationContext
44 // DRn: nth DestinationReader
45 // DHn: nth DestinationHandle
46 // ---------
47 // (normal)
48 // ---> DC1 <--- DR1 / DH1
49 // |
50 // |
51 // SR <--SC <-> R
52 // |
53 // |
54 // ---> DC2 <--- DR2 / DH2
55 //
56 // ---------
57 //
58 // The root object (R) refers to the SourceContext, and is referred by many
59 // objects including the SourceContext. As the root object is a
60 // ThreadSafeRefCounted that reference cycle keeps the entire pipe alive.
61 // The root object only has "stop" function that breaks the reference cycle.
62 // It will be called when:
63 // - The source context finishes reading,
64 // - The source context gets errored while reading,
65 // - The execution context associated with the source context is stopped or
66 // - All destination handles and readers are gone.
67 //
68 // ---------
69 // (stopped)
70 // ---> DC1 <--- DR1 / DH1
71 // |
72 // |
73 // SR <--SC --> R
74 // |
75 // |
76 // ---> DC2 <--- DR2 / DH2
77 //
78 // -------
79 // When |stop| is called, no one has a strong reference to the source context
80 // and it will be collected.
81 //
82
83 class SourceContext;
84
85 class TeeRootObject final : public ThreadSafeRefCounted<TeeRootObject> {
86 public:
87 static PassRefPtr<TeeRootObject> create() { return adoptRef(new TeeRootObjec t()); }
88
89 void initialize(SourceContext* sourceContext)
90 {
91 m_sourceContext = sourceContext;
92 }
93
94 // This function can be called from any thread.
95 void stop()
96 {
97 m_sourceContext = nullptr;
98 }
99
100 private:
101 TeeRootObject() = default;
102
103 CrossThreadPersistent<SourceContext> m_sourceContext;
104 };
105
106 class DestinationTracker final : public ThreadSafeRefCounted<DestinationTracker> {
107 public:
108 static PassRefPtr<DestinationTracker> create(PassRefPtr<TeeRootObject> root) { return adoptRef(new DestinationTracker(root)); }
109 ~DestinationTracker()
110 {
111 m_root->stop();
112 }
113
114 private:
115 explicit DestinationTracker(PassRefPtr<TeeRootObject> root) : m_root(root) { }
116
117 RefPtr<TeeRootObject> m_root;
118 };
119
120 class DestinationContext final : public ThreadSafeRefCounted<DestinationContext> {
121 public:
122 class Proxy : public ThreadSafeRefCounted<Proxy> {
123 public:
124 static PassRefPtr<Proxy> create(PassRefPtr<DestinationContext> context, PassRefPtr<DestinationTracker> tracker)
125 {
126 return adoptRef(new Proxy(context, tracker));
127 }
128 ~Proxy()
129 {
130 m_context->detach();
131 }
132
133 DestinationContext* context() { return m_context.get(); }
134
135 private:
136 Proxy(PassRefPtr<DestinationContext> context, PassRefPtr<DestinationTrac ker> tracker) : m_context(context), m_tracker(tracker) { }
137
138 RefPtr<DestinationContext> m_context;
139 RefPtr<DestinationTracker> m_tracker;
140 };
141
142 static PassRefPtr<DestinationContext> create() { return adoptRef(new Destina tionContext()); }
143
144 void enqueue(const char* buffer, size_t size)
145 {
146 bool needsNotification = false;
147 {
148 MutexLocker locker(m_mutex);
149 needsNotification = m_queue.isEmpty();
150 OwnPtr<Vector<char>> data = adoptPtr(new Vector<char>);
151 data->append(buffer, size);
152 m_queue.append(data.release());
153 }
154 if (needsNotification)
155 notify();
156 }
157
158 void setResult(Result r)
159 {
160 ASSERT(r != WebDataConsumerHandle::Ok);
161 ASSERT(r != WebDataConsumerHandle::ShouldWait);
162 {
163 MutexLocker locker(m_mutex);
164 if (m_result != WebDataConsumerHandle::ShouldWait) {
165 // The result was already set.
166 return;
167 }
168 m_result = r;
169 if (r != WebDataConsumerHandle::Done && !m_isTwoPhaseReadInProgress)
170 m_queue.clear();
171 }
172 notify();
173 }
174
175 void notify()
176 {
177 {
178 MutexLocker locker(m_mutex);
179 if (!m_client) {
180 // No client is registered.
181 return;
182 }
183 ASSERT(m_readerThread);
184 if (!m_readerThread->isCurrentThread()) {
185 m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Des tinationContext::notify, this)));
186 return;
187 }
188 }
189 // The reading thread is the current thread.
190 if (m_client)
191 m_client->didGetReadable();
192 }
193
194 Mutex& mutex() { return m_mutex; }
195
196 // The following functions don't use lock. They should be protected by the
197 // caller.
198 void attachReader(WebDataConsumerHandle::Client* client)
199 {
200 ASSERT(!m_readerThread);
201 ASSERT(!m_client);
202 m_readerThread = Platform::current()->currentThread();
203 m_client = client;
204 }
205 void detachReader()
206 {
207 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
208 m_readerThread = nullptr;
209 m_client = nullptr;
210 }
211 const OwnPtr<Vector<char>>& top() const { return m_queue.first(); }
212 bool isEmpty() const { return m_queue.isEmpty(); }
213 size_t offset() const { return m_offset; }
214 void consume(size_t size)
215 {
216 const auto& top = m_queue.first();
217 ASSERT(m_offset <= m_offset + size);
218 ASSERT(m_offset + size <= top->size());
219 if (top->size() <= m_offset + size) {
220 m_offset = 0;
221 m_queue.removeFirst();
222 } else {
223 m_offset += size;
224 }
225 }
226 Result result() { return m_result; }
227
228 private:
229 DestinationContext()
230 : m_result(WebDataConsumerHandle::ShouldWait)
231 , m_readerThread(nullptr)
232 , m_client(nullptr)
233 , m_offset(0)
234 , m_isTwoPhaseReadInProgress(false)
235 {
236 }
237
238 void detach()
239 {
240 ASSERT(!m_client);
241 ASSERT(!m_readerThread);
242 m_queue.clear();
243 }
244
245 Result m_result;
246 Deque<OwnPtr<Vector<char>>> m_queue;
247 // Note: Holding a WebThread raw pointer is not generally safe, but we can
248 // do that in this case because:
249 // 1. Destructing a ReaderImpl when the bound thread ends is a user's
250 // responsibility.
251 // 2. |m_readerThread| will never be used after the associated reader is
252 // detached.
253 WebThread* m_readerThread;
254 WebDataConsumerHandle::Client* m_client;
255 size_t m_offset;
256 bool m_isTwoPhaseReadInProgress;
257 Mutex m_mutex;
258 };
259
260 class DestinationReader final : public WebDataConsumerHandle::Reader {
261 public:
262 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat aConsumerHandle::Client* client)
263 : m_contextProxy(contextProxy)
264 {
265 MutexLocker(context()->mutex());
jochen (gone - plz use gerrit) 2015/10/13 11:42:37 this actually doesn't do what you think it does (m
266 context()->attachReader(client);
267 if (client) {
268 // We need to use threadSafeBind here to retain the context. Note
269 // |context()| return value is of type DestinationContext*, not
270 // PassRefPtr<DestinationContext>.
271 Platform::current()->currentThread()->postTask(FROM_HERE, new Task(t hreadSafeBind(&DestinationContext::notify, context())));
272 }
273 }
274 ~DestinationReader() override
275 {
276 MutexLocker(context()->mutex());
jochen (gone - plz use gerrit) 2015/10/13 11:42:37 same here
277 context()->detachReader();
278 }
279
280 Result read(void* buffer, size_t size, Flags, size_t* readSize) override
281 {
282 MutexLocker locker(context()->mutex());
283 *readSize = 0;
284 if (context()->isEmpty())
285 return context()->result();
286
287 const OwnPtr<Vector<char>>& chunk = context()->top();
288 size_t sizeToCopy = std::min(size, chunk->size() - context()->offset());
289 std::copy(chunk->data() + context()->offset(), chunk->data() + context() ->offset() + sizeToCopy, static_cast<char*>(buffer));
290 context()->consume(sizeToCopy);
291 *readSize = sizeToCopy;
292 return WebDataConsumerHandle::Ok;
293 }
294
295 Result beginRead(const void** buffer, Flags, size_t* available) override
296 {
297 MutexLocker locker(context()->mutex());
298 *available = 0;
299 *buffer = nullptr;
300 if (context()->isEmpty())
301 return context()->result();
302
303 const OwnPtr<Vector<char>>& chunk = context()->top();
304 *available = chunk->size() - context()->offset();
305 *buffer = chunk->data() + context()->offset();
306 return WebDataConsumerHandle::Ok;
307 }
308
309 Result endRead(size_t readSize) override
310 {
311 MutexLocker locker(context()->mutex());
312 if (context()->isEmpty())
313 return WebDataConsumerHandle::UnexpectedError;
314 context()->consume(readSize);
315 return WebDataConsumerHandle::Ok;
316 }
317
318 private:
319 DestinationContext* context() { return m_contextProxy->context(); }
320
321 RefPtr<DestinationContext::Proxy> m_contextProxy;
322 };
323
324 class DestinationHandle final : public WebDataConsumerHandle {
325 public:
326 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co ntextProxy(contextProxy) { }
327
328 private:
329 DestinationReader* obtainReaderInternal(Client* client) { return new Destina tionReader(m_contextProxy, client); }
330
331 RefPtr<DestinationContext::Proxy> m_contextProxy;
332 };
333
334 // Bound to the created thread.
335 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub lic ActiveDOMObject, public WebDataConsumerHandle::Client {
336 WILL_BE_USING_GARBAGE_COLLECTED_MIXIN(SourceContext);
337 public:
338 SourceContext(
339 PassRefPtr<TeeRootObject> root,
340 PassOwnPtr<WebDataConsumerHandle> src,
341 PassRefPtr<DestinationContext> dest1,
342 PassRefPtr<DestinationContext> dest2,
343 ExecutionContext* executionContext)
344 : ActiveDOMObject(executionContext)
345 , m_root(root)
346 , m_reader(src->obtainReader(this))
347 , m_dest1(dest1)
348 , m_dest2(dest2)
349 {
350 suspendIfNeeded();
351 }
352 ~SourceContext() override
353 {
354 stopInternal();
355 }
356
357 void didGetReadable() override
358 {
359 ASSERT(m_reader);
360 Result r = WebDataConsumerHandle::Ok;
361 while (true) {
362 const void* buffer = nullptr;
363 size_t available = 0;
364 r = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &a vailable);
365 if (r == WebDataConsumerHandle::ShouldWait)
366 return;
367 if (r != WebDataConsumerHandle::Ok)
368 break;
369 m_dest1->enqueue(static_cast<const char*>(buffer), available);
370 m_dest2->enqueue(static_cast<const char*>(buffer), available);
371 m_reader->endRead(available);
372 }
373 m_dest1->setResult(r);
374 m_dest2->setResult(r);
375 stopInternal();
376 }
377
378 void stop() override
379 {
380 stopInternal();
381 ActiveDOMObject::stop();
382 }
383
384 DEFINE_INLINE_VIRTUAL_TRACE()
385 {
386 ActiveDOMObject::trace(visitor);
387 }
388
389 private:
390 void stopInternal()
391 {
392 if (!m_root)
393 return;
394 // When we already set a result, this result setting will be ignored.
395 m_dest1->setResult(WebDataConsumerHandle::UnexpectedError);
396 m_dest2->setResult(WebDataConsumerHandle::UnexpectedError);
397 m_root->stop();
398 m_root = nullptr;
399 m_reader = nullptr;
400 m_dest1 = nullptr;
401 m_dest2 = nullptr;
402 }
403
404 RefPtr<TeeRootObject> m_root;
405 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
406 RefPtr<DestinationContext> m_dest1;
407 RefPtr<DestinationContext> m_dest2;
408 };
409
410 } // namespace
411
412 void DataConsumerTee::create(ExecutionContext* executionContext, PassOwnPtr<WebD ataConsumerHandle> src, OwnPtr<WebDataConsumerHandle>* dest1, OwnPtr<WebDataCons umerHandle>* dest2)
413 {
414 RefPtr<TeeRootObject> root = TeeRootObject::create();
415 RefPtr<DestinationTracker> tracker = DestinationTracker::create(root);
416 RefPtr<DestinationContext> context1 = DestinationContext::create();
417 RefPtr<DestinationContext> context2 = DestinationContext::create();
418
419 root->initialize(new SourceContext(root, src, context1, context2, executionC ontext));
420
421 *dest1 = adoptPtr(new DestinationHandle(DestinationContext::Proxy::create(co ntext1, tracker)));
422 *dest2 = adoptPtr(new DestinationHandle(DestinationContext::Proxy::create(co ntext2, tracker)));
423 }
424
425 } // namespace blink
OLDNEW
« no previous file with comments | « Source/modules/fetch/DataConsumerTee.h ('k') | Source/modules/fetch/DataConsumerTeeTest.cpp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698