Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Side by Side Diff: Source/modules/fetch/DataConsumerTee.cpp

Issue 1195563002: Introduce DataConsumerHandleTee (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "config.h"
6 #include "modules/fetch/DataConsumerTee.h"
7
8 #include "core/dom/ActiveDOMObject.h"
9 #include "core/dom/ExecutionContext.h"
10 #include "platform/Task.h"
11 #include "platform/ThreadSafeFunctional.h"
12 #include "platform/heap/Handle.h"
13 #include "public/platform/Platform.h"
14 #include "public/platform/WebThread.h"
15 #include "public/platform/WebTraceLocation.h"
16 #include "wtf/Deque.h"
17 #include "wtf/Functional.h"
18 #include "wtf/ThreadSafeRefCounted.h"
19 #include "wtf/ThreadingPrimitives.h"
20 #include "wtf/Vector.h"
21
22 namespace blink {
23
24 using Result = WebDataConsumerHandle::Result;
25 using Flags = WebDataConsumerHandle::Flags;
26
27 namespace {
28
29 // This file contains the "tee" implementation. There are several classes and
30 // their relationship is complicated, so let me describe here.
31 //
32 // Tee::create function creates two DestinationHandles (destinations) from one
33 // WebDataConsumerHandle (source). In fact, it uses a reader of the source
34 // handle.
35 //
36 // SourceContext reads data from the source reader and enques it to two
37 // destination contexts. Destination readers read data from its assocaited
hiroshige 2015/06/19 06:40:35 nit: associated
yhirano 2015/06/19 07:27:52 Done.
38 // contexts. Here is an object graph.
39 //
40 // R: the root object
41 // SR: the source reader
42 // SC: the SourceContext
43 // DCn: nth DestinationContext
44 // DRn: nth DestinationReader
45 // DHn: nth DestinationHandle
46 // ---------
47 // (normal)
48 // ---> DC1 <--- DR1 / DH1
49 // |
50 // |
51 // SR <--SC <-> R
52 // |
53 // |
54 // ---> DC2 <--- DR2 / DH2
55 //
56 // ---------
57 //
58 // The root object (R) refers to the SourceContext, and is referred by many
59 // objects including the SourceContext. As the root object is a
60 // ThreadSafeRefCounted that reference cycle keeps the entire pipe alive.
61 // The root object only has "stop" function that breaks the reference cycle.
62 // It will be called when:
63 // - The source context finishes reading,
64 // - The source context gets errored while reading,
65 // - The execution context associated with the source context is stopped or
66 // - All destination handles and readers are gone.
67 //
68 // ---------
69 // (stopped)
70 // ---> DC1 <--- DR1 / DH1
71 // |
72 // |
73 // SR <--SC --> R
74 // |
75 // |
76 // ---> DC2 <--- DR2 / DH2
77 //
78 // -------
79 // When |stop| is called, no one has a strong reference to the source context
80 // and it will be collected.
81 //
82
83 class SourceContext;
84
85 class TeeRootObject final : public ThreadSafeRefCounted<TeeRootObject> {
86 public:
87 static PassRefPtr<TeeRootObject> create() { return adoptRef(new TeeRootObjec t()); }
88
89 void initialize(SourceContext* context)
hiroshige 2015/06/19 06:40:35 optional: I prefer names like |sourceContext|, |de
yhirano 2015/06/19 07:27:52 Done in this class. I think calling a DestinationC
90 {
91 m_context = context;
92 }
93
94 // This function can be called from any thread.
95 void stop()
96 {
97 m_context = nullptr;
98 }
99
100 private:
101 TeeRootObject() = default;
102
103 CrossThreadPersistent<SourceContext> m_context;
104 };
105
106 class DestinationTracker final : public ThreadSafeRefCounted<DestinationTracker> {
107 public:
108 enum Identifier {
109 First,
110 Second,
111 };
112 static PassRefPtr<DestinationTracker> create(PassRefPtr<TeeRootObject> root) { return adoptRef(new DestinationTracker(root)); }
113 void didDetach(Identifier identifier)
hiroshige 2015/06/19 06:40:35 How about calling m_root->stop() in the dtor? Then
yhirano 2015/06/19 07:27:52 You're right, thanks! Done.
114 {
115 ASSERT(m_root);
116 MutexLocker locker(m_mutex);
117 if (identifier == First) {
118 m_isFirstActive = false;
119 } else {
120 ASSERT(identifier == Second);
121 m_isSecondActive = false;
122 }
123 if (!m_isFirstActive && !m_isSecondActive) {
124 m_root->stop();
125 m_root = nullptr;
126 }
127 }
128
129 private:
130 explicit DestinationTracker(PassRefPtr<TeeRootObject> root) : m_root(root) , m_isFirstActive(true) , m_isSecondActive(true) { }
131
132 RefPtr<TeeRootObject> m_root;
133 Mutex m_mutex;
134 bool m_isFirstActive;
135 bool m_isSecondActive;
136 };
137
138 class DestinationContext final : public ThreadSafeRefCounted<DestinationContext> {
139 public:
140 using Identifier = DestinationTracker::Identifier;
141 static PassRefPtr<DestinationContext> create(PassRefPtr<DestinationTracker> tracker, Identifier identifier) { return adoptRef(new DestinationContext(tracker , identifier)); }
142
143 void enqueue(const char* buffer, size_t size)
144 {
145 bool needsNotification = false;
146 {
147 MutexLocker locker(m_mutex);
148 needsNotification = m_queue.isEmpty();
149 OwnPtr<Vector<char>> data = adoptPtr(new Vector<char>);
150 data->append(buffer, size);
151 m_queue.append(data.release());
152 }
153 if (needsNotification)
154 notify();
155 }
156
157 void setResult(Result r)
158 {
159 ASSERT(r != WebDataConsumerHandle::Ok);
160 ASSERT(r != WebDataConsumerHandle::ShouldWait);
161 {
162 MutexLocker locker(m_mutex);
163 if (m_result != WebDataConsumerHandle::ShouldWait) {
164 // The result was already set.
165 return;
166 }
167 m_result = r;
168 if (r != WebDataConsumerHandle::Done && !m_isTwoPhaseReadInProgress)
169 m_queue.clear();
170 }
171 notify();
172 }
173
174 void notify()
175 {
176 {
177 MutexLocker locker(m_mutex);
178 if (!m_client) {
179 // No client is registered.
180 return;
181 }
182 ASSERT(m_readerThread);
183 if (!m_readerThread->isCurrentThread()) {
184 m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Des tinationContext::notify, this)));
185 return;
186 }
187 }
188 // The reading thread is the current thread.
189 if (m_client)
190 m_client->didGetReadable();
191 }
192
193 // The following functions don't use lock.
194 void attachReader(WebDataConsumerHandle::Client* client)
195 {
196 ASSERT(!m_readerThread);
197 ASSERT(!m_client);
198 m_readerThread = Platform::current()->currentThread();
199 m_client = client;
200 }
201 void detachReader()
202 {
203 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
204 m_readerThread = nullptr;
205 m_client = nullptr;
206 if (!m_isHandleActive)
207 detach();
hiroshige 2015/06/19 06:40:35 optional: I prefer to create a tracker object that
yhirano 2015/06/19 07:27:52 Done.
208 }
209 void detachHandle()
210 {
211 m_isHandleActive = false;
212 if (!m_readerThread)
213 detach();
214 }
215 const OwnPtr<Vector<char>>& top() const { return m_queue.first(); }
216 bool isEmpty() const { return m_queue.isEmpty(); }
217 size_t offset() const { return m_offset; }
218 void consume(size_t size)
219 {
220 const auto& top = m_queue.first();
221 ASSERT(m_offset + size <= top->size());
222 if (top->size() <= m_offset + size) {
223 m_offset = 0;
224 m_queue.removeFirst();
225 } else {
226 m_offset += size;
227 }
228 }
229 Result result() { return m_result; }
230 Mutex& mutex() { return m_mutex; }
231
232 private:
233 DestinationContext(PassRefPtr<DestinationTracker> tracker, Identifier identi fier)
234 : m_tracker(tracker)
235 , m_identifier(identifier)
236 , m_result(WebDataConsumerHandle::ShouldWait)
237 , m_readerThread(nullptr)
238 , m_client(nullptr)
239 , m_offset(0)
240 , m_isTwoPhaseReadInProgress(false)
241 , m_isHandleActive(true)
242 {
243 }
244
245 void detach()
246 {
247 ASSERT(!m_client);
248 ASSERT(!m_readerThread);
249 ASSERT(!m_isHandleActive);
250 m_tracker->didDetach(m_identifier);
251 m_queue.clear();
252 }
253
254 RefPtr<DestinationTracker> m_tracker;
255 Identifier m_identifier;
256 Result m_result;
257 Deque<OwnPtr<Vector<char>>> m_queue;
258 // Note: Holding a WebThread raw pointer is not generally safe, but we can
259 // do that in this case because:
260 // 1. Destructing a ReaderImpl when the bound thread ends is a user's
261 // responsibility.
262 // 2. |m_readerThread| will never be used after the associated reader is
263 // detached.
264 WebThread* m_readerThread;
265 WebDataConsumerHandle::Client* m_client;
266 size_t m_offset;
267 bool m_isTwoPhaseReadInProgress;
268 bool m_isHandleActive;
269 Mutex m_mutex;
270 };
271
272 class DestinationReader final : public WebDataConsumerHandle::Reader {
273 public:
274 DestinationReader(PassRefPtr<DestinationContext> context, WebDataConsumerHan dle::Client* client) : m_context(context)
275 {
276 MutexLocker(m_context->mutex());
277 m_context->attachReader(client);
278 if (client)
279 Platform::current()->currentThread()->postTask(FROM_HERE, new Task(b ind(&DestinationContext::notify, m_context)));
280 }
281 ~DestinationReader() override
282 {
283 MutexLocker(m_context->mutex());
284 m_context->detachReader();
285 }
286
287 Result read(void* buffer, size_t size, Flags, size_t* readSize) override
288 {
289 MutexLocker locker(m_context->mutex());
290 *readSize = 0;
291 if (m_context->isEmpty())
292 return m_context->result();
293
294 const OwnPtr<Vector<char>>& chunk = m_context->top();
295 size_t sizeToCopy = std::min(size, chunk->size() - m_context->offset());
296 std::copy(chunk->data() + m_context->offset(), chunk->data() + m_context ->offset() + sizeToCopy, static_cast<char*>(buffer));
297 m_context->consume(sizeToCopy);
298 *readSize = sizeToCopy;
299 return WebDataConsumerHandle::Ok;
300 }
301
302 Result beginRead(const void** buffer, Flags, size_t* available) override
303 {
304 MutexLocker locker(m_context->mutex());
305 *available = 0;
306 *buffer = nullptr;
307 if (m_context->isEmpty())
308 return m_context->result();
309
310 const OwnPtr<Vector<char>>& chunk = m_context->top();
311 *available = chunk->size() - m_context->offset();
312 *buffer = chunk->data() + m_context->offset();
313 return WebDataConsumerHandle::Ok;
314 }
315
316 Result endRead(size_t readSize) override
317 {
318 MutexLocker locker(m_context->mutex());
319 if (m_context->isEmpty())
320 return WebDataConsumerHandle::UnexpectedError;
321 m_context->consume(readSize);
322 return WebDataConsumerHandle::Ok;
323 }
324
325 private:
326 RefPtr<DestinationContext> m_context;
327 };
328
329 class DestinationHandle final : public WebDataConsumerHandle {
330 public:
331 explicit DestinationHandle(PassRefPtr<DestinationContext> context) : m_conte xt(context) { }
332 ~DestinationHandle() override
333 {
334 m_context->detachHandle();
335 }
336
337 private:
338 DestinationReader* obtainReaderInternal(Client* client) { return new Destina tionReader(m_context, client); }
339
340 RefPtr<DestinationContext> m_context;
341 };
342
343 // Bound to the created thread.
344 class SourceContext final : public GarbageCollectedFinalized<SourceContext>, pub lic ActiveDOMObject, public WebDataConsumerHandle::Client {
345 WILL_BE_USING_GARBAGE_COLLECTED_MIXIN(SourceContext);
346 public:
347 SourceContext(
348 PassRefPtr<TeeRootObject> root,
349 PassOwnPtr<WebDataConsumerHandle> src,
350 PassRefPtr<DestinationContext> dest1,
351 PassRefPtr<DestinationContext> dest2,
352 ExecutionContext* executionContext)
353 : ActiveDOMObject(executionContext)
354 , m_root(root)
355 , m_reader(src->obtainReader(this))
356 , m_dest1(dest1)
357 , m_dest2(dest2)
358 {
359 suspendIfNeeded();
360 }
361 ~SourceContext() override
362 {
363 stopInternal();
364 }
365
366 void didGetReadable() override
367 {
368 ASSERT(m_reader);
369 Result r = WebDataConsumerHandle::Ok;
370 while (true) {
371 const void* buffer = nullptr;
372 size_t available = 0;
373 r = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &a vailable);
374 if (r == WebDataConsumerHandle::ShouldWait)
375 return;
376 if (r != WebDataConsumerHandle::Ok)
377 break;
378 m_dest1->enqueue(static_cast<const char*>(buffer), available);
379 m_dest2->enqueue(static_cast<const char*>(buffer), available);
380 m_reader->endRead(available);
381 }
382 m_dest1->setResult(r);
383 m_dest2->setResult(r);
384 stopInternal();
385 }
386
387 void stop() override
388 {
389 stopInternal();
390 ActiveDOMObject::stop();
391 }
392
393 DEFINE_INLINE_VIRTUAL_TRACE()
394 {
395 ActiveDOMObject::trace(visitor);
396 }
397
398 private:
399 void stopInternal()
400 {
401 if (!m_root)
402 return;
403 // When we already set a result, this result setting will be ignored.
404 m_dest1->setResult(WebDataConsumerHandle::UnexpectedError);
405 m_dest2->setResult(WebDataConsumerHandle::UnexpectedError);
406 m_root->stop();
407 m_root = nullptr;
408 m_reader = nullptr;
409 m_dest1 = nullptr;
410 m_dest2 = nullptr;
411 }
412
413 RefPtr<TeeRootObject> m_root;
414 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
415 RefPtr<DestinationContext> m_dest1;
416 RefPtr<DestinationContext> m_dest2;
417 };
418
419 } // namespace
420
421 void DataConsumerTee::create(ExecutionContext* executionContext, PassOwnPtr<WebD ataConsumerHandle> src, OwnPtr<WebDataConsumerHandle>* dest1, OwnPtr<WebDataCons umerHandle>* dest2)
422 {
423 RefPtr<TeeRootObject> root = TeeRootObject::create();
424 RefPtr<DestinationTracker> tracker = DestinationTracker::create(root);
425 RefPtr<DestinationContext> context1 = DestinationContext::create(tracker, De stinationTracker::First);
426 RefPtr<DestinationContext> context2 = DestinationContext::create(tracker, De stinationTracker::Second);
427
428 root->initialize(new SourceContext(root, src, context1, context2, executionC ontext));
429
430 *dest1 = adoptPtr(new DestinationHandle(context1));
431 *dest2 = adoptPtr(new DestinationHandle(context2));
432 }
433
434 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698