Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(327)

Side by Side Diff: Source/modules/fetch/CompositeDataConsumerHandle.cpp

Issue 1162043007: Introduce CompositeDataConsumerHandle. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "config.h"
6 #include "modules/fetch/CompositeDataConsumerHandle.h"
7
8 #include "platform/Task.h"
9 #include "platform/ThreadSafeFunctional.h"
10 #include "public/platform/Platform.h"
11 #include "public/platform/WebThread.h"
12 #include "public/platform/WebTraceLocation.h"
13 #include "wtf/Functional.h"
14 #include "wtf/Locker.h"
15 #include "wtf/ThreadSafeRefCounted.h"
16 #include "wtf/ThreadingPrimitives.h"
17
18 namespace blink {
19
20 using Result = WebDataConsumerHandle::Result;
21
22 namespace {
23
24 class WaitingHandle final : public WebDataConsumerHandle {
25 private:
26 class ReaderImpl final : public WebDataConsumerHandle::Reader {
27 public:
28 Result read(void*, size_t, WebDataConsumerHandle::Flags, size_t *readSiz e) override
29 {
30 *readSize = 0;
31 return ShouldWait;
32 }
33 Result beginRead(const void** buffer, WebDataConsumerHandle::Flags, size _t *available) override
34 {
35 *available = 0;
36 *buffer = nullptr;
37 return ShouldWait;
38 }
39 Result endRead(size_t) override
40 {
41 return UnexpectedError;
42 }
43 };
44 Reader* obtainReaderInternal(Client*) override { return new ReaderImpl; }
45 };
46
47 class DoneHandle final : public WebDataConsumerHandle {
48 private:
49 class ReaderImpl final : public WebDataConsumerHandle::Reader {
50 public:
51 explicit ReaderImpl(Client* client) : m_factory(this)
52 {
53 if (!client)
54 return;
55 // Note we don't need thread safety here because this object is
56 // bound to the current thread.
57 Platform::current()->currentThread()->postTask(FROM_HERE, new Task(b ind(&ReaderImpl::notify, m_factory.createWeakPtr(), client)));
58 }
59 Result read(void*, size_t, WebDataConsumerHandle::Flags, size_t *readSiz e) override
60 {
61 *readSize = 0;
62 return Done;
63 }
64 Result beginRead(const void** buffer, WebDataConsumerHandle::Flags, size _t *available) override
65 {
66 *available = 0;
67 *buffer = nullptr;
68 return Done;
69 }
70 Result endRead(size_t) override
71 {
72 return UnexpectedError;
73 }
74
75 private:
76 void notify(Client* client)
77 {
78 client->didGetReadable();
79 }
80
81 WeakPtrFactory<ReaderImpl> m_factory;
82 };
83
84 Reader* obtainReaderInternal(Client* client) override { return new ReaderImp l(client); }
85 };
86
87 } // namespace
88
89 class CompositeDataConsumerHandle::ReaderImpl final : public WebDataConsumerHand le::Reader {
90 public:
91 explicit ReaderImpl(PassRefPtr<Context>);
92 ~ReaderImpl() override;
93 Result read(void* data, size_t /* size */, Flags, size_t* readSize) override ;
94 Result beginRead(const void** buffer, Flags, size_t* available) override;
95 Result endRead(size_t readSize) override;
96
97 private:
98 RefPtr<Context> m_context;
99 };
100
101 class CompositeDataConsumerHandle::Context final : public ThreadSafeRefCounted<C ontext> {
102 public:
103 using Token = unsigned;
104 static PassRefPtr<Context> create(PassOwnPtr<WebDataConsumerHandle> handle) { return adoptRef(new Context(handle)); }
105 ~Context()
106 {
107 ASSERT(!m_readerThread);
108 ASSERT(!m_reader);
109 ASSERT(!m_client);
110 }
111 PassOwnPtr<ReaderImpl> obtainReader(Client* client)
112 {
113 MutexLocker locker(m_mutex);
114 ASSERT(!m_readerThread);
115 ASSERT(!m_reader);
116 ASSERT(!m_client);
117 ++m_token;
118 m_client = client;
119 m_readerThread = Platform::current()->currentThread();
120 m_reader = m_handle->obtainReader(m_client);
121 return adoptPtr(new ReaderImpl(this));
122 }
123 void detachReader()
124 {
125 MutexLocker locker(m_mutex);
126 ASSERT(m_readerThread);
127 ASSERT(m_readerThread->isCurrentThread());
128 ASSERT(m_reader);
129 ASSERT(!m_isInTwoPhaseRead);
130 ASSERT(!m_isUpdateWaitingForEndRead);
131 ++m_token;
132 m_reader = nullptr;
133 m_readerThread = nullptr;
134 m_client = nullptr;
135 }
136 void update(PassOwnPtr<WebDataConsumerHandle> handle)
137 {
138 MutexLocker locker(m_mutex);
139 m_handle = handle;
140 if (!m_readerThread) {
141 // There is no reader.
142 return;
143 }
144 ++m_token;
145 updateReaderNoLock(m_token);
146 }
147
148 Result read(void* data, size_t size, Flags flags, size_t* readSize)
149 {
150 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
151 return m_reader->read(data, size, flags, readSize);
152 }
153 Result beginRead(const void** buffer, Flags flags, size_t* available)
154 {
155 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
156 ASSERT(!m_isInTwoPhaseRead);
157 m_isInTwoPhaseRead = true;
hiroshige 2015/06/10 13:39:52 We shouldn't set m_isInTwoPhaseRead if m_reader->b
yhirano 2015/06/11 08:43:37 Done.
158 return m_reader->beginRead(buffer, flags, available);
159 }
160 Result endRead(size_t readSize)
161 {
162 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
163 ASSERT(m_isInTwoPhaseRead);
164 Result r = m_reader->endRead(readSize);
165 m_isInTwoPhaseRead = false;
166 if (m_isUpdateWaitingForEndRead) {
167 // We need this lock to access |m_handle|.
168 MutexLocker locker(m_mutex);
169 m_reader = nullptr;
170 m_reader = m_handle->obtainReader(m_client);
171 m_isUpdateWaitingForEndRead = false;
172 }
173 return r;
174 }
175
176 private:
177 explicit Context(PassOwnPtr<WebDataConsumerHandle> handle)
178 : m_handle(handle)
179 , m_readerThread(nullptr)
180 , m_client(nullptr)
181 , m_token(0)
182 , m_isUpdateWaitingForEndRead(false)
183 , m_isInTwoPhaseRead(false)
184 {
185 }
186 void updateReader(Token token)
187 {
188 MutexLocker locker(m_mutex);
189 updateReaderNoLock(token);
190 }
191 void updateReaderNoLock(Token token)
192 {
193 if (token != m_token) {
194 // This request is not fresh. Ignore it.
195 return;
196 }
197 ASSERT(m_readerThread);
198 ASSERT(m_reader);
199 if (m_readerThread->isCurrentThread()) {
200 if (m_isInTwoPhaseRead) {
201 // We are waiting for the two-phase read completion.
202 m_isUpdateWaitingForEndRead = true;
203 return;
204 }
205 // Unregister the old one, then register the new one.
206 m_reader = nullptr;
207 m_reader = m_handle->obtainReader(m_client);
208 return;
209 }
210 ++m_token;
211 m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Context::up dateReader, this, m_token)));
212 }
213
214 OwnPtr<Reader> m_reader;
215 OwnPtr<WebDataConsumerHandle> m_handle;
216 // Note: Holding a WebThread raw pointer is not generally safe, but we can
217 // do that in this case because:
218 // 1. Destructing a ReaderImpl when the bound thread ends is a user's
219 // responsibility.
220 // 2. |m_readerThread| will never be used after the associated reader is
221 // detached.
222 WebThread* m_readerThread;
223 Client* m_client;
224 Token m_token;
225 // These boolean values are bound to the reader thread.
226 bool m_isUpdateWaitingForEndRead;
227 bool m_isInTwoPhaseRead;
228 Mutex m_mutex;
229 };
230
231 CompositeDataConsumerHandle::ReaderImpl::ReaderImpl(PassRefPtr<Context> context) : m_context(context) { }
232
233 CompositeDataConsumerHandle::ReaderImpl::~ReaderImpl()
234 {
235 m_context->detachReader();
236 }
237
238 Result CompositeDataConsumerHandle::ReaderImpl::read(void* data, size_t size, Fl ags flags, size_t* readSize)
239 {
240 return m_context->read(data, size, flags, readSize);
241 }
242
243 Result CompositeDataConsumerHandle::ReaderImpl::beginRead(const void** buffer, F lags flags, size_t* available)
244 {
245 return m_context->beginRead(buffer, flags, available);
246 }
247
248 Result CompositeDataConsumerHandle::ReaderImpl::endRead(size_t readSize)
249 {
250 return m_context->endRead(readSize);
251 }
252
253 CompositeDataConsumerHandle::CompositeDataConsumerHandle(PassOwnPtr<WebDataConsu merHandle> handle)
254 : m_context(Context::create(handle)) { }
255
256 CompositeDataConsumerHandle::~CompositeDataConsumerHandle() { }
257
258 WebDataConsumerHandle::Reader* CompositeDataConsumerHandle::obtainReaderInternal (Client* client)
259 {
260 return m_context->obtainReader(client).leakPtr();
261 }
262
263 void CompositeDataConsumerHandle::update(PassOwnPtr<WebDataConsumerHandle> handl e)
264 {
265 ASSERT(handle);
266 m_context->update(handle);
267 }
268
269 PassOwnPtr<WebDataConsumerHandle> CompositeDataConsumerHandle::createWaitingHand le()
270 {
271 return adoptPtr(new WaitingHandle);
272 }
273
274 PassOwnPtr<WebDataConsumerHandle> CompositeDataConsumerHandle::createDoneHandle( )
275 {
276 return adoptPtr(new DoneHandle);
277 }
278
279 } // namespace blink
OLDNEW
« no previous file with comments | « Source/modules/fetch/CompositeDataConsumerHandle.h ('k') | Source/modules/fetch/CompositeDataConsumerHandleTest.cpp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698