Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(159)

Side by Side Diff: Source/modules/fetch/DataConsumerHandleUtil.cpp

Issue 1176243004: Add FetchDataConsumerHandle and utility functions/classes (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "modules/fetch/CompositeDataConsumerHandle.h" 6 #include "modules/fetch/DataConsumerHandleUtil.h"
7 7
8 #include "platform/Task.h" 8 #include "platform/Task.h"
9 #include "platform/ThreadSafeFunctional.h" 9 #include "platform/blob/BlobData.h"
10 #include "public/platform/Platform.h" 10 #include "public/platform/Platform.h"
11 #include "public/platform/WebThread.h" 11 #include "public/platform/WebThread.h"
12 #include "public/platform/WebTraceLocation.h" 12 #include "public/platform/WebTraceLocation.h"
13 #include "wtf/Functional.h" 13 #include "wtf/Functional.h"
14 #include "wtf/Locker.h"
15 #include "wtf/ThreadSafeRefCounted.h"
16 #include "wtf/ThreadingPrimitives.h"
17 14
18 namespace blink { 15 namespace blink {
19 16
20 using Result = WebDataConsumerHandle::Result; 17 using Result = WebDataConsumerHandle::Result;
21 18
22 namespace { 19 namespace {
23 20
24 class WaitingHandle final : public WebDataConsumerHandle { 21 class WaitingHandle final : public WebDataConsumerHandle {
25 private: 22 private:
26 class ReaderImpl final : public WebDataConsumerHandle::Reader { 23 class ReaderImpl final : public WebDataConsumerHandle::Reader {
(...skipping 10 matching lines...) Expand all
37 return ShouldWait; 34 return ShouldWait;
38 } 35 }
39 Result endRead(size_t) override 36 Result endRead(size_t) override
40 { 37 {
41 return UnexpectedError; 38 return UnexpectedError;
42 } 39 }
43 }; 40 };
44 Reader* obtainReaderInternal(Client*) override { return new ReaderImpl; } 41 Reader* obtainReaderInternal(Client*) override { return new ReaderImpl; }
45 }; 42 };
46 43
47 class DoneHandle final : public WebDataConsumerHandle { 44 class DoneHandle final : public WebDataConsumerHandle {
yhirano 2015/06/17 04:13:34 [opt] You can unify DoneHandle and UnexpectedError
hiroshige 2015/06/17 08:30:23 Done.
48 private: 45 private:
49 class ReaderImpl final : public WebDataConsumerHandle::Reader { 46 class ReaderImpl final : public WebDataConsumerHandle::Reader, public Notify OnReaderCreationHelper {
50 public: 47 public:
51 explicit ReaderImpl(Client* client) : m_factory(this) 48 explicit ReaderImpl(Client* client) : NotifyOnReaderCreationHelper(clien t) { }
52 {
53 if (!client)
54 return;
55 // Note we don't need thread safety here because this object is
56 // bound to the current thread.
57 Platform::current()->currentThread()->postTask(FROM_HERE, new Task(b ind(&ReaderImpl::notify, m_factory.createWeakPtr(), client)));
58 }
59 Result read(void*, size_t, WebDataConsumerHandle::Flags, size_t *readSiz e) override 49 Result read(void*, size_t, WebDataConsumerHandle::Flags, size_t *readSiz e) override
60 { 50 {
61 *readSize = 0; 51 *readSize = 0;
62 return Done; 52 return Done;
63 } 53 }
64 Result beginRead(const void** buffer, WebDataConsumerHandle::Flags, size _t *available) override 54 Result beginRead(const void** buffer, WebDataConsumerHandle::Flags, size _t *available) override
65 { 55 {
66 *available = 0; 56 *available = 0;
67 *buffer = nullptr; 57 *buffer = nullptr;
68 return Done; 58 return Done;
69 } 59 }
70 Result endRead(size_t) override 60 Result endRead(size_t) override
71 { 61 {
72 return UnexpectedError; 62 return UnexpectedError;
73 } 63 }
74
75 private:
76 void notify(Client* client)
77 {
78 client->didGetReadable();
79 }
80
81 WeakPtrFactory<ReaderImpl> m_factory;
82 }; 64 };
83 65
84 Reader* obtainReaderInternal(Client* client) override { return new ReaderImp l(client); } 66 Reader* obtainReaderInternal(Client* client) override { return new ReaderImp l(client); }
85 }; 67 };
86 68
69 class UnexpectedErrorHandle final : public WebDataConsumerHandle {
70 private:
71 class ReaderImpl final : public WebDataConsumerHandle::Reader, public Notify OnReaderCreationHelper {
72 public:
73 explicit ReaderImpl(Client* client) : NotifyOnReaderCreationHelper(clien t) { }
74 Result read(void*, size_t, WebDataConsumerHandle::Flags, size_t *readSiz e) override
75 {
76 *readSize = 0;
77 return UnexpectedError;
78 }
79 Result beginRead(const void** buffer, WebDataConsumerHandle::Flags, size _t *available) override
80 {
81 *available = 0;
82 *buffer = nullptr;
83 return UnexpectedError;
84 }
85 Result endRead(size_t) override
86 {
87 return UnexpectedError;
88 }
89 };
90
91 Reader* obtainReaderInternal(Client* client) override { return new ReaderImp l(client); }
92 };
93
94 class WebToFetchDataConsumerHandleAdapter : public FetchDataConsumerHandle {
95 public:
96 WebToFetchDataConsumerHandleAdapter(PassOwnPtr<WebDataConsumerHandle> handle ) : m_handle(handle) { }
97 private:
98 class ReaderImpl final : public FetchDataConsumerHandle::Reader {
99 public:
100 ReaderImpl(PassOwnPtr<WebDataConsumerHandle::Reader> reader) : m_reader( reader) { }
101 Result read(void* data, size_t size, Flags flags, size_t* readSize) over ride
102 {
103 return m_reader->read(data, size, flags, readSize);
104 }
105
106 Result beginRead(const void** buffer, Flags flags, size_t* available) ov erride
107 {
108 return m_reader->beginRead(buffer, flags, available);
109 }
110 Result endRead(size_t readSize) override
111 {
112 return m_reader->endRead(readSize);
113 }
114 PassRefPtr<BlobDataHandle> blobDataHandle() override
115 {
116 return nullptr;
117 }
118 private:
119 OwnPtr<WebDataConsumerHandle::Reader> m_reader;
120 };
121
122 Reader* obtainReaderInternal(Client* client) override { return new ReaderImp l(m_handle->obtainReader(client)); }
123
124 OwnPtr<WebDataConsumerHandle> m_handle;
125 };
126
87 } // namespace 127 } // namespace
88 128
89 class CompositeDataConsumerHandle::ReaderImpl final : public WebDataConsumerHand le::Reader { 129 PassOwnPtr<WebDataConsumerHandle> createWaitingHandle()
90 public:
91 explicit ReaderImpl(PassRefPtr<Context>);
92 ~ReaderImpl() override;
93 Result read(void* data, size_t /* size */, Flags, size_t* readSize) override ;
94 Result beginRead(const void** buffer, Flags, size_t* available) override;
95 Result endRead(size_t readSize) override;
96
97 private:
98 RefPtr<Context> m_context;
99 };
100
101 class CompositeDataConsumerHandle::Context final : public ThreadSafeRefCounted<C ontext> {
102 public:
103 using Token = unsigned;
104 static PassRefPtr<Context> create(PassOwnPtr<WebDataConsumerHandle> handle) { return adoptRef(new Context(handle)); }
105 ~Context()
106 {
107 ASSERT(!m_readerThread);
108 ASSERT(!m_reader);
109 ASSERT(!m_client);
110 }
111 PassOwnPtr<ReaderImpl> obtainReader(Client* client)
112 {
113 MutexLocker locker(m_mutex);
114 ASSERT(!m_readerThread);
115 ASSERT(!m_reader);
116 ASSERT(!m_client);
117 ++m_token;
118 m_client = client;
119 m_readerThread = Platform::current()->currentThread();
120 m_reader = m_handle->obtainReader(m_client);
121 return adoptPtr(new ReaderImpl(this));
122 }
123 void detachReader()
124 {
125 MutexLocker locker(m_mutex);
126 ASSERT(m_readerThread);
127 ASSERT(m_readerThread->isCurrentThread());
128 ASSERT(m_reader);
129 ASSERT(!m_isInTwoPhaseRead);
130 ASSERT(!m_isUpdateWaitingForEndRead);
131 ++m_token;
132 m_reader = nullptr;
133 m_readerThread = nullptr;
134 m_client = nullptr;
135 }
136 void update(PassOwnPtr<WebDataConsumerHandle> handle)
137 {
138 MutexLocker locker(m_mutex);
139 m_handle = handle;
140 if (!m_readerThread) {
141 // There is no reader.
142 return;
143 }
144 ++m_token;
145 updateReaderNoLock(m_token);
146 }
147
148 Result read(void* data, size_t size, Flags flags, size_t* readSize)
149 {
150 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
151 return m_reader->read(data, size, flags, readSize);
152 }
153 Result beginRead(const void** buffer, Flags flags, size_t* available)
154 {
155 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
156 ASSERT(!m_isInTwoPhaseRead);
157 Result r = m_reader->beginRead(buffer, flags, available);
158 m_isInTwoPhaseRead = (r == Ok);
159 return r;
160 }
161 Result endRead(size_t readSize)
162 {
163 ASSERT(m_readerThread && m_readerThread->isCurrentThread());
164 ASSERT(m_isInTwoPhaseRead);
165 Result r = m_reader->endRead(readSize);
166 m_isInTwoPhaseRead = false;
167 if (m_isUpdateWaitingForEndRead) {
168 // We need this lock to access |m_handle|.
169 MutexLocker locker(m_mutex);
170 m_reader = nullptr;
171 m_reader = m_handle->obtainReader(m_client);
172 m_isUpdateWaitingForEndRead = false;
173 }
174 return r;
175 }
176
177 private:
178 explicit Context(PassOwnPtr<WebDataConsumerHandle> handle)
179 : m_handle(handle)
180 , m_readerThread(nullptr)
181 , m_client(nullptr)
182 , m_token(0)
183 , m_isUpdateWaitingForEndRead(false)
184 , m_isInTwoPhaseRead(false)
185 {
186 }
187 void updateReader(Token token)
188 {
189 MutexLocker locker(m_mutex);
190 updateReaderNoLock(token);
191 }
192 void updateReaderNoLock(Token token)
193 {
194 if (token != m_token) {
195 // This request is not fresh. Ignore it.
196 return;
197 }
198 ASSERT(m_readerThread);
199 ASSERT(m_reader);
200 if (m_readerThread->isCurrentThread()) {
201 if (m_isInTwoPhaseRead) {
202 // We are waiting for the two-phase read completion.
203 m_isUpdateWaitingForEndRead = true;
204 return;
205 }
206 // Unregister the old one, then register the new one.
207 m_reader = nullptr;
208 m_reader = m_handle->obtainReader(m_client);
209 return;
210 }
211 ++m_token;
212 m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Context::up dateReader, this, m_token)));
213 }
214
215 OwnPtr<Reader> m_reader;
216 OwnPtr<WebDataConsumerHandle> m_handle;
217 // Note: Holding a WebThread raw pointer is not generally safe, but we can
218 // do that in this case because:
219 // 1. Destructing a ReaderImpl when the bound thread ends is a user's
220 // responsibility.
221 // 2. |m_readerThread| will never be used after the associated reader is
222 // detached.
223 WebThread* m_readerThread;
224 Client* m_client;
225 Token m_token;
226 // These boolean values are bound to the reader thread.
227 bool m_isUpdateWaitingForEndRead;
228 bool m_isInTwoPhaseRead;
229 Mutex m_mutex;
230 };
231
232 CompositeDataConsumerHandle::ReaderImpl::ReaderImpl(PassRefPtr<Context> context) : m_context(context) { }
233
234 CompositeDataConsumerHandle::ReaderImpl::~ReaderImpl()
235 {
236 m_context->detachReader();
237 }
238
239 Result CompositeDataConsumerHandle::ReaderImpl::read(void* data, size_t size, Fl ags flags, size_t* readSize)
240 {
241 return m_context->read(data, size, flags, readSize);
242 }
243
244 Result CompositeDataConsumerHandle::ReaderImpl::beginRead(const void** buffer, F lags flags, size_t* available)
245 {
246 return m_context->beginRead(buffer, flags, available);
247 }
248
249 Result CompositeDataConsumerHandle::ReaderImpl::endRead(size_t readSize)
250 {
251 return m_context->endRead(readSize);
252 }
253
254 CompositeDataConsumerHandle::CompositeDataConsumerHandle(PassOwnPtr<WebDataConsu merHandle> handle)
255 : m_context(Context::create(handle)) { }
256
257 CompositeDataConsumerHandle::~CompositeDataConsumerHandle() { }
258
259 WebDataConsumerHandle::Reader* CompositeDataConsumerHandle::obtainReaderInternal (Client* client)
260 {
261 return m_context->obtainReader(client).leakPtr();
262 }
263
264 void CompositeDataConsumerHandle::update(PassOwnPtr<WebDataConsumerHandle> handl e)
265 {
266 ASSERT(handle);
267 m_context->update(handle);
268 }
269
270 PassOwnPtr<WebDataConsumerHandle> CompositeDataConsumerHandle::createWaitingHand le()
271 { 130 {
272 return adoptPtr(new WaitingHandle); 131 return adoptPtr(new WaitingHandle);
273 } 132 }
274 133
275 PassOwnPtr<WebDataConsumerHandle> CompositeDataConsumerHandle::createDoneHandle( ) 134 PassOwnPtr<WebDataConsumerHandle> createDoneHandle()
276 { 135 {
277 return adoptPtr(new DoneHandle); 136 return adoptPtr(new DoneHandle);
278 } 137 }
279 138
139 PassOwnPtr<WebDataConsumerHandle> createUnexpectedErrorHandle()
140 {
141 return adoptPtr(new UnexpectedErrorHandle);
142 }
143
144 PassOwnPtr<FetchDataConsumerHandle> createFetchHandleFromWebHandle(PassOwnPtr<We bDataConsumerHandle> handle)
145 {
146 return adoptPtr(new WebToFetchDataConsumerHandleAdapter(handle));
147 }
148
149 NotifyOnReaderCreationHelper::NotifyOnReaderCreationHelper(WebDataConsumerHandle ::Client* client)
150 : m_factory(this)
151 {
152 if (!client)
153 return;
154 // Note we don't need thread safety here because this object is
155 // bound to the current thread.
156 Platform::current()->currentThread()->postTask(FROM_HERE, new Task(bind(&Not ifyOnReaderCreationHelper::notify, m_factory.createWeakPtr(), client)));
157 }
158
159 void NotifyOnReaderCreationHelper::notify(WebDataConsumerHandle::Client* client)
160 {
161 client->didGetReadable();
162 }
163
280 } // namespace blink 164 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698