Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(106)

Side by Side Diff: Source/modules/fetch/DataConsumerHandleTestUtil.cpp

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Rebase. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "modules/fetch/DataConsumerHandleTestUtil.h" 6 #include "modules/fetch/DataConsumerHandleTestUtil.h"
7 7
8 #include "bindings/core/v8/DOMWrapperWorld.h" 8 #include "bindings/core/v8/DOMWrapperWorld.h"
9 9
10 namespace blink { 10 namespace blink {
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
46 m_executionContext = nullptr; 46 m_executionContext = nullptr;
47 m_scriptState = nullptr; 47 m_scriptState = nullptr;
48 m_thread->shutdown(); 48 m_thread->shutdown();
49 if (m_isolateHolder) { 49 if (m_isolateHolder) {
50 isolate()->Exit(); 50 isolate()->Exit();
51 m_isolateHolder = nullptr; 51 m_isolateHolder = nullptr;
52 } 52 }
53 m_waitableEvent->signal(); 53 m_waitableEvent->signal();
54 } 54 }
55 55
56 class DataConsumerHandleTestUtil::ReplayingHandle::Context final : public Thread SafeRefCounted<Context> {
57 public:
58 static PassRefPtr<Context> create() { return adoptRef(new Context); }
59
60 // This function cannot be called after creating a tee.
61 void add(const Command& command)
62 {
63 MutexLocker locker(m_mutex);
64 m_commands.append(command);
65 }
66
67 void attachReader(WebDataConsumerHandle::Client* client)
68 {
69 MutexLocker locker(m_mutex);
70 ASSERT(!m_readerThread);
71 ASSERT(!m_client);
72 m_readerThread = Platform::current()->currentThread();
73 m_client = client;
74
75 if (m_client && !(isEmpty() && m_result == ShouldWait))
76 notify();
77 }
78 void detachReader()
79 {
80 MutexLocker locker(m_mutex);
81 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
82 m_readerThread = nullptr;
83 m_client = nullptr;
84 if (!m_isHandleAttached)
85 m_detached->signal();
86 }
87
88 void detachHandle()
89 {
90 MutexLocker locker(m_mutex);
91 m_isHandleAttached = false;
92 if (!m_readerThread)
93 m_detached->signal();
94 }
95
96 Result beginRead(const void** buffer, Flags, size_t* available)
97 {
98 MutexLocker locker(m_mutex);
99 *buffer = nullptr;
100 *available = 0;
101 if (isEmpty())
102 return m_result;
103
104 const Command& command = top();
105 Result result = Ok;
106 switch (command.name()) {
107 case Command::Data: {
108 auto& body = command.body();
109 *available = body.size() - offset();
110 *buffer = body.data() + offset();
111 result = Ok;
112 break;
113 }
114 case Command::Done:
115 m_result = result = Done;
116 consume(0);
117 break;
118 case Command::Wait:
119 consume(0);
120 result = ShouldWait;
121 notify();
122 break;
123 case Command::Error:
124 m_result = result = UnexpectedError;
125 consume(0);
126 break;
127 }
128 return result;
129 }
130 Result endRead(size_t readSize)
131 {
132 MutexLocker locker(m_mutex);
133 consume(readSize);
134 return Ok;
135 }
136
137 WebWaitableEvent* detached() { return m_detached.get(); }
138
139 private:
140 Context()
141 : m_offset(0)
142 , m_readerThread(nullptr)
143 , m_client(nullptr)
144 , m_result(ShouldWait)
145 , m_isHandleAttached(true)
146 , m_detached(adoptPtr(Platform::current()->createWaitableEvent()))
147 {
148 }
149
150 bool isEmpty() const { return m_commands.isEmpty(); }
151 const Command& top()
152 {
153 ASSERT(!isEmpty());
154 return m_commands.first();
155 }
156
157 void consume(size_t size)
158 {
159 ASSERT(!isEmpty());
160 ASSERT(size + m_offset <= top().body().size());
161 bool fullyConsumed = (size + m_offset >= top().body().size());
162 if (fullyConsumed) {
163 m_offset = 0;
164 m_commands.removeFirst();
165 } else {
166 m_offset += size;
167 }
168 }
169
170 size_t offset() const { return m_offset; }
171
172 void notify()
173 {
174 if (!m_client)
175 return;
176 ASSERT(m_readerThread);
177 m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Context::no tifyInternal, this)));
178 }
179
180 void notifyInternal()
181 {
182 {
183 MutexLocker locker(m_mutex);
184 if (!m_client || !m_readerThread->isCurrentThread()) {
185 // There is no client, or a new reader is attached.
186 return;
187 }
188 }
189 // The reading thread is the current thread.
190 m_client->didGetReadable();
191 }
192
193 Deque<Command> m_commands;
194 size_t m_offset;
195 WebThread* m_readerThread;
196 Client* m_client;
197 Result m_result;
198 bool m_isHandleAttached;
199 Mutex m_mutex;
200 OwnPtr<WebWaitableEvent> m_detached;
201 };
202
203 class DataConsumerHandleTestUtil::ReplayingHandle::ReaderImpl final : public Rea der {
204 public:
205 ReaderImpl(PassRefPtr<Context> context, Client* client)
206 : m_context(context)
207 {
208 m_context->attachReader(client);
209 }
210 ~ReaderImpl()
211 {
212 m_context->detachReader();
213 }
214
215 Result read(void* buffer, size_t size, Flags flags, size_t* readSize) overri de
216 {
217 const void* src = nullptr;
218 Result result = beginRead(&src, flags, readSize);
219 if (result != Ok)
220 return result;
221 *readSize = std::min(*readSize, size);
222 memcpy(buffer, src, *readSize);
223 return endRead(*readSize);
224 }
225 Result beginRead(const void** buffer, Flags flags, size_t* available) overri de
226 {
227 return m_context->beginRead(buffer, flags, available);
228 }
229 Result endRead(size_t readSize) override
230 {
231 return m_context->endRead(readSize);
232 }
233
234 private:
235 RefPtr<Context> m_context;
236 };
237
238 DataConsumerHandleTestUtil::ReplayingHandle::ReplayingHandle() : m_context(Conte xt::create())
239 {
240 }
241
242 DataConsumerHandleTestUtil::ReplayingHandle::~ReplayingHandle()
243 {
244 m_context->detachHandle();
245 }
246
247 WebDataConsumerHandle::Reader* DataConsumerHandleTestUtil::ReplayingHandle::obta inReaderInternal(Client* client)
248 {
249 return new ReaderImpl(m_context, client);
250 }
251
252 void DataConsumerHandleTestUtil::ReplayingHandle::add(const Command& command)
253 {
254 m_context->add(command);
255 }
256
56 } // namespace blink 257 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698