Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(758)

Side by Side Diff: Source/modules/fetch/DataConsumerTee.cpp

Issue 1195563002: Introduce DataConsumerHandleTee (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/modules/fetch/DataConsumerTee.h ('k') | Source/modules/fetch/DataConsumerTeeTest.cpp » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "config.h"
6 #include "modules/fetch/DataConsumerTee.h"
7
8 #include "core/dom/ActiveDOMObject.h"
9 #include "core/dom/ExecutionContext.h"
10 #include "platform/Task.h"
11 #include "platform/ThreadSafeFunctional.h"
12 #include "platform/heap/Handle.h"
13 #include "public/platform/Platform.h"
14 #include "public/platform/WebThread.h"
15 #include "public/platform/WebTraceLocation.h"
16 #include "wtf/Deque.h"
17 #include "wtf/Functional.h"
18 #include "wtf/ThreadSafeRefCounted.h"
19 #include "wtf/ThreadingPrimitives.h"
20 #include "wtf/Vector.h"
21
22 namespace blink {
23
24 using Result = WebDataConsumerHandle::Result;
25 using Flags = WebDataConsumerHandle::Flags;
26
27 namespace {
28
29 // This file contains the "tee" implementation. There are several classes and
30 // their relationship is complicated, so let me describe here.
31 //
32 // Tee::create function creates two DestinationHandles (destinations) from one
33 // WebDataConsumerHandle (source). In fact, it uses a reader of the source
34 // handle.
35 //
36 // SourceContext reads data from the source reader and enques it to two
37 // destination contexts. Destination readers read data from its associated
38 // contexts. Here is an object graph.
39 //
40 // R: the root object
41 // SR: the source reader
42 // SC: the SourceContext
43 // DCn: nth DestinationContext
44 // DRn: nth DestinationReader
45 // DHn: nth DestinationHandle
46 // ---------
47 // (normal)
48 // ---> DC1 <--- DR1 / DH1
49 // |
50 // |
51 // SR <--SC <-> R
52 // |
53 // |
54 // ---> DC2 <--- DR2 / DH2
55 //
56 // ---------
57 //
58 // The root object (R) refers to the SourceContext, and is referred by many
59 // objects including the SourceContext. As the root object is a
60 // ThreadSafeRefCounted that reference cycle keeps the entire pipe alive.
61 // The root object only has "stop" function that breaks the reference cycle.
62 // It will be called when:
63 // - The source context finishes reading,
64 // - The source context gets errored while reading,
65 // - The execution context associated with the source context is stopped or
66 // - All destination handles and readers are gone.
67 //
68 // ---------
69 // (stopped)
70 // ---> DC1 <--- DR1 / DH1
71 // |
72 // |
73 // SR <--SC --> R
74 // |
75 // |
76 // ---> DC2 <--- DR2 / DH2
77 //
78 // -------
79 // When |stop| is called, no one has a strong reference to the source context
80 // and it will be collected.
81 //
82
83 class SourceContext;
84
85 class TeeRootObject final : public ThreadSafeRefCounted<TeeRootObject> {
86 public:
87 static PassRefPtr<TeeRootObject> create() { return adoptRef(new TeeRootObjec t()); }
88
89 void initialize(SourceContext* sourceContext)
90 {
91 m_sourceContext = sourceContext;
92 }
93
94 // This function can be called from any thread.
95 void stop()
96 {
97 m_sourceContext = nullptr;
98 }
99
100 private:
101 TeeRootObject() = default;
102
103 CrossThreadPersistent<SourceContext> m_sourceContext;
104 };
105
106 class DestinationTracker final : public ThreadSafeRefCounted<DestinationTracker> {
107 public:
108 static PassRefPtr<DestinationTracker> create(PassRefPtr<TeeRootObject> root) { return adoptRef(new DestinationTracker(root)); }
109 ~DestinationTracker()
110 {
111 m_root->stop();
112 }
113
114 private:
115 explicit DestinationTracker(PassRefPtr<TeeRootObject> root) : m_root(root) { }
116
117 RefPtr<TeeRootObject> m_root;
118 };
119
120 class DestinationContext final : public ThreadSafeRefCounted<DestinationContext> {
121 public:
122 class Proxy : public ThreadSafeRefCounted<Proxy> {
123 public:
124 static PassRefPtr<Proxy> create(PassRefPtr<DestinationContext> context, PassRefPtr<DestinationTracker> tracker)
125 {
126 return adoptRef(new Proxy(context, tracker));
127 }
128 ~Proxy()
129 {
130 m_context->detach();
131 }
132
133 DestinationContext* context() { return m_context.get(); }
134
135 private:
136 Proxy(PassRefPtr<DestinationContext> context, PassRefPtr<DestinationTrac ker> tracker) : m_context(context), m_tracker(tracker) { }
137
138 RefPtr<DestinationContext> m_context;
139 RefPtr<DestinationTracker> m_tracker;
140 };
141
142 static PassRefPtr<DestinationContext> create() { return adoptRef(new Destina tionContext()); }
143
144 void enqueue(const char* buffer, size_t size)
145 {
146 bool needsNotification = false;
147 {
148 MutexLocker locker(m_mutex);
149 needsNotification = m_queue.isEmpty();
150 OwnPtr<Vector<char>> data = adoptPtr(new Vector<char>);
151 data->append(buffer, size);
152 m_queue.append(data.release());
153 }
154 if (needsNotification)
155 notify();
156 }
157
158 void setResult(Result r)
159 {
160 ASSERT(r != WebDataConsumerHandle::Ok);
161 ASSERT(r != WebDataConsumerHandle::ShouldWait);
162 {
163 MutexLocker locker(m_mutex);
164 if (m_result != WebDataConsumerHandle::ShouldWait) {
165 // The result was already set.
166 return;
167 }
168 m_result = r;
169 if (r != WebDataConsumerHandle::Done && !m_isTwoPhaseReadInProgress)
170 m_queue.clear();
171 }
172 notify();
173 }
174
175 void notify()
176 {
177 {
178 MutexLocker locker(m_mutex);
179 if (!m_client) {
180 // No client is registered.
181 return;
182 }
183 ASSERT(m_readerThread);
184 if (!m_readerThread->isCurrentThread()) {
185 m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Des tinationContext::notify, this)));
186 return;
187 }
188 }
189 // The reading thread is the current thread.
190 if (m_client)
191 m_client->didGetReadable();
192 }
193
194 // The following functions don't use lock.
hiroshige 2015/06/23 06:19:49 These functions are protected by |m_mutex| on the
yhirano 2015/06/23 07:25:53 Done.
195 void attachReader(WebDataConsumerHandle::Client* client)
196 {
197 ASSERT(!m_readerThread);
198 ASSERT(!m_client);
199 m_readerThread = Platform::current()->currentThread();
200 m_client = client;
201 }
202 void detachReader()
203 {
204 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
205 m_readerThread = nullptr;
206 m_client = nullptr;
207 }
208 const OwnPtr<Vector<char>>& top() const { return m_queue.first(); }
209 bool isEmpty() const { return m_queue.isEmpty(); }
210 size_t offset() const { return m_offset; }
211 void consume(size_t size)
212 {
213 const auto& top = m_queue.first();
214 ASSERT(m_offset + size <= top->size());
hiroshige 2015/06/23 06:19:49 How about adding ASSERT(m_offset <= m_offset + siz
yhirano 2015/06/23 07:25:53 Done.
215 if (top->size() <= m_offset + size) {
216 m_offset = 0;
217 m_queue.removeFirst();
218 } else {
219 m_offset += size;
220 }
221 }
222 Result result() { return m_result; }
223 Mutex& mutex() { return m_mutex; }
224
225 private:
226 DestinationContext()
227 : m_result(WebDataConsumerHandle::ShouldWait)
228 , m_readerThread(nullptr)
229 , m_client(nullptr)
230 , m_offset(0)
231 , m_isTwoPhaseReadInProgress(false)
232 {
233 }
234
235 void detach()
236 {
237 ASSERT(!m_client);
238 ASSERT(!m_readerThread);
239 m_queue.clear();
240 }
241
242 Result m_result;
243 Deque<OwnPtr<Vector<char>>> m_queue;
244 // Note: Holding a WebThread raw pointer is not generally safe, but we can
245 // do that in this case because:
246 // 1. Destructing a ReaderImpl when the bound thread ends is a user's
247 // responsibility.
248 // 2. |m_readerThread| will never be used after the associated reader is
249 // detached.
250 WebThread* m_readerThread;
251 WebDataConsumerHandle::Client* m_client;
252 size_t m_offset;
253 bool m_isTwoPhaseReadInProgress;
254 Mutex m_mutex;
255 };
256
257 class DestinationReader final : public WebDataConsumerHandle::Reader {
258 public:
259 DestinationReader(PassRefPtr<DestinationContext::Proxy> contextProxy, WebDat aConsumerHandle::Client* client)
260 : m_contextProxy(contextProxy)
261 {
262 MutexLocker(context()->mutex());
263 context()->attachReader(client);
264 if (client) {
265 // We need to use threadSafeBind here to retain the context. Note
266 // |context()| return value is of type DestinationContext*, not
267 // PassRefPtr<DestinationContext>.
268 Platform::current()->currentThread()->postTask(FROM_HERE, new Task(t hreadSafeBind(&DestinationContext::notify, context())));
269 }
270 }
271 ~DestinationReader() override
272 {
273 MutexLocker(context()->mutex());
274 context()->detachReader();
275 }
276
277 Result read(void* buffer, size_t size, Flags, size_t* readSize) override
278 {
279 MutexLocker locker(context()->mutex());
280 *readSize = 0;
281 if (context()->isEmpty())
282 return context()->result();
283
284 const OwnPtr<Vector<char>>& chunk = context()->top();
285 size_t sizeToCopy = std::min(size, chunk->size() - context()->offset());
286 std::copy(chunk->data() + context()->offset(), chunk->data() + context() ->offset() + sizeToCopy, static_cast<char*>(buffer));
287 context()->consume(sizeToCopy);
288 *readSize = sizeToCopy;
289 return WebDataConsumerHandle::Ok;
290 }
291
292 Result beginRead(const void** buffer, Flags, size_t* available) override
293 {
294 MutexLocker locker(context()->mutex());
295 *available = 0;
296 *buffer = nullptr;
297 if (context()->isEmpty())
298 return context()->result();
299
300 const OwnPtr<Vector<char>>& chunk = context()->top();
301 *available = chunk->size() - context()->offset();
302 *buffer = chunk->data() + context()->offset();
303 return WebDataConsumerHandle::Ok;
304 }
305
306 Result endRead(size_t readSize) override
307 {
308 MutexLocker locker(context()->mutex());
309 if (context()->isEmpty())
310 return WebDataConsumerHandle::UnexpectedError;
311 context()->consume(readSize);
312 return WebDataConsumerHandle::Ok;
313 }
314
315 private:
316 DestinationContext* context() { return m_contextProxy->context(); }
317
318 RefPtr<DestinationContext::Proxy> m_contextProxy;
319 };
320
321 class DestinationHandle final : public WebDataConsumerHandle {
322 public:
323 DestinationHandle(PassRefPtr<DestinationContext::Proxy> contextProxy) : m_co ntextProxy(contextProxy) { }
324
325 private:
326 DestinationReader* obtainReaderInternal(Client* client) { return new Destina tionReader(m_contextProxy, client); }
327
328 RefPtr<DestinationContext::Proxy> m_contextProxy;
329 };
330
331 // Bound to the created thread.
332 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub lic ActiveDOMObject, public WebDataConsumerHandle::Client {
333 WILL_BE_USING_GARBAGE_COLLECTED_MIXIN(SourceContext);
334 public:
335 SourceContext(
336 PassRefPtr<TeeRootObject> root,
337 PassOwnPtr<WebDataConsumerHandle> src,
338 PassRefPtr<DestinationContext> dest1,
339 PassRefPtr<DestinationContext> dest2,
340 ExecutionContext* executionContext)
341 : ActiveDOMObject(executionContext)
342 , m_root(root)
343 , m_reader(src->obtainReader(this))
344 , m_dest1(dest1)
345 , m_dest2(dest2)
346 {
347 suspendIfNeeded();
348 }
349 ~SourceContext() override
350 {
351 stopInternal();
352 }
353
354 void didGetReadable() override
355 {
356 ASSERT(m_reader);
357 Result r = WebDataConsumerHandle::Ok;
358 while (true) {
359 const void* buffer = nullptr;
360 size_t available = 0;
361 r = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &a vailable);
362 if (r == WebDataConsumerHandle::ShouldWait)
363 return;
364 if (r != WebDataConsumerHandle::Ok)
365 break;
366 m_dest1->enqueue(static_cast<const char*>(buffer), available);
367 m_dest2->enqueue(static_cast<const char*>(buffer), available);
368 m_reader->endRead(available);
369 }
370 m_dest1->setResult(r);
371 m_dest2->setResult(r);
372 stopInternal();
373 }
374
375 void stop() override
376 {
377 stopInternal();
378 ActiveDOMObject::stop();
379 }
380
381 DEFINE_INLINE_VIRTUAL_TRACE()
382 {
383 ActiveDOMObject::trace(visitor);
384 }
385
386 private:
387 void stopInternal()
388 {
389 if (!m_root)
390 return;
391 // When we already set a result, this result setting will be ignored.
392 m_dest1->setResult(WebDataConsumerHandle::UnexpectedError);
393 m_dest2->setResult(WebDataConsumerHandle::UnexpectedError);
394 m_root->stop();
395 m_root = nullptr;
396 m_reader = nullptr;
397 m_dest1 = nullptr;
398 m_dest2 = nullptr;
399 }
400
401 RefPtr<TeeRootObject> m_root;
402 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
403 RefPtr<DestinationContext> m_dest1;
404 RefPtr<DestinationContext> m_dest2;
405 };
406
407 } // namespace
408
409 void DataConsumerTee::create(ExecutionContext* executionContext, PassOwnPtr<WebD ataConsumerHandle> src, OwnPtr<WebDataConsumerHandle>* dest1, OwnPtr<WebDataCons umerHandle>* dest2)
410 {
411 RefPtr<TeeRootObject> root = TeeRootObject::create();
412 RefPtr<DestinationTracker> tracker = DestinationTracker::create(root);
413 RefPtr<DestinationContext> context1 = DestinationContext::create();
414 RefPtr<DestinationContext> context2 = DestinationContext::create();
415
416 root->initialize(new SourceContext(root, src, context1, context2, executionC ontext));
417
418 *dest1 = adoptPtr(new DestinationHandle(DestinationContext::Proxy::create(co ntext1, tracker)));
419 *dest2 = adoptPtr(new DestinationHandle(DestinationContext::Proxy::create(co ntext2, tracker)));
420 }
421
422 } // namespace blink
OLDNEW
« no previous file with comments | « Source/modules/fetch/DataConsumerTee.h ('k') | Source/modules/fetch/DataConsumerTeeTest.cpp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698