Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(489)

Side by Side Diff: Source/modules/fetch/DataConsumerTee.cpp

Issue 1195563002: Introduce DataConsumerHandleTee (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/modules/fetch/DataConsumerTee.h ('k') | Source/modules/fetch/DataConsumerTeeTest.cpp » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "config.h"
6 #include "modules/fetch/DataConsumerTee.h"
7
8 #include "core/dom/ActiveDOMObject.h"
9 #include "core/dom/ExecutionContext.h"
10 #include "platform/Task.h"
11 #include "platform/ThreadSafeFunctional.h"
12 #include "platform/heap/Handle.h"
13 #include "public/platform/Platform.h"
14 #include "public/platform/WebThread.h"
15 #include "public/platform/WebTraceLocation.h"
16 #include "wtf/Deque.h"
17 #include "wtf/Functional.h"
18 #include "wtf/ThreadSafeRefCounted.h"
19 #include "wtf/ThreadingPrimitives.h"
20 #include "wtf/Vector.h"
21
22 namespace blink {
23
24 using Result = WebDataConsumerHandle::Result;
25 using Flags = WebDataConsumerHandle::Flags;
26
27 namespace {
28
29 // This file contains the "tee" implementation. There are several classes and
30 // their relationship is complicated, so let me describe here.
31 //
32 // Tee::create function creates two DestinationHandles (destinations) from one
33 // WebDataConsumerHandle (source). In fact, it uses a reader of the source
34 // handle.
35 //
36 // SourceContext reads data from the source reader and enques it to two
37 // destination contexts. Destination readers read data from its associated
38 // contexts. Here is an object graph.
39 //
40 // R: the root object
41 // SR: the source reader
42 // SC: the SourceContext
43 // DCn: nth DestinationContext
44 // DRn: nth DestinationReader
45 // DHn: nth DestinationHandle
46 // ---------
47 // (normal)
48 // ---> DC1 <--- DR1 / DH1
49 // |
50 // |
51 // SR <--SC <-> R
52 // |
53 // |
54 // ---> DC2 <--- DR2 / DH2
55 //
56 // ---------
57 //
58 // The root object (R) refers to the SourceContext, and is referred by many
59 // objects including the SourceContext. As the root object is a
60 // ThreadSafeRefCounted that reference cycle keeps the entire pipe alive.
61 // The root object only has "stop" function that breaks the reference cycle.
62 // It will be called when:
63 // - The source context finishes reading,
64 // - The source context gets errored while reading,
65 // - The execution context associated with the source context is stopped or
66 // - All destination handles and readers are gone.
67 //
68 // ---------
69 // (stopped)
70 // ---> DC1 <--- DR1 / DH1
71 // |
72 // |
73 // SR <--SC --> R
74 // |
75 // |
76 // ---> DC2 <--- DR2 / DH2
77 //
78 // -------
79 // When |stop| is called, no one has a strong reference to the source context
80 // and it will be collected.
81 //
82
83 class SourceContext;
84
85 class TeeRootObject final : public ThreadSafeRefCounted<TeeRootObject> {
86 public:
87 static PassRefPtr<TeeRootObject> create() { return adoptRef(new TeeRootObjec t()); }
88
89 void initialize(SourceContext* sourceContext)
90 {
91 m_sourceContext = sourceContext;
92 }
93
94 // This function can be called from any thread.
95 void stop()
96 {
97 m_sourceContext = nullptr;
98 }
99
100 private:
101 TeeRootObject() = default;
102
103 CrossThreadPersistent<SourceContext> m_sourceContext;
104 };
105
106 class DestinationTracker final : public ThreadSafeRefCounted<DestinationTracker> {
107 public:
108 static PassRefPtr<DestinationTracker> create(PassRefPtr<TeeRootObject> root) { return adoptRef(new DestinationTracker(root)); }
109 ~DestinationTracker()
110 {
111 m_root->stop();
112 }
113
114 private:
115 explicit DestinationTracker(PassRefPtr<TeeRootObject> root) : m_root(root) { }
116
117 RefPtr<TeeRootObject> m_root;
118 };
119
120 class DestinationContext final : public ThreadSafeRefCounted<DestinationContext> {
121 public:
122 class Proxy : public ThreadSafeRefCounted<Proxy> {
123 public:
124 static PassRefPtr<Proxy> create(PassRefPtr<DestinationContext> context, PassRefPtr<DestinationTracker> tracker)
125 {
126 return adoptRef(new Proxy(context, tracker));
127 }
128 ~Proxy()
129 {
130 m_context->detach();
131 }
132
133 DestinationContext* context() { return m_context.get(); }
134
135 private:
136 Proxy(PassRefPtr<DestinationContext> context, PassRefPtr<DestinationTrac ker> tracker) : m_context(context), m_tracker(tracker) { }
137
138 RefPtr<DestinationContext> m_context;
139 RefPtr<DestinationTracker> m_tracker;
140 };
141
142 static PassRefPtr<DestinationContext> create() { return adoptRef(new Destina tionContext()); }
143
144 void enqueue(const char* buffer, size_t size)
145 {
146 bool needsNotification = false;
147 {
148 MutexLocker locker(m_mutex);
149 needsNotification = m_queue.isEmpty();
150 OwnPtr<Vector<char>> data = adoptPtr(new Vector<char>);
151 data->append(buffer, size);
152 m_queue.append(data.release());
153 }
154 if (needsNotification)
155 notify();
156 }
157
158 void setResult(Result r)
159 {
160 ASSERT(r != WebDataConsumerHandle::Ok);
161 ASSERT(r != WebDataConsumerHandle::ShouldWait);
162 {
163 MutexLocker locker(m_mutex);
164 if (m_result != WebDataConsumerHandle::ShouldWait) {
165 // The result was already set.
166 return;
167 }
168 m_result = r;
169 if (r != WebDataConsumerHandle::Done && !m_isTwoPhaseReadInProgress)
170 m_queue.clear();
171 }
172 notify();
173 }
174
175 void notify()
176 {
177 {
178 MutexLocker locker(m_mutex);
hiroshige 2015/06/19 07:55:40 Crashing here... #0 0x4ca2855 in WTF::MutexBase::
yhirano 2015/06/19 08:14:21 Fixed.
179 if (!m_client) {
180 // No client is registered.
181 return;
182 }
183 ASSERT(m_readerThread);
184 if (!m_readerThread->isCurrentThread()) {
185 m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Des tinationContext::notify, this)));
186 return;
187 }
188 }
189 // The reading thread is the current thread.
190 if (m_client)
191 m_client->didGetReadable();
192 }
193
194 // The following functions don't use lock.
195 void attachReader(WebDataConsumerHandle::Client* client)
196 {
197 ASSERT(!m_readerThread);
198 ASSERT(!m_client);
199 m_readerThread = Platform::current()->currentThread();
200 m_client = client;
201 }
202 void detachReader()
203 {
204 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
205 m_readerThread = nullptr;
206 m_client = nullptr;
207 }
208 const OwnPtr<Vector<char>>& top() const { return m_queue.first(); }
209 bool isEmpty() const { return m_queue.isEmpty(); }
210 size_t offset() const { return m_offset; }
211 void consume(size_t size)
212 {
213 const auto& top = m_queue.first();
214 ASSERT(m_offset + size <= top->size());
215 if (top->size() <= m_offset + size) {
216 m_offset = 0;
217 m_queue.removeFirst();
218 } else {
219 m_offset += size;
220 }
221 }
222 Result result() { return m_result; }
223 Mutex& mutex() { return m_mutex; }
224
225 private:
226 DestinationContext()
227 : m_result(WebDataConsumerHandle::ShouldWait)
228 , m_readerThread(nullptr)
229 , m_client(nullptr)
230 , m_offset(0)
231 , m_isTwoPhaseReadInProgress(false)
232 {
233 }
234
235 void detach()
236 {
237 ASSERT(!m_client);
238 ASSERT(!m_readerThread);
239 m_queue.clear();
240 }
241
242 Result m_result;
243 Deque<OwnPtr<Vector<char>>> m_queue;
244 // Note: Holding a WebThread raw pointer is not generally safe, but we can
245 // do that in this case because:
246 // 1. Destructing a ReaderImpl when the bound thread ends is a user's
247 // responsibility.
248 // 2. |m_readerThread| will never be used after the associated reader is
249 // detached.
250 WebThread* m_readerThread;
251 WebDataConsumerHandle::Client* m_client;
252 size_t m_offset;
253 bool m_isTwoPhaseReadInProgress;
254 Mutex m_mutex;
255 };
256
257 class DestinationReader final : public WebDataConsumerHandle::Reader {
258 public:
259 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat aConsumerHandle::Client* client)
260 : m_contextProxy(contextProxy)
261 {
262 MutexLocker(context()->mutex());
263 context()->attachReader(client);
264 if (client)
265 Platform::current()->currentThread()->postTask(FROM_HERE, new Task(b ind(&DestinationContext::notify, context())));
266 }
267 ~DestinationReader() override
268 {
269 MutexLocker(context()->mutex());
270 context()->detachReader();
271 }
272
273 Result read(void* buffer, size_t size, Flags, size_t* readSize) override
274 {
275 MutexLocker locker(context()->mutex());
276 *readSize = 0;
277 if (context()->isEmpty())
278 return context()->result();
279
280 const OwnPtr<Vector<char>>& chunk = context()->top();
281 size_t sizeToCopy = std::min(size, chunk->size() - context()->offset());
282 std::copy(chunk->data() + context()->offset(), chunk->data() + context() ->offset() + sizeToCopy, static_cast<char*>(buffer));
283 context()->consume(sizeToCopy);
284 *readSize = sizeToCopy;
285 return WebDataConsumerHandle::Ok;
286 }
287
288 Result beginRead(const void** buffer, Flags, size_t* available) override
289 {
290 MutexLocker locker(context()->mutex());
291 *available = 0;
292 *buffer = nullptr;
293 if (context()->isEmpty())
294 return context()->result();
295
296 const OwnPtr<Vector<char>>& chunk = context()->top();
297 *available = chunk->size() - context()->offset();
298 *buffer = chunk->data() + context()->offset();
299 return WebDataConsumerHandle::Ok;
300 }
301
302 Result endRead(size_t readSize) override
303 {
304 MutexLocker locker(context()->mutex());
305 if (context()->isEmpty())
306 return WebDataConsumerHandle::UnexpectedError;
307 context()->consume(readSize);
308 return WebDataConsumerHandle::Ok;
309 }
310
311 private:
312 DestinationContext* context() { return m_contextProxy->context(); }
313
314 RefPtr<DestinationContext::Proxy> m_contextProxy;
315 };
316
317 class DestinationHandle final : public WebDataConsumerHandle {
318 public:
319 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co ntextProxy(contextProxy) { }
320
321 private:
322 DestinationReader* obtainReaderInternal(Client* client) { return new Destina tionReader(m_contextProxy, client); }
323
324 RefPtr<DestinationContext::Proxy> m_contextProxy;
325 };
326
327 // Bound to the created thread.
328 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub lic ActiveDOMObject, public WebDataConsumerHandle::Client {
329 WILL_BE_USING_GARBAGE_COLLECTED_MIXIN(SourceContext);
330 public:
331 SourceContext(
332 PassRefPtr<TeeRootObject> root,
333 PassOwnPtr<WebDataConsumerHandle> src,
334 PassRefPtr<DestinationContext> dest1,
335 PassRefPtr<DestinationContext> dest2,
336 ExecutionContext* executionContext)
337 : ActiveDOMObject(executionContext)
338 , m_root(root)
339 , m_reader(src->obtainReader(this))
340 , m_dest1(dest1)
341 , m_dest2(dest2)
342 {
343 suspendIfNeeded();
344 }
345 ~SourceContext() override
346 {
347 stopInternal();
348 }
349
350 void didGetReadable() override
351 {
352 ASSERT(m_reader);
353 Result r = WebDataConsumerHandle::Ok;
354 while (true) {
355 const void* buffer = nullptr;
356 size_t available = 0;
357 r = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &a vailable);
358 if (r == WebDataConsumerHandle::ShouldWait)
359 return;
360 if (r != WebDataConsumerHandle::Ok)
361 break;
362 m_dest1->enqueue(static_cast<const char*>(buffer), available);
363 m_dest2->enqueue(static_cast<const char*>(buffer), available);
364 m_reader->endRead(available);
365 }
366 m_dest1->setResult(r);
367 m_dest2->setResult(r);
368 stopInternal();
369 }
370
371 void stop() override
372 {
373 stopInternal();
374 ActiveDOMObject::stop();
375 }
376
377 DEFINE_INLINE_VIRTUAL_TRACE()
378 {
379 ActiveDOMObject::trace(visitor);
380 }
381
382 private:
383 void stopInternal()
384 {
385 if (!m_root)
386 return;
387 // When we already set a result, this result setting will be ignored.
388 m_dest1->setResult(WebDataConsumerHandle::UnexpectedError);
389 m_dest2->setResult(WebDataConsumerHandle::UnexpectedError);
390 m_root->stop();
391 m_root = nullptr;
392 m_reader = nullptr;
393 m_dest1 = nullptr;
394 m_dest2 = nullptr;
395 }
396
397 RefPtr<TeeRootObject> m_root;
398 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
399 RefPtr<DestinationContext> m_dest1;
400 RefPtr<DestinationContext> m_dest2;
401 };
402
403 } // namespace
404
405 void DataConsumerTee::create(ExecutionContext* executionContext, PassOwnPtr<WebD ataConsumerHandle> src, OwnPtr<WebDataConsumerHandle>* dest1, OwnPtr<WebDataCons umerHandle>* dest2)
406 {
407 RefPtr<TeeRootObject> root = TeeRootObject::create();
408 RefPtr<DestinationTracker> tracker = DestinationTracker::create(root);
409 RefPtr<DestinationContext> context1 = DestinationContext::create();
410 RefPtr<DestinationContext> context2 = DestinationContext::create();
411
412 root->initialize(new SourceContext(root, src, context1, context2, executionC ontext));
413
414 *dest1 = adoptPtr(new DestinationHandle(DestinationContext::Proxy::create(co ntext1, tracker)));
415 *dest2 = adoptPtr(new DestinationHandle(DestinationContext::Proxy::create(co ntext2, tracker)));
416 }
417
418 } // namespace blink
OLDNEW
« no previous file with comments | « Source/modules/fetch/DataConsumerTee.h ('k') | Source/modules/fetch/DataConsumerTeeTest.cpp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698