OLD | NEW |
(Empty) | |
| 1 /* MtCoder.c -- Multi-thread Coder |
| 2 2010-09-24 : Igor Pavlov : Public domain */ |
| 3 |
| 4 #include <stdio.h> |
| 5 |
| 6 #include "MtCoder.h" |
| 7 |
| 8 void LoopThread_Construct(CLoopThread *p) |
| 9 { |
| 10 Thread_Construct(&p->thread); |
| 11 Event_Construct(&p->startEvent); |
| 12 Event_Construct(&p->finishedEvent); |
| 13 } |
| 14 |
| 15 void LoopThread_Close(CLoopThread *p) |
| 16 { |
| 17 Thread_Close(&p->thread); |
| 18 Event_Close(&p->startEvent); |
| 19 Event_Close(&p->finishedEvent); |
| 20 } |
| 21 |
| 22 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp) |
| 23 { |
| 24 CLoopThread *p = (CLoopThread *)pp; |
| 25 for (;;) |
| 26 { |
| 27 if (Event_Wait(&p->startEvent) != 0) |
| 28 return SZ_ERROR_THREAD; |
| 29 if (p->stop) |
| 30 return 0; |
| 31 p->res = p->func(p->param); |
| 32 if (Event_Set(&p->finishedEvent) != 0) |
| 33 return SZ_ERROR_THREAD; |
| 34 } |
| 35 } |
| 36 |
| 37 WRes LoopThread_Create(CLoopThread *p) |
| 38 { |
| 39 p->stop = 0; |
| 40 RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent)); |
| 41 RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent)); |
| 42 return Thread_Create(&p->thread, LoopThreadFunc, p); |
| 43 } |
| 44 |
| 45 WRes LoopThread_StopAndWait(CLoopThread *p) |
| 46 { |
| 47 p->stop = 1; |
| 48 if (Event_Set(&p->startEvent) != 0) |
| 49 return SZ_ERROR_THREAD; |
| 50 return Thread_Wait(&p->thread); |
| 51 } |
| 52 |
| 53 WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent
); } |
| 54 WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEv
ent); } |
| 55 |
| 56 static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize) |
| 57 { |
| 58 return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : S
Z_OK; |
| 59 } |
| 60 |
| 61 static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress) |
| 62 { |
| 63 unsigned i; |
| 64 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++) |
| 65 p->inSizes[i] = p->outSizes[i] = 0; |
| 66 p->totalInSize = p->totalOutSize = 0; |
| 67 p->progress = progress; |
| 68 p->res = SZ_OK; |
| 69 } |
| 70 |
| 71 static void MtProgress_Reinit(CMtProgress *p, unsigned index) |
| 72 { |
| 73 p->inSizes[index] = 0; |
| 74 p->outSizes[index] = 0; |
| 75 } |
| 76 |
| 77 #define UPDATE_PROGRESS(size, prev, total) \ |
| 78 if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; } |
| 79 |
| 80 SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSiz
e) |
| 81 { |
| 82 SRes res; |
| 83 CriticalSection_Enter(&p->cs); |
| 84 UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize) |
| 85 UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize) |
| 86 if (p->res == SZ_OK) |
| 87 p->res = Progress(p->progress, p->totalInSize, p->totalOutSize); |
| 88 res = p->res; |
| 89 CriticalSection_Leave(&p->cs); |
| 90 return res; |
| 91 } |
| 92 |
| 93 static void MtProgress_SetError(CMtProgress *p, SRes res) |
| 94 { |
| 95 CriticalSection_Enter(&p->cs); |
| 96 if (p->res == SZ_OK) |
| 97 p->res = res; |
| 98 CriticalSection_Leave(&p->cs); |
| 99 } |
| 100 |
| 101 static void MtCoder_SetError(CMtCoder* p, SRes res) |
| 102 { |
| 103 CriticalSection_Enter(&p->cs); |
| 104 if (p->res == SZ_OK) |
| 105 p->res = res; |
| 106 CriticalSection_Leave(&p->cs); |
| 107 } |
| 108 |
| 109 /* ---------- MtThread ---------- */ |
| 110 |
| 111 void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder) |
| 112 { |
| 113 p->mtCoder = mtCoder; |
| 114 p->outBuf = 0; |
| 115 p->inBuf = 0; |
| 116 Event_Construct(&p->canRead); |
| 117 Event_Construct(&p->canWrite); |
| 118 LoopThread_Construct(&p->thread); |
| 119 } |
| 120 |
| 121 #define RINOK_THREAD(x) { if((x) != 0) return SZ_ERROR_THREAD; } |
| 122 |
| 123 static void CMtThread_CloseEvents(CMtThread *p) |
| 124 { |
| 125 Event_Close(&p->canRead); |
| 126 Event_Close(&p->canWrite); |
| 127 } |
| 128 |
| 129 static void CMtThread_Destruct(CMtThread *p) |
| 130 { |
| 131 CMtThread_CloseEvents(p); |
| 132 |
| 133 if (Thread_WasCreated(&p->thread.thread)) |
| 134 { |
| 135 LoopThread_StopAndWait(&p->thread); |
| 136 LoopThread_Close(&p->thread); |
| 137 } |
| 138 |
| 139 if (p->mtCoder->alloc) |
| 140 IAlloc_Free(p->mtCoder->alloc, p->outBuf); |
| 141 p->outBuf = 0; |
| 142 |
| 143 if (p->mtCoder->alloc) |
| 144 IAlloc_Free(p->mtCoder->alloc, p->inBuf); |
| 145 p->inBuf = 0; |
| 146 } |
| 147 |
| 148 #define MY_BUF_ALLOC(buf, size, newSize) \ |
| 149 if (buf == 0 || size != newSize) \ |
| 150 { IAlloc_Free(p->mtCoder->alloc, buf); \ |
| 151 size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \ |
| 152 if (buf == 0) return SZ_ERROR_MEM; } |
| 153 |
| 154 static SRes CMtThread_Prepare(CMtThread *p) |
| 155 { |
| 156 MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize) |
| 157 MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize) |
| 158 |
| 159 p->stopReading = False; |
| 160 p->stopWriting = False; |
| 161 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead)); |
| 162 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite)); |
| 163 |
| 164 return SZ_OK; |
| 165 } |
| 166 |
| 167 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize) |
| 168 { |
| 169 size_t size = *processedSize; |
| 170 *processedSize = 0; |
| 171 while (size != 0) |
| 172 { |
| 173 size_t curSize = size; |
| 174 SRes res = stream->Read(stream, data, &curSize); |
| 175 *processedSize += curSize; |
| 176 data += curSize; |
| 177 size -= curSize; |
| 178 RINOK(res); |
| 179 if (curSize == 0) |
| 180 return SZ_OK; |
| 181 } |
| 182 return SZ_OK; |
| 183 } |
| 184 |
| 185 #define GET_NEXT_THREAD(p) &p->mtCoder->threads[p->index == p->mtCoder->numThrea
ds - 1 ? 0 : p->index + 1] |
| 186 |
| 187 static SRes MtThread_Process(CMtThread *p, Bool *stop) |
| 188 { |
| 189 CMtThread *next; |
| 190 *stop = True; |
| 191 if (Event_Wait(&p->canRead) != 0) |
| 192 return SZ_ERROR_THREAD; |
| 193 |
| 194 next = GET_NEXT_THREAD(p); |
| 195 |
| 196 if (p->stopReading) |
| 197 { |
| 198 next->stopReading = True; |
| 199 return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD; |
| 200 } |
| 201 |
| 202 { |
| 203 size_t size = p->mtCoder->blockSize; |
| 204 size_t destSize = p->outBufSize; |
| 205 |
| 206 RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size)); |
| 207 next->stopReading = *stop = (size != p->mtCoder->blockSize); |
| 208 if (Event_Set(&next->canRead) != 0) |
| 209 return SZ_ERROR_THREAD; |
| 210 |
| 211 RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index, |
| 212 p->outBuf, &destSize, p->inBuf, size, *stop)); |
| 213 |
| 214 MtProgress_Reinit(&p->mtCoder->mtProgress, p->index); |
| 215 |
| 216 if (Event_Wait(&p->canWrite) != 0) |
| 217 return SZ_ERROR_THREAD; |
| 218 if (p->stopWriting) |
| 219 return SZ_ERROR_FAIL; |
| 220 if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize)
!= destSize) |
| 221 return SZ_ERROR_WRITE; |
| 222 return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD; |
| 223 } |
| 224 } |
| 225 |
| 226 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp) |
| 227 { |
| 228 CMtThread *p = (CMtThread *)pp; |
| 229 for (;;) |
| 230 { |
| 231 Bool stop; |
| 232 CMtThread *next = GET_NEXT_THREAD(p); |
| 233 SRes res = MtThread_Process(p, &stop); |
| 234 if (res != SZ_OK) |
| 235 { |
| 236 MtCoder_SetError(p->mtCoder, res); |
| 237 MtProgress_SetError(&p->mtCoder->mtProgress, res); |
| 238 next->stopReading = True; |
| 239 next->stopWriting = True; |
| 240 Event_Set(&next->canRead); |
| 241 Event_Set(&next->canWrite); |
| 242 return res; |
| 243 } |
| 244 if (stop) |
| 245 return 0; |
| 246 } |
| 247 } |
| 248 |
| 249 void MtCoder_Construct(CMtCoder* p) |
| 250 { |
| 251 unsigned i; |
| 252 p->alloc = 0; |
| 253 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++) |
| 254 { |
| 255 CMtThread *t = &p->threads[i]; |
| 256 t->index = i; |
| 257 CMtThread_Construct(t, p); |
| 258 } |
| 259 CriticalSection_Init(&p->cs); |
| 260 CriticalSection_Init(&p->mtProgress.cs); |
| 261 } |
| 262 |
| 263 void MtCoder_Destruct(CMtCoder* p) |
| 264 { |
| 265 unsigned i; |
| 266 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++) |
| 267 CMtThread_Destruct(&p->threads[i]); |
| 268 CriticalSection_Delete(&p->cs); |
| 269 CriticalSection_Delete(&p->mtProgress.cs); |
| 270 } |
| 271 |
| 272 SRes MtCoder_Code(CMtCoder *p) |
| 273 { |
| 274 unsigned i, numThreads = p->numThreads; |
| 275 SRes res = SZ_OK; |
| 276 p->res = SZ_OK; |
| 277 |
| 278 MtProgress_Init(&p->mtProgress, p->progress); |
| 279 |
| 280 for (i = 0; i < numThreads; i++) |
| 281 { |
| 282 RINOK(CMtThread_Prepare(&p->threads[i])); |
| 283 } |
| 284 |
| 285 for (i = 0; i < numThreads; i++) |
| 286 { |
| 287 CMtThread *t = &p->threads[i]; |
| 288 CLoopThread *lt = &t->thread; |
| 289 |
| 290 if (!Thread_WasCreated(<->thread)) |
| 291 { |
| 292 lt->func = ThreadFunc; |
| 293 lt->param = t; |
| 294 |
| 295 if (LoopThread_Create(lt) != SZ_OK) |
| 296 { |
| 297 res = SZ_ERROR_THREAD; |
| 298 break; |
| 299 } |
| 300 } |
| 301 } |
| 302 |
| 303 if (res == SZ_OK) |
| 304 { |
| 305 unsigned j; |
| 306 for (i = 0; i < numThreads; i++) |
| 307 { |
| 308 CMtThread *t = &p->threads[i]; |
| 309 if (LoopThread_StartSubThread(&t->thread) != SZ_OK) |
| 310 { |
| 311 res = SZ_ERROR_THREAD; |
| 312 p->threads[0].stopReading = True; |
| 313 break; |
| 314 } |
| 315 } |
| 316 |
| 317 Event_Set(&p->threads[0].canWrite); |
| 318 Event_Set(&p->threads[0].canRead); |
| 319 |
| 320 for (j = 0; j < i; j++) |
| 321 LoopThread_WaitSubThread(&p->threads[j].thread); |
| 322 } |
| 323 |
| 324 for (i = 0; i < numThreads; i++) |
| 325 CMtThread_CloseEvents(&p->threads[i]); |
| 326 return (res == SZ_OK) ? p->res : res; |
| 327 } |
OLD | NEW |