Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(247)

Side by Side Diff: Source/modules/fetch/DataConsumerTeeTest.cpp

Issue 1195563002: Introduce DataConsumerHandleTee (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/modules/fetch/DataConsumerTee.cpp ('k') | Source/modules/modules.gypi » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "config.h"
6 #include "modules/fetch/DataConsumerTee.h"
7
8 #include "core/testing/DummyPageHolder.h"
9 #include "core/testing/NullExecutionContext.h"
10 #include "platform/Task.h"
11 #include "platform/ThreadSafeFunctional.h"
12 #include "platform/WebThreadSupportingGC.h"
13 #include "public/platform/Platform.h"
14 #include "public/platform/WebThread.h"
15 #include "public/platform/WebTraceLocation.h"
16 #include "public/platform/WebWaitableEvent.h"
17 #include "wtf/Deque.h"
18 #include "wtf/PassRefPtr.h"
19 #include "wtf/RefPtr.h"
20 #include "wtf/ThreadSafeRefCounted.h"
21 #include "wtf/ThreadingPrimitives.h"
22 #include "wtf/Vector.h"
23
24 #include <gtest/gtest.h>
25 #include <string.h>
26 #include <v8.h>
27
28 namespace blink {
29 namespace {
30
31 using Result = WebDataConsumerHandle::Result;
32 const WebDataConsumerHandle::Flags kNone = WebDataConsumerHandle::FlagNone;
33 const Result kOk = WebDataConsumerHandle::Ok;
34 const Result kShouldWait = WebDataConsumerHandle::ShouldWait;
35 const Result kDone = WebDataConsumerHandle::Done;
36 const Result kUnexpectedError = WebDataConsumerHandle::UnexpectedError;
37
38 class Command final {
39 public:
40 enum Name {
41 Data,
42 Done,
43 Error,
44 Wait,
45 };
46
47 Command(Name name) : m_name(name) { }
48 Command(Name name, const Vector<char>& body) : m_name(name), m_body(body) { }
49 Command(Name name, const char* body, size_t size) : m_name(name)
50 {
51 m_body.append(body, size);
52 }
53 Command(Name name, const char* body) : Command(name, body, strlen(body)) { }
54 Name name() const { return m_name; }
55 const Vector<char>& body() const { return m_body; }
56
57 private:
58 const Name m_name;
59 Vector<char> m_body;
60 };
61
62 // Handle stores commands via |add| and replays the stored commends when read.
63 class Handle final : public WebDataConsumerHandle {
64 public:
65 class Context final : public ThreadSafeRefCounted<Context> {
66 public:
67 static PassRefPtr<Context> create() { return adoptRef(new Context); }
68
69 // This function cannot be called after creating a tee.
70 void add(const Command& command)
71 {
72 MutexLocker locker(m_mutex);
73 m_commands.append(command);
74 }
75
76 void attachReader(WebDataConsumerHandle::Client* client)
77 {
78 MutexLocker locker(m_mutex);
79 ASSERT(!m_readerThread);
80 ASSERT(!m_client);
81 m_readerThread = Platform::current()->currentThread();
82 m_client = client;
83
84 if (m_client && !(isEmpty() && m_result == kShouldWait))
85 notify();
86 }
87 void detachReader()
88 {
89 MutexLocker locker(m_mutex);
90 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
91 m_readerThread = nullptr;
92 m_client = nullptr;
93 if (!m_isHandleAttached)
94 m_detached->signal();
95 }
96
97 void detachHandle()
98 {
99 MutexLocker locker(m_mutex);
100 m_isHandleAttached = false;
101 if (!m_readerThread)
102 m_detached->signal();
103 }
104
105 Result beginRead(const void** buffer, Flags, size_t* available)
106 {
107 MutexLocker locker(m_mutex);
108 *buffer = nullptr;
109 *available = 0;
110 if (isEmpty())
111 return m_result;
112
113 const Command& command = top();
114 Result result = Ok;
115 switch (command.name()) {
116 case Command::Data: {
117 auto& body = command.body();
118 *available = body.size() - offset();
119 *buffer = body.data() + offset();
120 result = Ok;
121 break;
122 }
123 case Command::Done:
124 m_result = result = Done;
125 consume(0);
126 break;
127 case Command::Wait:
128 consume(0);
129 result = ShouldWait;
130 notify();
131 break;
132 case Command::Error:
133 m_result = result = UnexpectedError;
134 consume(0);
135 break;
136 }
137 return result;
138 }
139 Result endRead(size_t readSize)
140 {
141 MutexLocker locker(m_mutex);
142 consume(readSize);
143 return Ok;
144 }
145
146 WebWaitableEvent* detached() { return m_detached.get(); }
147
148 private:
149 Context()
150 : m_offset(0)
151 , m_readerThread(nullptr)
152 , m_client(nullptr)
153 , m_result(ShouldWait)
154 , m_isHandleAttached(true)
155 , m_detached(adoptPtr(Platform::current()->createWaitableEvent()))
156 {
157 }
158
159 bool isEmpty() const { return m_commands.isEmpty(); }
160 const Command& top()
161 {
162 ASSERT(!isEmpty());
163 return m_commands.first();
164 }
165
166 void consume(size_t size)
167 {
168 ASSERT(!isEmpty());
169 ASSERT(size + m_offset <= top().body().size());
170 bool fullyConsumed = (size + m_offset >= top().body().size());
171 if (fullyConsumed) {
172 m_offset = 0;
173 m_commands.removeFirst();
174 } else {
175 m_offset += size;
176 }
177 }
178
179 size_t offset() const { return m_offset; }
180
181 void notify()
182 {
183 if (!m_client)
184 return;
185 ASSERT(m_readerThread);
186 m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Context ::notifyInternal, this)));
187 }
188
189 void notifyInternal()
190 {
191 {
192 MutexLocker locker(m_mutex);
193 if (!m_client || !m_readerThread->isCurrentThread()) {
194 // There is no client, or a new reader is attached.
195 return;
196 }
197 }
198 // The reading thread is the current thread.
199 m_client->didGetReadable();
200 }
201
202 Deque<Command> m_commands;
203 size_t m_offset;
204 WebThread* m_readerThread;
205 Client* m_client;
206 Result m_result;
207 bool m_isHandleAttached;
208 Mutex m_mutex;
209 OwnPtr<WebWaitableEvent> m_detached;
210 };
211
212 class ReaderImpl final : public Reader {
213 public:
214 ReaderImpl(PassRefPtr<Context> context, Client* client)
215 : m_context(context)
216 {
217 m_context->attachReader(client);
218 }
219 ~ReaderImpl()
220 {
221 m_context->detachReader();
222 }
223
224 Result read(void* buffer, size_t size, Flags flags, size_t* readSize) ov erride
225 {
226 const void* src = nullptr;
227 Result result = beginRead(&src, flags, readSize);
228 if (result != Ok)
229 return result;
230 *readSize = std::min(*readSize, size);
231 memcpy(buffer, src, *readSize);
232 return endRead(*readSize);
233 }
234 Result beginRead(const void** buffer, Flags flags, size_t* available) ov erride
235 {
236 return m_context->beginRead(buffer, flags, available);
237 }
238 Result endRead(size_t readSize) override
239 {
240 return m_context->endRead(readSize);
241 }
242
243 private:
244 RefPtr<Context> m_context;
245 };
246
247 Handle() : m_context(Context::create()) { }
248 ~Handle()
249 {
250 m_context->detachHandle();
251 }
252
253 ReaderImpl* obtainReaderInternal(Client* client) override { return new Reade rImpl(m_context, client); }
254
255 // Add a command to this handle. This function must be called on the
256 // creator thread. This function must be called BEFORE any reader is
257 // obtained.
258 void add(const Command& command)
259 {
260 m_context->add(command);
261 }
262
263 Context* context() { return m_context.get(); };
264
265 private:
266 RefPtr<Context> m_context;
267 };
268
269 class TestingThread final {
270 public:
271 explicit TestingThread(const char* name)
272 : m_thread(WebThreadSupportingGC::create(name))
273 , m_waitableEvent(adoptPtr(Platform::current()->createWaitableEvent()))
274 , m_isolate(nullptr)
275 {
276 m_thread->postTask(FROM_HERE, new Task(threadSafeBind(&TestingThread::in itialize, AllowCrossThreadAccess(this))));
277 m_waitableEvent->wait();
278 }
279
280 ~TestingThread()
281 {
282 m_thread->postTask(FROM_HERE, new Task(threadSafeBind(&TestingThread::sh utdown, AllowCrossThreadAccess(this))));
283 m_waitableEvent->wait();
284 }
285
286 WebThreadSupportingGC* thread() { return m_thread.get(); }
287 ExecutionContext* executionContext() { return m_executionContext.get(); }
288
289 private:
290 void initialize()
291 {
292 m_isolate = v8::Isolate::New(v8::Isolate::CreateParams());
haraken 2015/06/24 03:44:26 Maybe would it be better to create a ScriptState b
yhirano 2015/06/24 06:55:33 Done. I added dependency to "gin/public" from "mod
293 m_isolate->Enter();
294 m_thread->initialize();
295 m_executionContext = adoptRefWillBeNoop(new NullExecutionContext());
haraken 2015/06/24 03:44:26 Using NullExecutionContext is OK if you don't need
yhirano 2015/06/24 06:55:33 Yeah, I only need to verify stop notification.
296 m_waitableEvent->signal();
297 }
298
299 void shutdown()
300 {
301 m_executionContext = nullptr;
302 m_thread->shutdown();
303 m_isolate->Exit();
304 m_isolate->Dispose();
305 m_isolate = nullptr;
306 m_waitableEvent->signal();
307 }
308
309 OwnPtr<WebThreadSupportingGC> m_thread;
310 OwnPtr<WebWaitableEvent> m_waitableEvent;
311 RefPtrWillBePersistent<NullExecutionContext> m_executionContext;
312 v8::Isolate* m_isolate;
313 };
314
315 class HandleReader : public WebDataConsumerHandle::Client {
316 public:
317 HandleReader() : m_finalResult(kOk) { }
318
319 // Need to wait for the event signal after this function is called.
320 void start(PassOwnPtr<WebDataConsumerHandle> handle)
321 {
322 m_thread = adoptPtr(new TestingThread("reading thread"));
323 m_waitableEvent = adoptPtr(Platform::current()->createWaitableEvent());
324 m_thread->thread()->postTask(FROM_HERE, new Task(threadSafeBind(&HandleR eader::obtainReader, AllowCrossThreadAccess(this), handle)));
325 }
326
327 void didGetReadable() override
328 {
329 Result r = kOk;
330 char buffer[3];
331 while (true) {
332 size_t size;
333 r = m_reader->read(buffer, sizeof(buffer), kNone, &size);
334 if (r == kShouldWait)
335 return;
336 if (r != kOk)
337 break;
338 m_readString.append(String(buffer, size));
339 }
340 m_finalResult = r;
341 m_reader = nullptr;
342 m_waitableEvent->signal();
343 }
344
345 WebWaitableEvent* waitableEvent() { return m_waitableEvent.get(); }
346
347 // These should be accessed after the thread joins.
348 const String& readString() const { return m_readString; }
349 Result finalResult() const { return m_finalResult; }
350
351 private:
352 void obtainReader(PassOwnPtr<WebDataConsumerHandle> handle)
353 {
354 m_reader = handle->obtainReader(this);
355 }
356
357 OwnPtr<TestingThread> m_thread;
358 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
359 String m_readString;
360 Result m_finalResult;
361 OwnPtr<WebWaitableEvent> m_waitableEvent;
362 };
363
364 class HandleTwoPhaseReader : public WebDataConsumerHandle::Client {
365 public:
366 HandleTwoPhaseReader() : m_finalResult(kOk) { }
367
368 // Need to wait for the event signal after this function is called.
369 void start(PassOwnPtr<WebDataConsumerHandle> handle)
370 {
371 m_thread = adoptPtr(new TestingThread("reading thread"));
372 m_waitableEvent = adoptPtr(Platform::current()->createWaitableEvent());
373 m_thread->thread()->postTask(FROM_HERE, new Task(threadSafeBind(&HandleT woPhaseReader::obtainReader, AllowCrossThreadAccess(this), handle)));
374 }
375
376 void didGetReadable() override
377 {
378 Result r = kOk;
379 while (true) {
380 const void* buffer = nullptr;
381 size_t size;
382 r = m_reader->beginRead(&buffer, kNone, &size);
383 if (r == kShouldWait)
384 return;
385 if (r != kOk)
386 break;
387 // Read smaller than availabe in order to test |endRead|.
388 size_t readSize = std::max(size * 2 / 3, static_cast<size_t>(1));
389 m_readString.append(String(static_cast<const char*>(buffer), readSiz e));
390 m_reader->endRead(readSize);
391 }
392 m_finalResult = r;
393 m_reader = nullptr;
394 m_waitableEvent->signal();
395 }
396
397 WebWaitableEvent* waitableEvent() { return m_waitableEvent.get(); }
398
399 // These should be accessed after the thread joins.
400 const String& readString() const { return m_readString; }
401 Result finalResult() const { return m_finalResult; }
402
403 private:
404 void obtainReader(PassOwnPtr<WebDataConsumerHandle> handle)
405 {
406 m_reader = handle->obtainReader(this);
407 }
408
409 OwnPtr<TestingThread> m_thread;
410 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
411 String m_readString;
412 Result m_finalResult;
413 OwnPtr<WebWaitableEvent> m_waitableEvent;
414 };
415
416 class TeeCreationThread {
417 public:
418 void run(PassOwnPtr<WebDataConsumerHandle> src, OwnPtr<WebDataConsumerHandle >* dest1, OwnPtr<WebDataConsumerHandle>* dest2)
419 {
420 m_thread = adoptPtr(new TestingThread("src thread"));
421 m_waitableEvent = adoptPtr(Platform::current()->createWaitableEvent());
422 m_thread->thread()->postTask(FROM_HERE, new Task(threadSafeBind(&TeeCrea tionThread::runInternal, AllowCrossThreadAccess(this), src, AllowCrossThreadAcce ss(dest1), AllowCrossThreadAccess(dest2))));
423 m_waitableEvent->wait();
424 }
425
426 TestingThread* thread() { return m_thread.get(); }
427
428 private:
429 void runInternal(PassOwnPtr<WebDataConsumerHandle> src, OwnPtr<WebDataConsum erHandle>* dest1, OwnPtr<WebDataConsumerHandle>* dest2)
430 {
431 DataConsumerTee::create(m_thread->executionContext(), src, dest1, dest2) ;
432 m_waitableEvent->signal();
433 }
434
435 OwnPtr<TestingThread> m_thread;
436 OwnPtr<WebWaitableEvent> m_waitableEvent;
437 };
438
439 TEST(DataConsumerTeeTest, CreateDone)
440 {
441 OwnPtr<Handle> src(adoptPtr(new Handle));
442 OwnPtr<WebDataConsumerHandle> dest1, dest2;
443
444 src->add(Command(Command::Done));
445
446 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
447 t->run(src.release(), &dest1, &dest2);
448
449 ASSERT_TRUE(dest1);
450 ASSERT_TRUE(dest2);
451
452 HandleReader r1, r2;
453 r1.start(dest1.release());
454 r2.start(dest2.release());
455
456 r1.waitableEvent()->wait();
457 r2.waitableEvent()->wait();
458
459 EXPECT_EQ(kDone, r1.finalResult());
460 EXPECT_EQ(String(), r1.readString());
461
462 EXPECT_EQ(kDone, r2.finalResult());
463 EXPECT_EQ(String(), r2.readString());
464 }
465
466 TEST(DataConsumerTeeTest, Read)
467 {
468 OwnPtr<Handle> src(adoptPtr(new Handle));
469 OwnPtr<WebDataConsumerHandle> dest1, dest2;
470
471 src->add(Command(Command::Wait));
472 src->add(Command(Command::Data, "hello, "));
473 src->add(Command(Command::Wait));
474 src->add(Command(Command::Data, "world"));
475 src->add(Command(Command::Wait));
476 src->add(Command(Command::Wait));
477 src->add(Command(Command::Done));
478
479 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
480 t->run(src.release(), &dest1, &dest2);
481
482 ASSERT_TRUE(dest1);
483 ASSERT_TRUE(dest2);
484
485 HandleReader r1, r2;
486 r1.start(dest1.release());
487 r2.start(dest2.release());
488
489 r1.waitableEvent()->wait();
490 r2.waitableEvent()->wait();
491
492 EXPECT_EQ(kDone, r1.finalResult());
493 EXPECT_EQ("hello, world", r1.readString());
494
495 EXPECT_EQ(kDone, r2.finalResult());
496 EXPECT_EQ("hello, world", r2.readString());
497 }
498
499 TEST(DataConsumerTeeTest, TwoPhaseRead)
500 {
501 OwnPtr<Handle> src(adoptPtr(new Handle));
502 OwnPtr<WebDataConsumerHandle> dest1, dest2;
503
504 src->add(Command(Command::Wait));
505 src->add(Command(Command::Data, "hello, "));
506 src->add(Command(Command::Wait));
507 src->add(Command(Command::Wait));
508 src->add(Command(Command::Wait));
509 src->add(Command(Command::Data, "world"));
510 src->add(Command(Command::Wait));
511 src->add(Command(Command::Done));
512
513 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
514 t->run(src.release(), &dest1, &dest2);
515
516 ASSERT_TRUE(dest1);
517 ASSERT_TRUE(dest2);
518
519 HandleTwoPhaseReader r1, r2;
520 r1.start(dest1.release());
521 r2.start(dest2.release());
522
523 r1.waitableEvent()->wait();
524 r2.waitableEvent()->wait();
525
526 EXPECT_EQ(kDone, r1.finalResult());
527 EXPECT_EQ("hello, world", r1.readString());
528
529 EXPECT_EQ(kDone, r2.finalResult());
530 EXPECT_EQ("hello, world", r2.readString());
531 }
532
533 TEST(DataConsumerTeeTest, Error)
534 {
535 OwnPtr<Handle> src(adoptPtr(new Handle));
536 OwnPtr<WebDataConsumerHandle> dest1, dest2;
537
538 src->add(Command(Command::Data, "hello, "));
539 src->add(Command(Command::Data, "world"));
540 src->add(Command(Command::Error));
541
542 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
543 t->run(src.release(), &dest1, &dest2);
544
545 ASSERT_TRUE(dest1);
546 ASSERT_TRUE(dest2);
547
548 HandleReader r1, r2;
549 r1.start(dest1.release());
550 r2.start(dest2.release());
551
552 r1.waitableEvent()->wait();
553 r2.waitableEvent()->wait();
554
555 EXPECT_EQ(kUnexpectedError, r1.finalResult());
556 EXPECT_EQ(kUnexpectedError, r2.finalResult());
557 }
558
559 TEST(DataConsumerTeeTest, DetachSource)
560 {
561 OwnPtr<Handle> src(adoptPtr(new Handle));
562 OwnPtr<WebDataConsumerHandle> dest1, dest2;
563
564 src->add(Command(Command::Data, "hello, "));
565 src->add(Command(Command::Data, "world"));
566
567 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
568 t->run(src.release(), &dest1, &dest2);
569
570 ASSERT_TRUE(dest1);
571 ASSERT_TRUE(dest2);
572
573 HandleReader r1, r2;
574 r1.start(dest1.release());
575 r2.start(dest2.release());
576
577 t = nullptr;
578
579 r1.waitableEvent()->wait();
580 r2.waitableEvent()->wait();
581
582 EXPECT_EQ(kUnexpectedError, r1.finalResult());
583 EXPECT_EQ(kUnexpectedError, r2.finalResult());
584 }
585
586 TEST(DataConsumerTeeTest, DetachSourceAfterReadingDone)
587 {
588 OwnPtr<Handle> src(adoptPtr(new Handle));
589 OwnPtr<WebDataConsumerHandle> dest1, dest2;
590
591 src->add(Command(Command::Data, "hello, "));
592 src->add(Command(Command::Data, "world"));
593 src->add(Command(Command::Done));
594
595 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
596 t->run(src.release(), &dest1, &dest2);
597
598 ASSERT_TRUE(dest1);
599 ASSERT_TRUE(dest2);
600
601 HandleReader r1, r2;
602 r1.start(dest1.release());
603 r1.waitableEvent()->wait();
604
605 EXPECT_EQ(kDone, r1.finalResult());
606 EXPECT_EQ("hello, world", r1.readString());
607
608 t = nullptr;
609
610 r2.start(dest2.release());
611 r2.waitableEvent()->wait();
612
613 EXPECT_EQ(kDone, r2.finalResult());
614 EXPECT_EQ("hello, world", r2.readString());
615 }
616
617 TEST(DataConsumerTeeTest, DetachOneDestination)
618 {
619 OwnPtr<Handle> src(adoptPtr(new Handle));
620 OwnPtr<WebDataConsumerHandle> dest1, dest2;
621
622 src->add(Command(Command::Data, "hello, "));
623 src->add(Command(Command::Data, "world"));
624 src->add(Command(Command::Done));
625
626 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
627 t->run(src.release(), &dest1, &dest2);
628
629 ASSERT_TRUE(dest1);
630 ASSERT_TRUE(dest2);
631
632 dest1 = nullptr;
633
634 HandleReader r2;
635 r2.start(dest2.release());
636 r2.waitableEvent()->wait();
637
638 EXPECT_EQ(kDone, r2.finalResult());
639 EXPECT_EQ("hello, world", r2.readString());
640 }
641
642 TEST(DataConsumerTeeTest, DetachBothDestinationsShouldStopSourceReader)
643 {
644 OwnPtr<Handle> src(adoptPtr(new Handle));
645 RefPtr<Handle::Context> context(src->context());
646 OwnPtr<WebDataConsumerHandle> dest1, dest2;
647
648 src->add(Command(Command::Data, "hello, "));
649 src->add(Command(Command::Data, "world"));
650
651 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
652 t->run(src.release(), &dest1, &dest2);
653
654 ASSERT_TRUE(dest1);
655 ASSERT_TRUE(dest2);
656
657 dest1 = nullptr;
658 dest2 = nullptr;
659
660 // Collect garbage to finalize the source reader.
661 Heap::collectGarbage(ThreadState::HeapPointersOnStack, ThreadState::GCWithSw eep, Heap::ForcedGC);
haraken 2015/06/24 03:44:26 You can call Heap::collectAllGarbage(). ThreadStat
yhirano 2015/06/24 06:55:33 Done.
662 context->detached()->wait();
663 }
664
665 } // namespace
666 } // namespace blink
OLDNEW
« no previous file with comments | « Source/modules/fetch/DataConsumerTee.cpp ('k') | Source/modules/modules.gypi » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698