Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(105)

Side by Side Diff: Source/modules/fetch/DataConsumerTeeTest.cpp

Issue 1171913003: **** [WIP] Blink-side: Implement FetchBlobDataConsumerHandle **** (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Clean up. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "config.h"
6 #include "modules/fetch/DataConsumerTee.h"
7
8 #include "core/testing/DummyPageHolder.h"
9 #include "core/testing/NullExecutionContext.h"
10 #include "modules/fetch/DataConsumerHandleTestUtil.h"
11 #include "platform/Task.h"
12 #include "platform/ThreadSafeFunctional.h"
13 #include "platform/WebThreadSupportingGC.h"
14 #include "public/platform/Platform.h"
15 #include "public/platform/WebThread.h"
16 #include "public/platform/WebTraceLocation.h"
17 #include "public/platform/WebWaitableEvent.h"
18 #include "wtf/Deque.h"
19 #include "wtf/PassRefPtr.h"
20 #include "wtf/RefPtr.h"
21 #include "wtf/ThreadSafeRefCounted.h"
22 #include "wtf/ThreadingPrimitives.h"
23 #include "wtf/Vector.h"
24
25 #include <gtest/gtest.h>
26 #include <string.h>
27 #include <v8.h>
28
29 namespace blink {
30 namespace {
31
32 using Result = WebDataConsumerHandle::Result;
33 using TestingThread = DataConsumerHandleTestUtil::TestingThread;
34 const WebDataConsumerHandle::Flags kNone = WebDataConsumerHandle::FlagNone;
35 const Result kOk = WebDataConsumerHandle::Ok;
36 const Result kShouldWait = WebDataConsumerHandle::ShouldWait;
37 const Result kDone = WebDataConsumerHandle::Done;
38 const Result kUnexpectedError = WebDataConsumerHandle::UnexpectedError;
39
40 class Command final {
41 public:
42 enum Name {
43 Data,
44 Done,
45 Error,
46 Wait,
47 };
48
49 Command(Name name) : m_name(name) { }
50 Command(Name name, const Vector<char>& body) : m_name(name), m_body(body) { }
51 Command(Name name, const char* body, size_t size) : m_name(name)
52 {
53 m_body.append(body, size);
54 }
55 Command(Name name, const char* body) : Command(name, body, strlen(body)) { }
56 Name name() const { return m_name; }
57 const Vector<char>& body() const { return m_body; }
58
59 private:
60 const Name m_name;
61 Vector<char> m_body;
62 };
63
64 class Handle final : public WebDataConsumerHandle {
65 public:
66 class Context final : public ThreadSafeRefCounted<Context> {
67 public:
68 static PassRefPtr<Context> create() { return adoptRef(new Context); }
69
70 // This function cannot be called after creating a tee.
71 void add(const Command& command)
72 {
73 m_commands.append(command);
74 }
75
76 void notify()
77 {
78 if (!m_client)
79 return;
80 ASSERT(m_readerThread);
81 m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Context ::notifyInternal, this)));
82 }
83
84 void attachReader(WebDataConsumerHandle::Client* client)
85 {
86 ASSERT(!m_readerThread);
87 ASSERT(!m_client);
88 m_readerThread = Platform::current()->currentThread();
89 m_client = client;
90
91 if (m_client && !(isEmpty() && m_result == kShouldWait))
92 notify();
93 }
94 void detachReader()
95 {
96 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
97 m_readerThread = nullptr;
98 m_client = nullptr;
99 if (!m_isHandleAttached)
100 m_detachEvent->signal();
101 }
102
103 void detachHandle()
104 {
105 m_isHandleAttached = false;
106 if (!m_readerThread)
107 m_detachEvent->signal();
108 }
109
110 bool isEmpty() const { return m_commands.isEmpty(); }
111 const Command& top()
112 {
113 ASSERT(!isEmpty());
114 return m_commands.first();
115 }
116
117 void consume(size_t size)
118 {
119 ASSERT(!isEmpty());
120 ASSERT(size + m_offset <= top().body().size());
121 bool fullyConsumed = (size + m_offset == top().body().size());
122 if (fullyConsumed) {
123 m_offset = 0;
124 m_commands.removeFirst();
125 } else {
126 m_offset += size;
127 }
128 }
129
130 size_t offset() const { return m_offset; }
131 Result result() const { return m_result; }
132 void setDone() { m_result = Done; }
133 void setError() { m_result = UnexpectedError; }
134
135 Mutex& mutex()
136 {
137 return m_mutex;
138 }
139
140 WebWaitableEvent* detachEvent() { return m_detachEvent.get(); }
141
142 private:
143 Context()
144 : m_offset(0)
145 , m_readerThread(nullptr)
146 , m_client(nullptr)
147 , m_result(ShouldWait)
148 , m_isHandleAttached(true)
149 , m_detachEvent(adoptPtr(Platform::current()->createWaitableEvent()) )
150 {
151 }
152
153 void notifyInternal()
154 {
155 {
156 MutexLocker locker(m_mutex);
157 if (!m_client || !m_readerThread->isCurrentThread()) {
158 // There is no client, or a new reader is attached.
159 return;
160 }
161 }
162 // The reading thread is the current thread.
163 m_client->didGetReadable();
164 }
165
166 Deque<Command> m_commands;
167 size_t m_offset;
168 WebThread* m_readerThread;
169 Client* m_client;
170 Result m_result;
171 bool m_isHandleAttached;
172 Mutex m_mutex;
173 OwnPtr<WebWaitableEvent> m_detachEvent;
174 };
175
176 class ReaderImpl final : public Reader {
177 public:
178 ReaderImpl(PassRefPtr<Context> context, Client* client)
179 : m_context(context)
180 {
181 MutexLocker locker(m_context->mutex());
182 m_context->attachReader(client);
183 }
184 ~ReaderImpl()
185 {
186 MutexLocker locker(m_context->mutex());
187 m_context->detachReader();
188 }
189
190 Result read(void* buffer, size_t size, Flags flags, size_t* readSize) ov erride
191 {
192 const void* src = nullptr;
193 Result result = beginRead(&src, flags, readSize);
194 if (result != Ok)
195 return result;
196 memcpy(buffer, src, *readSize);
197 return endRead(*readSize);
198 }
199 Result beginRead(const void** buffer, Flags, size_t* available) override
200 {
201 MutexLocker locker(m_context->mutex());
202 *buffer = nullptr;
203 *available = 0;
204 if (m_context->isEmpty())
205 return m_context->result();
206
207 const Command& command = m_context->top();
208 Result result = Ok;
209 switch (command.name()) {
210 case Command::Data: {
211 auto& body = command.body();
212 *available = body.size() - m_context->offset();
213 *buffer = body.data() + m_context->offset();
214 result = Ok;
215 break;
216 }
217 case Command::Done:
218 m_context->setDone();
219 m_context->consume(0);
220 result = Done;
221 break;
222 case Command::Wait:
223 m_context->consume(0);
224 result = ShouldWait;
225 m_context->notify();
226 break;
227 case Command::Error:
228 m_context->setError();
229 m_context->consume(0);
230 result = UnexpectedError;
231 break;
232 }
233 return result;
234 }
235 Result endRead(size_t readSize) override
236 {
237 m_context->consume(readSize);
238 return Ok;
239 }
240
241 private:
242 RefPtr<Context> m_context;
243 };
244
245 Handle() : m_context(Context::create()) { }
246 ~Handle()
247 {
248 MutexLocker locker(m_context->mutex());
249 m_context->detachHandle();
250 }
251
252 ReaderImpl* obtainReaderInternal(Client* client) override { return new Reade rImpl(m_context, client); }
253
254 // Add a command to this handle. This function must be called on the
255 // creator thread. This function must be called BEFORE any reader is
256 // obtained.
257 void add(const Command& command)
258 {
259 MutexLocker locker(m_context->mutex());
260 m_context->add(command);
261 }
262
263 Context* context() { return m_context.get(); };
264
265 private:
266 RefPtr<Context> m_context;
267 };
268
269 class HandleReader : public WebDataConsumerHandle::Client {
270 public:
271 HandleReader() : m_finalResult(kOk) { }
272
273 // Need to wait for the event signal after this function is called.
274 void start(PassOwnPtr<WebDataConsumerHandle> handle)
275 {
276 m_thread = adoptPtr(new TestingThread("reading thread"));
277 m_thread->thread()->postTask(FROM_HERE, new Task(threadSafeBind(&HandleR eader::obtainReader, AllowCrossThreadAccess(this), handle)));
278 m_waitableEvent = adoptPtr(Platform::current()->createWaitableEvent());
279 }
280
281 void didGetReadable() override
282 {
283 Result r = kOk;
284 char buffer[3];
285 while (true) {
286 size_t size;
287 r = m_reader->read(buffer, sizeof(buffer), kNone, &size);
288 if (r == kShouldWait)
289 return;
290 if (r != kOk)
291 break;
292 m_readString.append(String(buffer, size));
293 }
294 m_finalResult = r;
295 m_reader = nullptr;
296 m_waitableEvent->signal();
297 }
298
299 WebWaitableEvent* waitableEvent() { return m_waitableEvent.get(); }
300
301 // These should be accessed after the thread joines.
302 const String& readString() const { return m_readString; }
303 Result finalResult() const { return m_finalResult; }
304
305 private:
306 void obtainReader(PassOwnPtr<WebDataConsumerHandle> handle)
307 {
308 m_reader = handle->obtainReader(this);
309 }
310
311 OwnPtr<TestingThread> m_thread;
312 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
313 String m_readString;
314 Result m_finalResult;
315 OwnPtr<WebWaitableEvent> m_waitableEvent;
316 };
317
318 class HandleTwoPhaseReader : public WebDataConsumerHandle::Client {
319 public:
320 HandleTwoPhaseReader() : m_finalResult(kOk) { }
321
322 // Need to wait for the event signal after this function is called.
323 void start(PassOwnPtr<WebDataConsumerHandle> handle)
324 {
325 m_thread = adoptPtr(new TestingThread("reading thread"));
326 m_thread->thread()->postTask(FROM_HERE, new Task(threadSafeBind(&HandleT woPhaseReader::obtainReader, AllowCrossThreadAccess(this), handle)));
327 m_waitableEvent = adoptPtr(Platform::current()->createWaitableEvent());
328 }
329
330 void didGetReadable() override
331 {
332 Result r = kOk;
333 while (true) {
334 const void* buffer = nullptr;
335 size_t size;
336 r = m_reader->beginRead(&buffer, kNone, &size);
337 if (r == kShouldWait)
338 return;
339 if (r != kOk)
340 break;
341 // Read smaller than availabe in order to test |endRead|.
342 size_t readSize = std::max(size * 2 / 3, static_cast<size_t>(1));
343 m_readString.append(String(static_cast<const char*>(buffer), readSiz e));
344 m_reader->endRead(readSize);
345 }
346 m_finalResult = r;
347 m_reader = nullptr;
348 m_waitableEvent->signal();
349 }
350
351 WebWaitableEvent* waitableEvent() { return m_waitableEvent.get(); }
352
353 // These should be accessed after the thread joines.
354 const String& readString() const { return m_readString; }
355 Result finalResult() const { return m_finalResult; }
356
357 private:
358 void obtainReader(PassOwnPtr<WebDataConsumerHandle> handle)
359 {
360 m_reader = handle->obtainReader(this);
361 }
362
363 OwnPtr<TestingThread> m_thread;
364 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
365 String m_readString;
366 Result m_finalResult;
367 OwnPtr<WebWaitableEvent> m_waitableEvent;
368 };
369
370 class TeeCreationThread {
371 public:
372 void run(PassOwnPtr<WebDataConsumerHandle> src, OwnPtr<WebDataConsumerHandle >* dest1, OwnPtr<WebDataConsumerHandle>* dest2)
373 {
374 m_thread = adoptPtr(new TestingThread("src thread", TestingThread::WithE xecutionContext));
375 m_waitableEvent = adoptPtr(Platform::current()->createWaitableEvent());
376 m_thread->thread()->postTask(FROM_HERE, new Task(threadSafeBind(&TeeCrea tionThread::runInternal, AllowCrossThreadAccess(this), src, AllowCrossThreadAcce ss(dest1), AllowCrossThreadAccess(dest2))));
377 m_waitableEvent->wait();
378 }
379
380 TestingThread* thread() { return m_thread.get(); }
381
382 private:
383 void runInternal(PassOwnPtr<WebDataConsumerHandle> src, OwnPtr<WebDataConsum erHandle>* dest1, OwnPtr<WebDataConsumerHandle>* dest2)
384 {
385 DataConsumerTee::create(m_thread->executionContext(), src, dest1, dest2) ;
386 m_waitableEvent->signal();
387 }
388
389 OwnPtr<TestingThread> m_thread;
390 OwnPtr<WebWaitableEvent> m_waitableEvent;
391 };
392
393 TEST(DataConsumerTeeTest, CreateDone)
394 {
395 OwnPtr<Handle> src(adoptPtr(new Handle));
396 OwnPtr<WebDataConsumerHandle> dest1, dest2;
397
398 src->add(Command(Command::Done));
399
400 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
401 t->run(src.release(), &dest1, &dest2);
402
403 ASSERT_TRUE(dest1);
404 ASSERT_TRUE(dest2);
405
406 HandleReader r1, r2;
407 r1.start(dest1.release());
408 r2.start(dest2.release());
409
410 r1.waitableEvent()->wait();
411 r2.waitableEvent()->wait();
412
413 EXPECT_EQ(kDone, r1.finalResult());
414 EXPECT_EQ(String(), r1.readString());
415
416 EXPECT_EQ(kDone, r2.finalResult());
417 EXPECT_EQ(String(), r2.readString());
418 }
419
420 TEST(DataConsumerTeeTest, Read)
421 {
422 OwnPtr<Handle> src(adoptPtr(new Handle));
423 OwnPtr<WebDataConsumerHandle> dest1, dest2;
424
425 src->add(Command(Command::Wait));
426 src->add(Command(Command::Data, "hello, "));
427 src->add(Command(Command::Wait));
428 src->add(Command(Command::Data, "world"));
429 src->add(Command(Command::Wait));
430 src->add(Command(Command::Wait));
431 src->add(Command(Command::Done));
432
433 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
434 t->run(src.release(), &dest1, &dest2);
435
436 ASSERT_TRUE(dest1);
437 ASSERT_TRUE(dest2);
438
439 HandleReader r1, r2;
440 r1.start(dest1.release());
441 r2.start(dest2.release());
442
443 r1.waitableEvent()->wait();
444 r2.waitableEvent()->wait();
445
446 EXPECT_EQ(kDone, r1.finalResult());
447 EXPECT_EQ("hello, world", r1.readString());
448
449 EXPECT_EQ(kDone, r2.finalResult());
450 EXPECT_EQ("hello, world", r2.readString());
451 }
452
453 TEST(DataConsumerTeeTest, TwoPhaseRead)
454 {
455 OwnPtr<Handle> src(adoptPtr(new Handle));
456 OwnPtr<WebDataConsumerHandle> dest1, dest2;
457
458 src->add(Command(Command::Wait));
459 src->add(Command(Command::Data, "hello, "));
460 src->add(Command(Command::Wait));
461 src->add(Command(Command::Wait));
462 src->add(Command(Command::Wait));
463 src->add(Command(Command::Data, "world"));
464 src->add(Command(Command::Wait));
465 src->add(Command(Command::Done));
466
467 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
468 t->run(src.release(), &dest1, &dest2);
469
470 ASSERT_TRUE(dest1);
471 ASSERT_TRUE(dest2);
472
473 HandleTwoPhaseReader r1, r2;
474 r1.start(dest1.release());
475 r2.start(dest2.release());
476
477 r1.waitableEvent()->wait();
478 r2.waitableEvent()->wait();
479
480 EXPECT_EQ(kDone, r1.finalResult());
481 EXPECT_EQ("hello, world", r1.readString());
482
483 EXPECT_EQ(kDone, r2.finalResult());
484 EXPECT_EQ("hello, world", r2.readString());
485 }
486
487 TEST(DataConsumerTeeTest, Error)
488 {
489 OwnPtr<Handle> src(adoptPtr(new Handle));
490 OwnPtr<WebDataConsumerHandle> dest1, dest2;
491
492 src->add(Command(Command::Data, "hello, "));
493 src->add(Command(Command::Data, "world"));
494 src->add(Command(Command::Error));
495
496 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
497 t->run(src.release(), &dest1, &dest2);
498
499 ASSERT_TRUE(dest1);
500 ASSERT_TRUE(dest2);
501
502 HandleReader r1, r2;
503 r1.start(dest1.release());
504 r2.start(dest2.release());
505
506 r1.waitableEvent()->wait();
507 r2.waitableEvent()->wait();
508
509 EXPECT_EQ(kUnexpectedError, r1.finalResult());
510 EXPECT_EQ(kUnexpectedError, r2.finalResult());
511 }
512
513 TEST(DataConsumerTeeTest, DetachSource)
514 {
515 OwnPtr<Handle> src(adoptPtr(new Handle));
516 OwnPtr<WebDataConsumerHandle> dest1, dest2;
517
518 src->add(Command(Command::Data, "hello, "));
519 src->add(Command(Command::Data, "world"));
520
521 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
522 t->run(src.release(), &dest1, &dest2);
523
524 ASSERT_TRUE(dest1);
525 ASSERT_TRUE(dest2);
526
527 HandleReader r1, r2;
528 r1.start(dest1.release());
529 r2.start(dest2.release());
530
531 t = nullptr;
532
533 r1.waitableEvent()->wait();
534 r2.waitableEvent()->wait();
535
536 EXPECT_EQ(kUnexpectedError, r1.finalResult());
537 EXPECT_EQ(kUnexpectedError, r2.finalResult());
538 }
539
540 TEST(DataConsumerTeeTest, DetachSourceAfterReadingDone)
541 {
542 OwnPtr<Handle> src(adoptPtr(new Handle));
543 OwnPtr<WebDataConsumerHandle> dest1, dest2;
544
545 src->add(Command(Command::Data, "hello, "));
546 src->add(Command(Command::Data, "world"));
547 src->add(Command(Command::Done));
548
549 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
550 t->run(src.release(), &dest1, &dest2);
551
552 ASSERT_TRUE(dest1);
553 ASSERT_TRUE(dest2);
554
555 HandleReader r1, r2;
556 r1.start(dest1.release());
557 r1.waitableEvent()->wait();
558
559 EXPECT_EQ(kDone, r1.finalResult());
560 EXPECT_EQ("hello, world", r1.readString());
561
562 t = nullptr;
563
564 r2.start(dest2.release());
565 r2.waitableEvent()->wait();
566
567 EXPECT_EQ(kDone, r2.finalResult());
568 EXPECT_EQ("hello, world", r2.readString());
569 }
570
571 TEST(DataConsumerTeeTest, DetachOneDestination)
572 {
573 OwnPtr<Handle> src(adoptPtr(new Handle));
574 OwnPtr<WebDataConsumerHandle> dest1, dest2;
575
576 src->add(Command(Command::Data, "hello, "));
577 src->add(Command(Command::Data, "world"));
578 src->add(Command(Command::Done));
579
580 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
581 t->run(src.release(), &dest1, &dest2);
582
583 ASSERT_TRUE(dest1);
584 ASSERT_TRUE(dest2);
585
586 dest1 = nullptr;
587
588 HandleReader r2;
589 r2.start(dest2.release());
590 r2.waitableEvent()->wait();
591
592 EXPECT_EQ(kDone, r2.finalResult());
593 EXPECT_EQ("hello, world", r2.readString());
594 }
595
596 TEST(DataConsumerTeeTest, DetachBothDestinationsShouldStopSourceReader)
597 {
598 OwnPtr<Handle> src(adoptPtr(new Handle));
599 RefPtr<Handle::Context> context(src->context());
600 OwnPtr<WebDataConsumerHandle> dest1, dest2;
601
602 src->add(Command(Command::Data, "hello, "));
603 src->add(Command(Command::Data, "world"));
604
605 OwnPtr<TeeCreationThread> t = adoptPtr(new TeeCreationThread());
606 t->run(src.release(), &dest1, &dest2);
607
608 ASSERT_TRUE(dest1);
609 ASSERT_TRUE(dest2);
610
611 dest1 = nullptr;
612 dest2 = nullptr;
613
614 // Collect garbage to finalize the source reader.
615 Heap::collectGarbage(ThreadState::HeapPointersOnStack, ThreadState::GCWithSw eep, Heap::ForcedGC);
616 context->detachEvent()->wait();
617 }
618
619 } // namespace
620 } // namespace blink
OLDNEW
« no previous file with comments | « Source/modules/fetch/DataConsumerTee.cpp ('k') | Source/modules/fetch/FetchBlobDataConsumerHandle.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698