OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "modules/fetch/CompositeDataConsumerHandle.h" | |
6 | |
7 #include "platform/CrossThreadFunctional.h" | |
8 #include "public/platform/Platform.h" | |
9 #include "public/platform/WebTaskRunner.h" | |
10 #include "public/platform/WebThread.h" | |
11 #include "public/platform/WebTraceLocation.h" | |
12 #include "wtf/Locker.h" | |
13 #include "wtf/PtrUtil.h" | |
14 #include "wtf/ThreadSafeRefCounted.h" | |
15 #include "wtf/ThreadingPrimitives.h" | |
16 #include <memory> | |
17 | |
18 namespace blink { | |
19 | |
20 using Result = WebDataConsumerHandle::Result; | |
21 | |
22 class CompositeDataConsumerHandle::ReaderImpl final : public WebDataConsumerHand
le::Reader { | |
23 public: | |
24 explicit ReaderImpl(PassRefPtr<Context>); | |
25 ~ReaderImpl() override; | |
26 Result read(void* data, size_t /* size */, Flags, size_t* readSize) override
; | |
27 Result beginRead(const void** buffer, Flags, size_t* available) override; | |
28 Result endRead(size_t readSize) override; | |
29 | |
30 private: | |
31 RefPtr<Context> m_context; | |
32 }; | |
33 | |
34 class CompositeDataConsumerHandle::Context final : public ThreadSafeRefCounted<C
ontext> { | |
35 public: | |
36 using Token = unsigned; | |
37 static PassRefPtr<Context> create(std::unique_ptr<WebDataConsumerHandle> han
dle) { return adoptRef(new Context(std::move(handle))); } | |
38 ~Context() | |
39 { | |
40 DCHECK(!m_readerThread); | |
41 DCHECK(!m_reader); | |
42 DCHECK(!m_client); | |
43 } | |
44 std::unique_ptr<ReaderImpl> obtainReader(Client* client) | |
45 { | |
46 MutexLocker locker(m_mutex); | |
47 DCHECK(!m_readerThread); | |
48 DCHECK(!m_reader); | |
49 DCHECK(!m_client); | |
50 ++m_token; | |
51 m_client = client; | |
52 m_readerThread = Platform::current()->currentThread(); | |
53 m_reader = m_handle->obtainReader(m_client); | |
54 return wrapUnique(new ReaderImpl(this)); | |
55 } | |
56 void detachReader() | |
57 { | |
58 MutexLocker locker(m_mutex); | |
59 DCHECK(m_readerThread); | |
60 DCHECK(m_readerThread->isCurrentThread()); | |
61 DCHECK(m_reader); | |
62 DCHECK(!m_isInTwoPhaseRead); | |
63 DCHECK(!m_isUpdateWaitingForEndRead); | |
64 ++m_token; | |
65 m_reader = nullptr; | |
66 m_readerThread = nullptr; | |
67 m_client = nullptr; | |
68 } | |
69 void update(std::unique_ptr<WebDataConsumerHandle> handle) | |
70 { | |
71 MutexLocker locker(m_mutex); | |
72 m_handle = std::move(handle); | |
73 if (!m_readerThread) { | |
74 // There is no reader. | |
75 return; | |
76 } | |
77 ++m_token; | |
78 updateReaderNoLock(m_token); | |
79 } | |
80 | |
81 Result read(void* data, size_t size, Flags flags, size_t* readSize) | |
82 { | |
83 DCHECK(m_readerThread && m_readerThread->isCurrentThread()); | |
84 return m_reader->read(data, size, flags, readSize); | |
85 } | |
86 Result beginRead(const void** buffer, Flags flags, size_t* available) | |
87 { | |
88 DCHECK(m_readerThread && m_readerThread->isCurrentThread()); | |
89 DCHECK(!m_isInTwoPhaseRead); | |
90 Result r = m_reader->beginRead(buffer, flags, available); | |
91 m_isInTwoPhaseRead = (r == Ok); | |
92 return r; | |
93 } | |
94 Result endRead(size_t readSize) | |
95 { | |
96 DCHECK(m_readerThread && m_readerThread->isCurrentThread()); | |
97 DCHECK(m_isInTwoPhaseRead); | |
98 Result r = m_reader->endRead(readSize); | |
99 m_isInTwoPhaseRead = false; | |
100 if (m_isUpdateWaitingForEndRead) { | |
101 // We need this lock to access |m_handle|. | |
102 MutexLocker locker(m_mutex); | |
103 m_reader = nullptr; | |
104 m_reader = m_handle->obtainReader(m_client); | |
105 m_isUpdateWaitingForEndRead = false; | |
106 } | |
107 return r; | |
108 } | |
109 | |
110 private: | |
111 explicit Context(std::unique_ptr<WebDataConsumerHandle> handle) | |
112 : m_handle(std::move(handle)) | |
113 , m_readerThread(nullptr) | |
114 , m_client(nullptr) | |
115 , m_token(0) | |
116 , m_isUpdateWaitingForEndRead(false) | |
117 , m_isInTwoPhaseRead(false) | |
118 { | |
119 } | |
120 void updateReader(Token token) | |
121 { | |
122 MutexLocker locker(m_mutex); | |
123 updateReaderNoLock(token); | |
124 } | |
125 void updateReaderNoLock(Token token) | |
126 { | |
127 if (token != m_token) { | |
128 // This request is not fresh. Ignore it. | |
129 return; | |
130 } | |
131 DCHECK(m_readerThread); | |
132 DCHECK(m_reader); | |
133 if (m_readerThread->isCurrentThread()) { | |
134 if (m_isInTwoPhaseRead) { | |
135 // We are waiting for the two-phase read completion. | |
136 m_isUpdateWaitingForEndRead = true; | |
137 return; | |
138 } | |
139 // Unregister the old one, then register the new one. | |
140 m_reader = nullptr; | |
141 m_reader = m_handle->obtainReader(m_client); | |
142 return; | |
143 } | |
144 ++m_token; | |
145 m_readerThread->getWebTaskRunner()->postTask(BLINK_FROM_HERE, crossThrea
dBind(&Context::updateReader, wrapPassRefPtr(this), m_token)); | |
146 } | |
147 | |
148 std::unique_ptr<Reader> m_reader; | |
149 std::unique_ptr<WebDataConsumerHandle> m_handle; | |
150 // Note: Holding a WebThread raw pointer is not generally safe, but we can | |
151 // do that in this case because: | |
152 // 1. Destructing a ReaderImpl when the bound thread ends is a user's | |
153 // responsibility. | |
154 // 2. |m_readerThread| will never be used after the associated reader is | |
155 // detached. | |
156 WebThread* m_readerThread; | |
157 Client* m_client; | |
158 Token m_token; | |
159 // These boolean values are bound to the reader thread. | |
160 bool m_isUpdateWaitingForEndRead; | |
161 bool m_isInTwoPhaseRead; | |
162 Mutex m_mutex; | |
163 }; | |
164 | |
165 CompositeDataConsumerHandle::ReaderImpl::ReaderImpl(PassRefPtr<Context> context)
: m_context(context) { } | |
166 | |
167 CompositeDataConsumerHandle::ReaderImpl::~ReaderImpl() | |
168 { | |
169 m_context->detachReader(); | |
170 } | |
171 | |
172 Result CompositeDataConsumerHandle::ReaderImpl::read(void* data, size_t size, Fl
ags flags, size_t* readSize) | |
173 { | |
174 return m_context->read(data, size, flags, readSize); | |
175 } | |
176 | |
177 Result CompositeDataConsumerHandle::ReaderImpl::beginRead(const void** buffer, F
lags flags, size_t* available) | |
178 { | |
179 return m_context->beginRead(buffer, flags, available); | |
180 } | |
181 | |
182 Result CompositeDataConsumerHandle::ReaderImpl::endRead(size_t readSize) | |
183 { | |
184 return m_context->endRead(readSize); | |
185 } | |
186 | |
187 CompositeDataConsumerHandle::Updater::Updater(PassRefPtr<Context> context) | |
188 : m_context(context) | |
189 #if DCHECK_IS_ON() | |
190 , m_thread(Platform::current()->currentThread()) | |
191 #endif | |
192 { | |
193 } | |
194 | |
195 CompositeDataConsumerHandle::Updater::~Updater() {} | |
196 | |
197 void CompositeDataConsumerHandle::Updater::update(std::unique_ptr<WebDataConsume
rHandle> handle) | |
198 { | |
199 DCHECK(handle); | |
200 #if DCHECK_IS_ON() | |
201 DCHECK(m_thread->isCurrentThread()); | |
202 #endif | |
203 m_context->update(std::move(handle)); | |
204 } | |
205 | |
206 CompositeDataConsumerHandle::CompositeDataConsumerHandle(std::unique_ptr<WebData
ConsumerHandle> handle, Updater** updater) | |
207 : m_context(Context::create(std::move(handle))) | |
208 { | |
209 *updater = new Updater(m_context); | |
210 } | |
211 | |
212 CompositeDataConsumerHandle::~CompositeDataConsumerHandle() { } | |
213 | |
214 std::unique_ptr<WebDataConsumerHandle::Reader> CompositeDataConsumerHandle::obta
inReader(Client* client) | |
215 { | |
216 return m_context->obtainReader(client); | |
217 } | |
218 | |
219 } // namespace blink | |
OLD | NEW |