Chromium Code Reviews

Side by Side Diff: fusl/src/aio/aio.c

Issue 1714623002: [fusl] clang-format fusl (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: headers too Created 4 years, 10 months ago

#include <aio.h>
#include <pthread.h>
#include <semaphore.h>
#include <limits.h>
#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
#include "syscall.h"
#include "atomic.h"
#include "libc.h"

(...skipping 24 matching lines...)

 * for sharing data between the main flow of execution and cancellation
 * cleanup handler.
 *
 * Taking any aio locks requires having all signals blocked. This is
 * necessary because aio_cancel is needed by close, and close is required
 * to be async-signal safe. All aio worker threads run with all signals
 * blocked permanently.
 */

struct aio_args {
  struct aiocb* cb;
  int op;
  int err;
  sem_t sem;
};

struct aio_thread {
  pthread_t td;
  struct aiocb* cb;
  struct aio_thread *next, *prev;
  struct aio_queue* q;
  volatile int running;
  int err, op;
  ssize_t ret;
};

struct aio_queue {
  int fd, seekable, append, ref, init;
  pthread_mutex_t lock;
  pthread_cond_t cond;
  struct aio_thread* head;
};

static pthread_rwlock_t maplock = PTHREAD_RWLOCK_INITIALIZER;
static struct aio_queue***** map;
static volatile int aio_fd_cnt;
volatile int __aio_fut;

static struct aio_queue* __aio_get_queue(int fd, int need) {
  if (fd < 0)
    return 0;
  int a = fd >> 24;
  unsigned char b = fd >> 16, c = fd >> 8, d = fd;
  struct aio_queue* q = 0;
  pthread_rwlock_rdlock(&maplock);
  if ((!map || !map[a] || !map[a][b] || !map[a][b][c] ||
       !(q = map[a][b][c][d])) &&
      need) {
    pthread_rwlock_unlock(&maplock);
    pthread_rwlock_wrlock(&maplock);
    if (!map)
      map = calloc(sizeof *map, (-1U / 2 + 1) >> 24);
    if (!map)
      goto out;
    if (!map[a])
      map[a] = calloc(sizeof **map, 256);
    if (!map[a])
      goto out;
    if (!map[a][b])
      map[a][b] = calloc(sizeof ***map, 256);
    if (!map[a][b])
      goto out;
    if (!map[a][b][c])
      map[a][b][c] = calloc(sizeof ****map, 256);
    if (!map[a][b][c])
      goto out;
    if (!(q = map[a][b][c][d])) {
      map[a][b][c][d] = q = calloc(sizeof *****map, 1);
      if (q) {
        q->fd = fd;
        pthread_mutex_init(&q->lock, 0);
        pthread_cond_init(&q->cond, 0);
        a_inc(&aio_fd_cnt);
      }
    }
  }
  if (q)
    pthread_mutex_lock(&q->lock);
out:
  pthread_rwlock_unlock(&maplock);
  return q;
}

static void __aio_unref_queue(struct aio_queue* q) {
  if (q->ref > 1) {
    q->ref--;
    pthread_mutex_unlock(&q->lock);
    return;
  }

  /* This is potentially the last reference, but a new reference
   * may arrive since we cannot free the queue object without first
   * taking the maplock, which requires releasing the queue lock. */
  pthread_mutex_unlock(&q->lock);
  pthread_rwlock_wrlock(&maplock);
  pthread_mutex_lock(&q->lock);
  if (q->ref == 1) {
    int fd = q->fd;
    int a = fd >> 24;
    unsigned char b = fd >> 16, c = fd >> 8, d = fd;
    map[a][b][c][d] = 0;
    a_dec(&aio_fd_cnt);
    pthread_rwlock_unlock(&maplock);
    pthread_mutex_unlock(&q->lock);
    free(q);
  } else {
    q->ref--;
    pthread_rwlock_unlock(&maplock);
    pthread_mutex_unlock(&q->lock);
  }
}

static void cleanup(void* ctx) {
  struct aio_thread* at = ctx;
  struct aio_queue* q = at->q;
  struct aiocb* cb = at->cb;
  struct sigevent sev = cb->aio_sigevent;

  /* There are four potential types of waiters we could need to wake:
   * 1. Callers of aio_cancel/close.
   * 2. Callers of aio_suspend with a single aiocb.
   * 3. Callers of aio_suspend with a list.
   * 4. AIO worker threads waiting for sequenced operations.
   * Types 1-3 are notified via atomics/futexes, mainly for AS-safety
   * considerations. Type 4 is notified later via a cond var. */

  cb->__ret = at->ret;
  if (a_swap(&at->running, 0) < 0)
    __wake(&at->running, -1, 1);
  if (a_swap(&cb->__err, at->err) != EINPROGRESS)
    __wake(&cb->__err, -1, 1);
  if (a_swap(&__aio_fut, 0))
    __wake(&__aio_fut, -1, 1);

  pthread_mutex_lock(&q->lock);

  if (at->next)
    at->next->prev = at->prev;
  if (at->prev)
    at->prev->next = at->next;
  else
    q->head = at->next;

  /* Signal aio worker threads waiting for sequenced operations. */
  pthread_cond_broadcast(&q->cond);

  __aio_unref_queue(q);

  if (sev.sigev_notify == SIGEV_SIGNAL) {
    siginfo_t si = {.si_signo = sev.sigev_signo,
                    .si_value = sev.sigev_value,
                    .si_code = SI_ASYNCIO,
                    .si_pid = getpid(),
                    .si_uid = getuid()};
    __syscall(SYS_rt_sigqueueinfo, si.si_pid, si.si_signo, &si);
  }
  if (sev.sigev_notify == SIGEV_THREAD) {
    a_store(&__pthread_self()->cancel, 0);
    sev.sigev_notify_function(sev.sigev_value);
  }
}

static void* io_thread_func(void* ctx) {
  struct aio_thread at, *p;

  struct aio_args* args = ctx;
  struct aiocb* cb = args->cb;
  int fd = cb->aio_fildes;
  int op = args->op;
  void* buf = (void*)cb->aio_buf;
  size_t len = cb->aio_nbytes;
  off_t off = cb->aio_offset;

  struct aio_queue* q = __aio_get_queue(fd, 1);
  ssize_t ret;

  args->err = q ? 0 : EAGAIN;
  sem_post(&args->sem);
  if (!q)
    return 0;

  at.op = op;
  at.running = 1;
  at.ret = -1;
  at.err = ECANCELED;
  at.q = q;
  at.td = __pthread_self();
  at.cb = cb;
  at.prev = 0;
  if ((at.next = q->head))
    at.next->prev = &at;
  q->head = &at;
  q->ref++;

  if (!q->init) {
    int seekable = lseek(fd, 0, SEEK_CUR) >= 0;
    q->seekable = seekable;
    q->append = !seekable || (fcntl(fd, F_GETFL) & O_APPEND);
    q->init = 1;
  }

  pthread_cleanup_push(cleanup, &at);

  /* Wait for sequenced operations. */
  if (op != LIO_READ && (op != LIO_WRITE || q->append)) {
    for (;;) {
      for (p = at.next; p && p->op != LIO_WRITE; p = p->next)
        ;
      if (!p)
        break;
      pthread_cond_wait(&q->cond, &q->lock);
    }
  }

  pthread_mutex_unlock(&q->lock);

  switch (op) {
    case LIO_WRITE:
      ret = q->append ? write(fd, buf, len) : pwrite(fd, buf, len, off);
      break;
    case LIO_READ:
      ret = !q->seekable ? read(fd, buf, len) : pread(fd, buf, len, off);
      break;
    case O_SYNC:
      ret = fsync(fd);
      break;
    case O_DSYNC:
      ret = fdatasync(fd);
      break;
  }
  at.ret = ret;
  at.err = ret < 0 ? errno : 0;

  pthread_cleanup_pop(1);

  return 0;
}

static int submit(struct aiocb* cb, int op) {
  int ret = 0;
  pthread_attr_t a;
  sigset_t allmask, origmask;
  pthread_t td;
  struct aio_args args = {.cb = cb, .op = op};
  sem_init(&args.sem, 0, 0);

  if (cb->aio_sigevent.sigev_notify == SIGEV_THREAD) {
    if (cb->aio_sigevent.sigev_notify_attributes)
      a = *cb->aio_sigevent.sigev_notify_attributes;
    else
      pthread_attr_init(&a);
  } else {
    pthread_attr_init(&a);
    pthread_attr_setstacksize(&a, PTHREAD_STACK_MIN);
    pthread_attr_setguardsize(&a, 0);
  }
  pthread_attr_setdetachstate(&a, PTHREAD_CREATE_DETACHED);
  sigfillset(&allmask);
  pthread_sigmask(SIG_BLOCK, &allmask, &origmask);
  cb->__err = EINPROGRESS;
  if (pthread_create(&td, &a, io_thread_func, &args)) {
    errno = EAGAIN;
    ret = -1;
  }
  pthread_sigmask(SIG_SETMASK, &origmask, 0);

  if (!ret) {
    while (sem_wait(&args.sem))
      ;
    if (args.err) {
      errno = args.err;
      ret = -1;
    }
  }

  return ret;
}

int aio_read(struct aiocb* cb) {
  return submit(cb, LIO_READ);
}

int aio_write(struct aiocb* cb) {
  return submit(cb, LIO_WRITE);
}

int aio_fsync(int op, struct aiocb* cb) {
  if (op != O_SYNC && op != O_DSYNC) {
    errno = EINVAL;
    return -1;
  }
  return submit(cb, op);
}

ssize_t aio_return(struct aiocb* cb) {
  return cb->__ret;
}

int aio_error(const struct aiocb* cb) {
  a_barrier();
  return cb->__err & 0x7fffffff;
}

int aio_cancel(int fd, struct aiocb* cb) {
  sigset_t allmask, origmask;
  int ret = AIO_ALLDONE;
  struct aio_thread* p;
  struct aio_queue* q;

  /* Unspecified behavior case. Report an error. */
  if (cb && fd != cb->aio_fildes) {
    errno = EINVAL;
    return -1;
  }

  sigfillset(&allmask);
  pthread_sigmask(SIG_BLOCK, &allmask, &origmask);

  if (!(q = __aio_get_queue(fd, 0))) {
    if (fcntl(fd, F_GETFD) < 0)
      ret = -1;
    goto done;
  }

  for (p = q->head; p; p = p->next) {
    if (cb && cb != p->cb)
      continue;
    /* Transition target from running to running-with-waiters */
    if (a_cas(&p->running, 1, -1)) {
      pthread_cancel(p->td);
      __wait(&p->running, 0, -1, 1);
      if (p->err == ECANCELED)
        ret = AIO_CANCELED;
    }
  }

  pthread_mutex_unlock(&q->lock);
done:
  pthread_sigmask(SIG_SETMASK, &origmask, 0);
  return ret;
}

int __aio_close(int fd) {
  a_barrier();
  if (aio_fd_cnt)
    aio_cancel(fd, 0);
  return fd;
}

LFS64(aio_cancel);
LFS64(aio_error);
LFS64(aio_fsync);
LFS64(aio_read);
LFS64(aio_write);
LFS64(aio_return);
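
For reference, __aio_get_queue above locates the per-descriptor queue by splitting the fd into four bytes and using them as indices into a four-level pointer table, whose top level is sized to cover every non-negative int. A small standalone sketch of that arithmetic follows; the descriptor value is made up purely to show the split.

#include <stdio.h>

int main(void) {
  int fd = 0x04030201;        /* hypothetical fd value, chosen only for the demo */
  int a = fd >> 24;           /* top-level index */
  unsigned char b = fd >> 16; /* second-level index */
  unsigned char c = fd >> 8;  /* third-level index */
  unsigned char d = fd;       /* leaf index */
  /* (-1U / 2 + 1) >> 24 is (INT_MAX + 1) >> 24, i.e. 128 top-level slots
   * with a 32-bit int; each of the lower three levels holds 256 slots. */
  unsigned top_slots = (-1U / 2 + 1) >> 24;
  printf("indices: %d %d %d %d, top-level slots: %u\n", a, b, c, d, top_slots);
  return 0;
}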
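For context on the interface this file implements, here is a minimal caller-side sketch that submits one write with aio_write, polls completion with aio_error, and collects the result with aio_return. The output path and the SIGEV_NONE polling style are assumptions chosen for brevity, not something this change prescribes.

#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

int main(void) {
  /* hypothetical output file, used only for this demo */
  int fd = open("/tmp/aio-demo.txt", O_CREAT | O_WRONLY | O_TRUNC, 0644);
  if (fd < 0)
    return 1;

  static const char msg[] = "hello, aio\n";
  struct aiocb cb = {0};
  cb.aio_fildes = fd;
  cb.aio_buf = (void*)msg;
  cb.aio_nbytes = sizeof msg - 1;
  cb.aio_offset = 0;
  cb.aio_sigevent.sigev_notify = SIGEV_NONE; /* poll instead of signal/thread */

  if (aio_write(&cb) < 0) /* submit() above spawns a detached worker thread */
    return 1;

  while (aio_error(&cb) == EINPROGRESS) /* worker has not finished yet */
    usleep(1000);

  ssize_t n = aio_return(&cb); /* bytes written, or -1 on error */
  printf("wrote %zd bytes, final status %d\n", n, aio_error(&cb));
  close(fd);
  return 0;
}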
