| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "config.h" |
| 6 #include "modules/fetch/CompositeDataConsumerHandle.h" |
| 7 |
| 8 #include "platform/Task.h" |
| 9 #include "platform/ThreadSafeFunctional.h" |
| 10 #include "public/platform/Platform.h" |
| 11 #include "public/platform/WebThread.h" |
| 12 #include "public/platform/WebTraceLocation.h" |
| 13 #include "wtf/Functional.h" |
| 14 #include "wtf/Locker.h" |
| 15 #include "wtf/ThreadSafeRefCounted.h" |
| 16 #include "wtf/ThreadingPrimitives.h" |
| 17 |
| 18 namespace blink { |
| 19 |
| 20 using Result = WebDataConsumerHandle::Result; |
| 21 |
| 22 namespace { |
| 23 |
| 24 class WaitingHandle final : public WebDataConsumerHandle { |
| 25 private: |
| 26 class ReaderImpl final : public WebDataConsumerHandle::Reader { |
| 27 public: |
| 28 Result read(void*, size_t, WebDataConsumerHandle::Flags, size_t *readSiz
e) override |
| 29 { |
| 30 *readSize = 0; |
| 31 return ShouldWait; |
| 32 } |
| 33 Result beginRead(const void** buffer, WebDataConsumerHandle::Flags, size
_t *available) override |
| 34 { |
| 35 *available = 0; |
| 36 *buffer = nullptr; |
| 37 return ShouldWait; |
| 38 } |
| 39 Result endRead(size_t) override |
| 40 { |
| 41 return UnexpectedError; |
| 42 } |
| 43 }; |
| 44 Reader* obtainReaderInternal(Client*) override { return new ReaderImpl; } |
| 45 }; |
| 46 |
| 47 class DoneHandle final : public WebDataConsumerHandle { |
| 48 private: |
| 49 class ReaderImpl final : public WebDataConsumerHandle::Reader { |
| 50 public: |
| 51 explicit ReaderImpl(Client* client) : m_factory(this) |
| 52 { |
| 53 if (!client) |
| 54 return; |
| 55 // Note we don't need thread safety here because this object is |
| 56 // bound to the current thread. |
| 57 Platform::current()->currentThread()->postTask(FROM_HERE, new Task(b
ind(&ReaderImpl::notify, m_factory.createWeakPtr(), client))); |
| 58 } |
| 59 Result read(void*, size_t, WebDataConsumerHandle::Flags, size_t *readSiz
e) override |
| 60 { |
| 61 *readSize = 0; |
| 62 return Done; |
| 63 } |
| 64 Result beginRead(const void** buffer, WebDataConsumerHandle::Flags, size
_t *available) override |
| 65 { |
| 66 *available = 0; |
| 67 *buffer = nullptr; |
| 68 return Done; |
| 69 } |
| 70 Result endRead(size_t) override |
| 71 { |
| 72 return UnexpectedError; |
| 73 } |
| 74 |
| 75 private: |
| 76 void notify(Client* client) |
| 77 { |
| 78 client->didGetReadable(); |
| 79 } |
| 80 |
| 81 WeakPtrFactory<ReaderImpl> m_factory; |
| 82 }; |
| 83 |
| 84 Reader* obtainReaderInternal(Client* client) override { return new ReaderImp
l(client); } |
| 85 }; |
| 86 |
| 87 } // namespace |
| 88 |
| 89 class CompositeDataConsumerHandle::ReaderImpl final : public WebDataConsumerHand
le::Reader { |
| 90 public: |
| 91 explicit ReaderImpl(PassRefPtr<Context>); |
| 92 ~ReaderImpl() override; |
| 93 Result read(void* data, size_t /* size */, Flags, size_t* readSize) override
; |
| 94 Result beginRead(const void** buffer, Flags, size_t* available) override; |
| 95 Result endRead(size_t readSize) override; |
| 96 |
| 97 private: |
| 98 RefPtr<Context> m_context; |
| 99 }; |
| 100 |
| 101 class CompositeDataConsumerHandle::Context final : public ThreadSafeRefCounted<C
ontext> { |
| 102 public: |
| 103 using Token = unsigned; |
| 104 static PassRefPtr<Context> create(PassOwnPtr<WebDataConsumerHandle> handle)
{ return adoptRef(new Context(handle)); } |
| 105 ~Context() |
| 106 { |
| 107 ASSERT(!m_readerThread); |
| 108 ASSERT(!m_reader); |
| 109 ASSERT(!m_client); |
| 110 } |
| 111 PassOwnPtr<ReaderImpl> obtainReader(Client* client) |
| 112 { |
| 113 MutexLocker locker(m_mutex); |
| 114 ASSERT(!m_readerThread); |
| 115 ASSERT(!m_reader); |
| 116 ASSERT(!m_client); |
| 117 ++m_token; |
| 118 m_client = client; |
| 119 m_readerThread = Platform::current()->currentThread(); |
| 120 m_reader = m_handle->obtainReader(m_client); |
| 121 return adoptPtr(new ReaderImpl(this)); |
| 122 } |
| 123 void detachReader() |
| 124 { |
| 125 MutexLocker locker(m_mutex); |
| 126 ASSERT(m_readerThread); |
| 127 ASSERT(m_readerThread->isCurrentThread()); |
| 128 ASSERT(m_reader); |
| 129 ASSERT(!m_isInTwoPhaseRead); |
| 130 ASSERT(!m_isUpdateWaitingForEndRead); |
| 131 ++m_token; |
| 132 m_reader = nullptr; |
| 133 m_readerThread = nullptr; |
| 134 m_client = nullptr; |
| 135 } |
| 136 void update(PassOwnPtr<WebDataConsumerHandle> handle) |
| 137 { |
| 138 MutexLocker locker(m_mutex); |
| 139 m_handle = handle; |
| 140 if (!m_readerThread) { |
| 141 // There is no reader. |
| 142 return; |
| 143 } |
| 144 ++m_token; |
| 145 updateReaderNoLock(m_token); |
| 146 } |
| 147 |
| 148 Result read(void* data, size_t size, Flags flags, size_t* readSize) |
| 149 { |
| 150 ASSERT(m_readerThread && m_readerThread->isCurrentThread()); |
| 151 return m_reader->read(data, size, flags, readSize); |
| 152 } |
| 153 Result beginRead(const void** buffer, Flags flags, size_t* available) |
| 154 { |
| 155 ASSERT(m_readerThread && m_readerThread->isCurrentThread()); |
| 156 ASSERT(!m_isInTwoPhaseRead); |
| 157 Result r = m_reader->beginRead(buffer, flags, available); |
| 158 m_isInTwoPhaseRead = (r == Ok); |
| 159 return r; |
| 160 } |
| 161 Result endRead(size_t readSize) |
| 162 { |
| 163 ASSERT(m_readerThread && m_readerThread->isCurrentThread()); |
| 164 ASSERT(m_isInTwoPhaseRead); |
| 165 Result r = m_reader->endRead(readSize); |
| 166 m_isInTwoPhaseRead = false; |
| 167 if (m_isUpdateWaitingForEndRead) { |
| 168 // We need this lock to access |m_handle|. |
| 169 MutexLocker locker(m_mutex); |
| 170 m_reader = nullptr; |
| 171 m_reader = m_handle->obtainReader(m_client); |
| 172 m_isUpdateWaitingForEndRead = false; |
| 173 } |
| 174 return r; |
| 175 } |
| 176 |
| 177 private: |
| 178 explicit Context(PassOwnPtr<WebDataConsumerHandle> handle) |
| 179 : m_handle(handle) |
| 180 , m_readerThread(nullptr) |
| 181 , m_client(nullptr) |
| 182 , m_token(0) |
| 183 , m_isUpdateWaitingForEndRead(false) |
| 184 , m_isInTwoPhaseRead(false) |
| 185 { |
| 186 } |
| 187 void updateReader(Token token) |
| 188 { |
| 189 MutexLocker locker(m_mutex); |
| 190 updateReaderNoLock(token); |
| 191 } |
| 192 void updateReaderNoLock(Token token) |
| 193 { |
| 194 if (token != m_token) { |
| 195 // This request is not fresh. Ignore it. |
| 196 return; |
| 197 } |
| 198 ASSERT(m_readerThread); |
| 199 ASSERT(m_reader); |
| 200 if (m_readerThread->isCurrentThread()) { |
| 201 if (m_isInTwoPhaseRead) { |
| 202 // We are waiting for the two-phase read completion. |
| 203 m_isUpdateWaitingForEndRead = true; |
| 204 return; |
| 205 } |
| 206 // Unregister the old one, then register the new one. |
| 207 m_reader = nullptr; |
| 208 m_reader = m_handle->obtainReader(m_client); |
| 209 return; |
| 210 } |
| 211 ++m_token; |
| 212 m_readerThread->postTask(FROM_HERE, new Task(threadSafeBind(&Context::up
dateReader, this, m_token))); |
| 213 } |
| 214 |
| 215 OwnPtr<Reader> m_reader; |
| 216 OwnPtr<WebDataConsumerHandle> m_handle; |
| 217 // Note: Holding a WebThread raw pointer is not generally safe, but we can |
| 218 // do that in this case because: |
| 219 // 1. Destructing a ReaderImpl when the bound thread ends is a user's |
| 220 // responsibility. |
| 221 // 2. |m_readerThread| will never be used after the associated reader is |
| 222 // detached. |
| 223 WebThread* m_readerThread; |
| 224 Client* m_client; |
| 225 Token m_token; |
| 226 // These boolean values are bound to the reader thread. |
| 227 bool m_isUpdateWaitingForEndRead; |
| 228 bool m_isInTwoPhaseRead; |
| 229 Mutex m_mutex; |
| 230 }; |
| 231 |
| 232 CompositeDataConsumerHandle::ReaderImpl::ReaderImpl(PassRefPtr<Context> context)
: m_context(context) { } |
| 233 |
| 234 CompositeDataConsumerHandle::ReaderImpl::~ReaderImpl() |
| 235 { |
| 236 m_context->detachReader(); |
| 237 } |
| 238 |
| 239 Result CompositeDataConsumerHandle::ReaderImpl::read(void* data, size_t size, Fl
ags flags, size_t* readSize) |
| 240 { |
| 241 return m_context->read(data, size, flags, readSize); |
| 242 } |
| 243 |
| 244 Result CompositeDataConsumerHandle::ReaderImpl::beginRead(const void** buffer, F
lags flags, size_t* available) |
| 245 { |
| 246 return m_context->beginRead(buffer, flags, available); |
| 247 } |
| 248 |
| 249 Result CompositeDataConsumerHandle::ReaderImpl::endRead(size_t readSize) |
| 250 { |
| 251 return m_context->endRead(readSize); |
| 252 } |
| 253 |
| 254 CompositeDataConsumerHandle::CompositeDataConsumerHandle(PassOwnPtr<WebDataConsu
merHandle> handle) |
| 255 : m_context(Context::create(handle)) { } |
| 256 |
| 257 CompositeDataConsumerHandle::~CompositeDataConsumerHandle() { } |
| 258 |
| 259 WebDataConsumerHandle::Reader* CompositeDataConsumerHandle::obtainReaderInternal
(Client* client) |
| 260 { |
| 261 return m_context->obtainReader(client).leakPtr(); |
| 262 } |
| 263 |
| 264 void CompositeDataConsumerHandle::update(PassOwnPtr<WebDataConsumerHandle> handl
e) |
| 265 { |
| 266 ASSERT(handle); |
| 267 m_context->update(handle); |
| 268 } |
| 269 |
| 270 PassOwnPtr<WebDataConsumerHandle> CompositeDataConsumerHandle::createWaitingHand
le() |
| 271 { |
| 272 return adoptPtr(new WaitingHandle); |
| 273 } |
| 274 |
| 275 PassOwnPtr<WebDataConsumerHandle> CompositeDataConsumerHandle::createDoneHandle(
) |
| 276 { |
| 277 return adoptPtr(new DoneHandle); |
| 278 } |
| 279 |
| 280 } // namespace blink |
| OLD | NEW |