Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(220)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp

Issue 2269953004: Implment BytesConsumer::tee (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "modules/fetch/BytesConsumer.h" 5 #include "modules/fetch/BytesConsumer.h"
6 6
7 #include "core/dom/ExecutionContext.h"
8 #include "core/dom/TaskRunnerHelper.h"
9 #include "modules/fetch/BytesConsumerForDataConsumerHandle.h"
10 #include "modules/fetch/FetchBlobDataConsumerHandle.h"
11 #include "platform/blob/BlobData.h"
12 #include "public/platform/WebTaskRunner.h"
13 #include "wtf/Functional.h"
14 #include "wtf/RefPtr.h"
7 #include <algorithm> 15 #include <algorithm>
8 #include <string.h> 16 #include <string.h>
9 17
10 namespace blink { 18 namespace blink {
11 19
20 namespace {
21
22 class NoopClient final : public GarbageCollectedFinalized<NoopClient>, public By tesConsumer::Client {
23 USING_GARBAGE_COLLECTED_MIXIN(NoopClient);
24 public:
25 void onStateChange() override {}
26 };
27
28 class Tee final : public GarbageCollectedFinalized<Tee>, public BytesConsumer::C lient {
29 USING_GARBAGE_COLLECTED_MIXIN(Tee);
30 public:
31 Tee(ExecutionContext* executionContext, BytesConsumer* consumer)
32 : m_src(consumer)
33 , m_destination1(new Destination(executionContext, this))
34 , m_destination2(new Destination(executionContext, this))
35 {
36 consumer->setClient(this);
37 // As no client is set to either destinations, Destination::notify() is
38 // no-op in this function.
39 onStateChange();
40 }
41
42 void onStateChange() override
43 {
44 bool destination1WasEmpty = m_destination1->isEmpty();
45 bool destination2WasEmpty = m_destination2->isEmpty();
46 bool hasEnqueued = false;
47
48 while (true) {
49 const char* buffer = nullptr;
50 size_t available = 0;
51 Result result = m_src->beginRead(&buffer, &available);
52 if (result == Result::ShouldWait) {
53 if (hasEnqueued && destination1WasEmpty)
54 m_destination1->notify();
55 if (hasEnqueued && destination2WasEmpty)
56 m_destination2->notify();
57 return;
58 }
59 if (result == Result::Done) {
60 if (destination1WasEmpty)
61 m_destination1->notify();
62 if (destination2WasEmpty)
63 m_destination2->notify();
64 return;
65 }
66 if (result == Result::Error) {
67 clearAndNotify();
68 return;
69 }
70 DCHECK_EQ(Result::Ok, result);
hiroshige 2016/09/08 08:25:54 nit: how about using switch() and |case Result::Ok
yhirano 2016/09/08 09:41:34 Done.
71 Chunk* chunk = new Chunk(buffer, available);
72 if (m_src->endRead(available) != Result::Ok) {
73 clearAndNotify();
74 return;
75 }
76 m_destination1->enqueue(chunk);
77 m_destination2->enqueue(chunk);
78 hasEnqueued = true;
79 }
80 }
81
82 BytesConsumer::PublicState getPublicState() const
83 {
84 return m_src->getPublicState();
85 }
86
87 BytesConsumer::Error getError() const
88 {
89 return m_src->getError();
90 }
91
92 void cancel()
93 {
94 if (!m_destination1->isCancelled() || !m_destination2->isCancelled())
95 return;
96 m_src->cancel();
97 }
98
99 BytesConsumer* destination1() const { return m_destination1; }
100 BytesConsumer* destination2() const { return m_destination2; }
101
102 DEFINE_INLINE_TRACE()
103 {
104 visitor->trace(m_src);
105 visitor->trace(m_destination1);
106 visitor->trace(m_destination2);
107 BytesConsumer::Client::trace(visitor);
108 }
109
110 private:
111 using Result = BytesConsumer::Result;
112 class Chunk final : public GarbageCollectedFinalized<Chunk> {
113 public:
114 Chunk(const char* data, size_t size)
115 {
116 m_buffer.reserveCapacity(size);
hiroshige 2016/09/08 08:25:54 reserveInitialCapacity()? (I'm not a WTF::Vector s
yhirano 2016/09/08 09:41:34 Done.
117 m_buffer.append(data, size);
118 }
119 const char* data() const { return m_buffer.data(); }
120 size_t size() const { return m_buffer.size(); }
121
122 DEFINE_INLINE_TRACE() {}
123
124 private:
125 Vector<char> m_buffer;
126 };
127
128 class Destination final : public BytesConsumer {
129 public:
130 Destination(ExecutionContext* executionContext, Tee* tee)
131 : m_executionContext(executionContext)
132 , m_tee(tee)
133 {
134 }
135
136 Result beginRead(const char** buffer, size_t* available) override
137 {
138 DCHECK(!m_chunkInUse);
139 *buffer = nullptr;
140 *available = 0;
141 if (m_isCancelled || m_isClosed)
142 return Result::Done;
143 if (!m_chunks.isEmpty()) {
144 Chunk* chunk = m_chunks[0];
145 DCHECK_LE(m_offset, chunk->size());
146 *buffer = chunk->data() + m_offset;
147 *available = chunk->size() - m_offset;
148 m_chunkInUse = chunk;
149 return Result::Ok;
150 }
151 switch (m_tee->getPublicState()) {
152 case PublicState::ReadableOrWaiting:
153 return Result::ShouldWait;
154 case PublicState::Closed:
155 m_isClosed = true;
156 clearClient();
157 return Result::Done;
158 case PublicState::Errored:
159 clearClient();
160 return Result::Error;
161 }
162 NOTREACHED();
163 return Result::Error;
164 }
165
166 Result endRead(size_t read) override
167 {
168 DCHECK(m_chunkInUse);
169 DCHECK(m_chunks.isEmpty() || m_chunkInUse == m_chunks[0]);
170 m_chunkInUse = nullptr;
171 if (m_chunks.isEmpty()) {
172 // This object becomes errored during the two-phase read.
173 DCHECK_EQ(PublicState::Errored, getPublicState());
174 return Result::Ok;
175 }
176 Chunk* chunk = m_chunks[0];
177 DCHECK_LE(m_offset + read, chunk->size());
178 m_offset += read;
179 if (chunk->size() == m_offset) {
180 m_offset = 0;
181 m_chunks.removeFirst();
182 }
183 if (m_chunks.isEmpty() && m_tee->getPublicState() == PublicState::Cl osed) {
184 // All data has been consumed.
185 TaskRunnerHelper::get(TaskType::Networking, m_executionContext)- >postTask(BLINK_FROM_HERE, WTF::bind(&Destination::close, wrapPersistent(this))) ;
186 }
187 return Result::Ok;
188 }
189
190 void setClient(BytesConsumer::Client* client) override
191 {
192 DCHECK(!m_client);
193 DCHECK(client);
194 auto state = getPublicState();
195 if (state == PublicState::Closed || state == PublicState::Errored)
196 return;
197 m_client = client;
198 }
199
200 void clearClient() override
201 {
202 m_client = nullptr;
203 }
204
205 void cancel() override
206 {
207 DCHECK(!m_chunkInUse);
208 auto state = getPublicState();
209 if (state == PublicState::Closed || state == PublicState::Errored)
210 return;
211 m_isCancelled = true;
212 m_chunks.clear();
hiroshige 2016/09/08 08:25:54 How about calling clearChunks() to make |m_offset
yhirano 2016/09/08 09:41:34 Done.
213 clearClient();
214 m_tee->cancel();
215 }
216
217 PublicState getPublicState() const override
218 {
219 if (m_isCancelled || m_isClosed)
220 return PublicState::Closed;
221 auto state = m_tee->getPublicState();
222 // We don't say this object is closed unless m_isCancelled or
223 // m_isClosed is set.
224 return state == PublicState::Closed ? PublicState::ReadableOrWaiting : state;
225 }
226
227 Error getError() const override { return m_tee->getError(); }
228
229 String debugName() const override { return "Tee::Destination"; }
230
231 void enqueue(Chunk* chunk)
232 {
233 if (m_isCancelled)
234 return;
235 m_chunks.append(chunk);
236 }
237
238 bool isEmpty() const { return m_chunks.isEmpty(); }
239
240 void clearChunks()
241 {
242 m_chunks.clear();
243 m_offset = 0;
244 }
245
246 void notify()
247 {
248 if (m_isCancelled || m_isClosed)
249 return;
250 if (m_chunks.isEmpty() && m_tee->getPublicState() == PublicState::Cl osed) {
251 close();
252 return;
253 }
254 if (m_client) {
255 m_client->onStateChange();
256 if (getPublicState() == PublicState::Errored)
257 clearClient();
258 }
259 }
260
261 bool isCancelled() const { return m_isCancelled; }
262
263 DEFINE_INLINE_TRACE()
264 {
265 visitor->trace(m_executionContext);
266 visitor->trace(m_tee);
267 visitor->trace(m_client);
268 visitor->trace(m_chunks);
269 visitor->trace(m_chunkInUse);
270 BytesConsumer::trace(visitor);
271 }
272
273 private:
274 void close()
275 {
276 DCHECK_EQ(PublicState::Closed, m_tee->getPublicState());
277 DCHECK(m_chunks.isEmpty());
278 if (m_isClosed || m_isCancelled) {
279 // It's possible to reach here because this function can be
280 // called asynchronously.
281 return;
282 }
283 DCHECK_EQ(PublicState::ReadableOrWaiting, getPublicState());
284 m_isClosed = true;
285 if (m_client) {
286 m_client->onStateChange();
287 clearClient();
288 }
289 }
290
291 Member<ExecutionContext> m_executionContext;
292 Member<Tee> m_tee;
293 Member<BytesConsumer::Client> m_client;
294 HeapDeque<Member<Chunk>> m_chunks;
295 Member<Chunk> m_chunkInUse;
296 size_t m_offset = 0;
297 bool m_isCancelled = false;
298 bool m_isClosed = false;
299 };
300
301 void clearAndNotify()
302 {
303 m_destination1->clearChunks();
304 m_destination2->clearChunks();
305 m_destination1->notify();
306 m_destination2->notify();
307 }
308
309 Member<BytesConsumer> m_src;
310 Member<Destination> m_destination1;
311 Member<Destination> m_destination2;
312 };
313
314 } // namespace
315
12 BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* rea dSize) 316 BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* rea dSize)
13 { 317 {
14 *readSize = 0; 318 *readSize = 0;
15 const char* src = nullptr; 319 const char* src = nullptr;
16 size_t available; 320 size_t available;
17 Result r = beginRead(&src, &available); 321 Result r = beginRead(&src, &available);
18 if (r != Result::Ok) 322 if (r != Result::Ok)
19 return r; 323 return r;
20 *readSize = std::min(available, size); 324 *readSize = std::min(available, size);
21 memcpy(buffer, src, *readSize); 325 memcpy(buffer, src, *readSize);
22 return endRead(*readSize); 326 return endRead(*readSize);
23 } 327 }
24 328
329 void BytesConsumer::tee(ExecutionContext* executionContext, BytesConsumer* src, BytesConsumer** dest1, BytesConsumer** dest2)
330 {
331 RefPtr<BlobDataHandle> blobDataHandle = src->drainAsBlobDataHandle(BlobSizeP olicy::AllowBlobWithInvalidSize);
332 if (blobDataHandle) {
333 // Register a client in order to be consistent.
334 src->setClient(new NoopClient);
335 // TODO(yhirano): Do not use FetchBlobDataConsumerHandle.
336 *dest1 = new BytesConsumerForDataConsumerHandle(executionContext, FetchB lobDataConsumerHandle::create(executionContext, blobDataHandle));
337 *dest2 = new BytesConsumerForDataConsumerHandle(executionContext, FetchB lobDataConsumerHandle::create(executionContext, blobDataHandle));
338 return;
339 }
340
341 Tee* tee = new Tee(executionContext, src);
342 *dest1 = tee->destination1();
343 *dest2 = tee->destination2();
344 }
345
25 } // namespace blink 346 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698