OLD | NEW |
---|---|
(Empty) | |
1 /* | |
2 * Copyright (c) 2015 Carlos Pizano-Uribe cpu@chromium.org | |
3 * | |
4 * Permission is hereby granted, free of charge, to any person obtaining | |
5 * a copy of this software and associated documentation files | |
6 * (the "Software"), to deal in the Software without restriction, | |
7 * including without limitation the rights to use, copy, modify, merge, | |
8 * publish, distribute, sublicense, and/or sell copies of the Software, | |
9 * and to permit persons to whom the Software is furnished to do so, | |
10 * subject to the following conditions: | |
11 * | |
12 * The above copyright notice and this permission notice shall be | |
13 * included in all copies or substantial portions of the Software. | |
14 * | |
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
17 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. | |
18 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY | |
19 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, | |
20 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE | |
21 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
22 */ | |
23 | |
24 /** | |
25 * @file | |
26 * @brief Port object functions | |
27 * @defgroup event Events | |
28 * | |
29 */ | |
30 | |
31 #include <debug.h> | |
32 #include <list.h> | |
33 #include <malloc.h> | |
34 #include <string.h> | |
35 #include <pow2.h> | |
36 #include <err.h> | |
37 #include <kernel/thread.h> | |
38 #include <kernel/port.h> | |
39 | |
40 // write ports can be in two states, open and closed, which have a | |
41 // different magic number. | |
42 | |
43 #define WRITEPORT_MAGIC_W 'prtw' | |
44 #define WRITEPORT_MAGIC_X 'prtx' | |
45 | |
46 #define READPORT_MAGIC 'prtr' | |
47 #define PORTGROUP_MAGIC 'prtg' | |
48 | |
49 #define PORT_BUFF_SIZE 8 | |
50 #define PORT_BUFF_SIZE_BIG 64 | |
51 | |
52 #define RESCHEDULE_POLICY 1 | |
53 | |
54 #define MAX_PORT_GROUP_COUNT 256 | |
55 | |
56 typedef struct { | |
57 uint log2; | |
58 uint avail; | |
59 uint head; | |
60 uint tail; | |
61 port_packet_t packet[1]; | |
62 } port_buf_t; | |
63 | |
64 typedef struct { | |
65 int magic; | |
66 struct list_node node; | |
67 port_buf_t* buf; | |
68 struct list_node rp_list; | |
69 port_mode_t mode; | |
70 char name[PORT_NAME_LEN]; | |
71 } write_port_t; | |
72 | |
73 typedef struct { | |
74 int magic; | |
75 wait_queue_t wait; | |
76 struct list_node rp_list; | |
77 } port_group_t; | |
78 | |
79 typedef struct { | |
80 int magic; | |
81 struct list_node w_node; | |
82 struct list_node g_node; | |
83 port_buf_t* buf; | |
84 void* ctx; | |
85 wait_queue_t wait; | |
86 write_port_t* wport; | |
87 port_group_t* gport; | |
88 } read_port_t; | |
89 | |
90 | |
91 static struct list_node write_port_list; | |
92 | |
93 | |
94 static port_buf_t* make_buf(uint pk_count) | |
95 { | |
96 uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t)); | |
97 port_buf_t* buf = (port_buf_t*) malloc(size); | |
98 if (!buf) | |
99 return NULL; | |
100 buf->log2 = log2_uint(pk_count); | |
101 buf->head = buf->tail = 0; | |
102 buf->avail = pk_count; | |
103 return buf; | |
104 } | |
105 | |
106 static status_t buf_write(port_buf_t* buf, const port_packet_t* packets, size_t count) | |
107 { | |
108 if (buf->avail < count) | |
109 return ERR_NOT_ENOUGH_BUFFER; | |
110 | |
111 for (size_t ix = 0; ix != count; ix++) { | |
112 buf->packet[buf->tail] = packets[ix]; | |
113 buf->tail = modpow2(++buf->tail, buf->log2); | |
114 } | |
115 buf->avail -= count; | |
116 return NO_ERROR; | |
117 } | |
118 | |
119 static status_t buf_read(port_buf_t* buf, port_result_t* pr) | |
120 { | |
121 if (buf->avail == valpow2(buf->log2)) | |
122 return ERR_NO_MSG; | |
123 pr->packet = buf->packet[buf->head]; | |
124 buf->head = modpow2(++buf->head, buf->log2); | |
125 ++buf->avail; | |
126 return NO_ERROR; | |
127 } | |
128 | |
129 // must be called before any use of ports. | |
130 void port_init(void) | |
131 { | |
132 list_initialize(&write_port_list); | |
133 } | |
134 | |
135 status_t port_create(const char* name, port_mode_t mode, port_t* port) | |
136 { | |
137 if (!name || !port) | |
138 return ERR_INVALID_ARGS; | |
139 | |
140 // only unicast ports can have a large buffer. | |
141 if (mode & PORT_MODE_BROADCAST) { | |
142 if (mode & PORT_MODE_BIG_BUFFER) | |
143 return ERR_INVALID_ARGS; | |
144 } | |
145 | |
146 if (strlen(name) > PORT_NAME_LEN) | |
travisg
2015/11/20 22:24:24
- 1 or >= perhaps due to null terminator?
| |
147 return ERR_INVALID_ARGS; | |
148 | |
149 // lookup for existing port, return that if found. | |
150 write_port_t* wp = NULL; | |
151 THREAD_LOCK(state1); | |
152 list_for_every_entry(&write_port_list, wp, write_port_t, node) { | |
153 if (strcmp(wp->name, name) == 0) { | |
154 // can't return closed ports. | |
155 if (wp->magic == WRITEPORT_MAGIC_X) | |
156 wp = NULL; | |
157 THREAD_UNLOCK(state1); | |
158 if (wp) { | |
159 *port = (void*) wp; | |
160 return ERR_ALREADY_EXISTS; | |
161 } else { | |
162 return ERR_BUSY; | |
163 } | |
164 } | |
165 } | |
166 THREAD_UNLOCK(state1); | |
167 | |
168 // not found, create the write port and the circular buffer. | |
169 wp = calloc(1, sizeof(write_port_t)); | |
170 if (!wp) | |
171 return ERR_NO_MEMORY; | |
172 | |
173 wp->magic = WRITEPORT_MAGIC_W; | |
174 wp->mode = mode; | |
175 strlcpy(wp->name, name, sizeof(wp->name)); | |
176 list_initialize(&wp->rp_list); | |
177 | |
178 uint size = (mode & PORT_MODE_BIG_BUFFER) ? PORT_BUFF_SIZE_BIG : PORT_BUFF_ SIZE; | |
179 wp->buf = make_buf(size); | |
180 if (!wp->buf) { | |
181 free(wp); | |
182 return ERR_NO_MEMORY; | |
183 } | |
184 | |
185 // todo: race condtion! a port with the same name could have been created | |
186 // by another thread at is point. | |
187 THREAD_LOCK(state2); | |
188 list_add_tail(&write_port_list, &wp->node); | |
189 THREAD_UNLOCK(state2); | |
190 | |
191 *port = (void*)wp; | |
192 return NO_ERROR; | |
193 } | |
194 | |
195 status_t port_open(const char* name, void* ctx, port_t* port) | |
196 { | |
197 if (!name || !port) | |
198 return ERR_INVALID_ARGS; | |
199 | |
200 // assume success; create the read port and buffer now. | |
201 read_port_t* rp = calloc(1, sizeof(read_port_t)); | |
202 if (!rp) | |
203 return ERR_NO_MEMORY; | |
204 | |
205 rp->magic = READPORT_MAGIC; | |
206 wait_queue_init(&rp->wait); | |
207 rp->ctx = ctx; | |
208 | |
209 // |buf| might not be needed, but we always allocate outside the lock. | |
210 // this buffer is only needed for broadcast ports, but we don't know | |
211 // that here. | |
212 port_buf_t* buf = make_buf(PORT_BUFF_SIZE); | |
213 if (!buf) | |
214 return ERR_NO_MEMORY; | |
215 | |
216 // find the named write port and associate it with read port. | |
217 status_t rc = ERR_NOT_FOUND; | |
218 | |
219 THREAD_LOCK(state); | |
220 write_port_t* wp = NULL; | |
221 list_for_every_entry(&write_port_list, wp, write_port_t, node) { | |
222 if (strcmp(wp->name, name) == 0) { | |
223 // found; add read port to write port list. | |
224 rp->wport = wp; | |
225 if (wp->buf) { | |
226 // this is the first read port; transfer the circular buffer. | |
227 list_add_tail(&wp->rp_list, &rp->w_node); | |
228 rp->buf = wp->buf; | |
229 wp->buf = NULL; | |
230 rc = NO_ERROR; | |
231 } else if (buf) { | |
232 // not first read port. | |
233 if (wp->mode & PORT_MODE_UNICAST) { | |
234 // cannot add a second listener. | |
235 rc = ERR_NOT_ALLOWED; | |
236 break; | |
237 } | |
238 // use the new (small) circular buffer. | |
239 list_add_tail(&wp->rp_list, &rp->w_node); | |
240 rp->buf = buf; | |
241 buf = NULL; | |
242 rc = NO_ERROR; | |
243 } else { | |
244 // |buf| allocation failed and the buffer was needed. | |
245 rc = ERR_NO_MEMORY; | |
246 } | |
247 break; | |
248 } | |
249 } | |
250 THREAD_UNLOCK(state); | |
251 | |
252 if (buf) | |
253 free(buf); | |
254 | |
255 if (rc == NO_ERROR) { | |
256 *port = (void*)rp; | |
257 } else { | |
258 free(rp); | |
259 } | |
260 return rc; | |
261 } | |
262 | |
263 status_t port_group(port_t* ports, size_t count, port_t* group) | |
264 { | |
265 if (count > MAX_PORT_GROUP_COUNT) | |
266 return ERR_TOO_BIG; | |
267 | |
268 if (!ports || !group) | |
269 return ERR_INVALID_ARGS; | |
270 | |
271 // assume success; create port group now. | |
272 port_group_t* pg = calloc(1, sizeof(port_group_t)); | |
273 if (!pg) | |
274 return ERR_NO_MEMORY; | |
275 | |
276 pg->magic = PORTGROUP_MAGIC; | |
277 wait_queue_init(&pg->wait); | |
278 list_initialize(&pg->rp_list); | |
279 | |
280 status_t rc = NO_ERROR; | |
281 | |
282 THREAD_LOCK(state); | |
283 for (size_t ix = 0; ix != count; ix++) { | |
284 read_port_t* rp = (read_port_t*)ports[ix]; | |
285 if ((rp->magic != READPORT_MAGIC) || rp->gport) { | |
286 // wrong type of port, or port already part of a group, | |
287 // in any case, undo the changes to the previous read ports. | |
288 for (size_t jx = 0; jx != ix; jx++) { | |
289 ((read_port_t*)ports[jx])->gport = NULL; | |
290 } | |
291 rc = ERR_BAD_HANDLE; | |
292 break; | |
293 } | |
294 // link port group and read port. | |
295 rp->gport = pg; | |
296 list_add_tail(&pg->rp_list, &rp->g_node); | |
297 } | |
298 THREAD_UNLOCK(state); | |
299 | |
300 if (rc == NO_ERROR) { | |
301 *group = (port_t*)pg; | |
302 } else { | |
303 free(pg); | |
304 } | |
305 return rc; | |
306 } | |
307 | |
308 status_t port_write(port_t port, const port_packet_t* pk, size_t count) | |
309 { | |
310 if (!port || !pk) | |
311 return ERR_INVALID_ARGS; | |
312 | |
313 write_port_t* wp = (write_port_t*)port; | |
314 THREAD_LOCK(state); | |
315 if (wp->magic != WRITEPORT_MAGIC_W) { | |
316 // wrong port type. | |
317 THREAD_UNLOCK(state); | |
318 return ERR_BAD_HANDLE; | |
319 } | |
320 | |
321 status_t status = NO_ERROR; | |
322 int awake_count = 0; | |
323 | |
324 if (wp->buf) { | |
325 // there are no read ports, just write to the buffer. | |
326 status = buf_write(wp->buf, pk, count); | |
327 } else { | |
328 // there are read ports. for each, write and attempt to wake a thread | |
329 // from the port group or from the read port itself. | |
330 read_port_t* rp; | |
331 list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) { | |
332 if (buf_write(rp->buf, pk, count) < 0) { | |
333 // buffer full. | |
334 status = ERR_PARTIAL_WRITE; | |
335 continue; | |
336 } | |
337 | |
338 int awaken = 0; | |
339 if (rp->gport) { | |
340 awaken = wait_queue_wake_one(&rp->gport->wait, false, NO_ERROR); | |
341 } | |
342 if (!awaken) { | |
343 awaken = wait_queue_wake_one(&rp->wait, false, NO_ERROR); | |
344 } | |
345 | |
346 awake_count += awaken; | |
347 } | |
348 } | |
349 | |
350 THREAD_UNLOCK(state); | |
351 | |
352 #if RESCHEDULE_POLICY | |
353 if (awake_count) | |
354 thread_yield(); | |
355 #endif | |
356 | |
357 return status; | |
358 } | |
359 | |
360 static inline status_t read_no_lock(read_port_t* rp, lk_time_t timeout, port_res ult_t* result) | |
361 { | |
362 status_t status = buf_read(rp->buf, result); | |
363 result->ctx = rp->ctx; | |
364 | |
365 if (status != ERR_NO_MSG) | |
366 return status; | |
367 | |
368 // early return allows compiler to elide the rest for the group read case. | |
369 if (!timeout) | |
370 return ERR_TIMED_OUT; | |
371 | |
372 status_t wr = wait_queue_block(&rp->wait, timeout); | |
373 if (wr != NO_ERROR) | |
374 return wr; | |
375 // recursive tail call is usually optimized away with a goto. | |
376 return read_no_lock(rp, timeout, result); | |
377 } | |
378 | |
379 status_t port_read(port_t port, lk_time_t timeout, port_result_t* result) | |
380 { | |
381 if (!port || !result) | |
382 return ERR_INVALID_ARGS; | |
383 | |
384 status_t rc = ERR_GENERIC; | |
385 read_port_t* rp = (read_port_t*)port; | |
386 | |
387 THREAD_LOCK(state); | |
388 if (rp->magic == READPORT_MAGIC) { | |
389 // dealing with a single port. | |
390 rc = read_no_lock(rp, timeout, result); | |
391 } else if (rp->magic == PORTGROUP_MAGIC) { | |
392 // dealing with a port group. | |
393 port_group_t* pg = (port_group_t*)port; | |
394 do { | |
395 // read each port with no timeout. | |
396 // todo: this order is fixed, probably a bad thing. | |
397 list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) { | |
398 rc = read_no_lock(rp, 0, result); | |
399 if (rc != ERR_TIMED_OUT) | |
400 goto read_exit; | |
401 } | |
402 // no data, block on the group waitqueue. | |
403 rc = wait_queue_block(&pg->wait, timeout); | |
404 } while (rc == NO_ERROR); | |
405 } else { | |
406 // wrong port type. | |
407 rc = ERR_BAD_HANDLE; | |
408 } | |
409 | |
410 read_exit: | |
411 THREAD_UNLOCK(state); | |
412 return rc; | |
413 } | |
414 | |
415 status_t port_destroy(port_t port) | |
416 { | |
417 if (!port) | |
418 return ERR_INVALID_ARGS; | |
419 | |
420 write_port_t* wp = (write_port_t*) port; | |
421 port_buf_t* buf = NULL; | |
422 | |
423 THREAD_LOCK(state); | |
424 if (wp->magic != WRITEPORT_MAGIC_X) { | |
425 // wrong port type. | |
426 THREAD_UNLOCK(state); | |
427 return ERR_BAD_HANDLE; | |
428 } | |
429 // remove self from global named ports list. | |
430 list_delete(&wp->node); | |
431 | |
432 if (wp->buf) { | |
433 // we have no readers. | |
434 buf = wp->buf; | |
435 } else { | |
436 // for each reader: | |
437 read_port_t* rp; | |
438 list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) { | |
439 // wake the read and group ports. | |
440 wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED); | |
441 if (rp->gport) { | |
442 wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED); | |
443 } | |
444 // remove self from reader ports. | |
445 rp->wport = NULL; | |
446 } | |
447 } | |
448 | |
449 wp->magic = 0; | |
450 THREAD_UNLOCK(state); | |
451 | |
452 free(buf); | |
453 free(wp); | |
454 return NO_ERROR; | |
455 } | |
456 | |
457 status_t port_close(port_t port) | |
458 { | |
459 if (!port) | |
460 return ERR_INVALID_ARGS; | |
461 | |
462 read_port_t* rp = (read_port_t*) port; | |
463 port_buf_t* buf = NULL; | |
464 | |
465 THREAD_LOCK(state); | |
466 if (rp->magic == READPORT_MAGIC) { | |
467 // dealing with a read port. | |
468 if (rp->wport) { | |
469 // remove self from write port list and reassign the bufer if last. | |
470 list_delete(&rp->w_node); | |
471 if (list_is_empty(&rp->wport->rp_list)) { | |
472 rp->wport->buf = rp->buf; | |
473 rp->buf = NULL; | |
474 } else { | |
475 buf = rp->buf; | |
476 } | |
477 } | |
478 if (rp->gport) { | |
479 // remove self from port group list. | |
480 list_delete(&rp->g_node); | |
481 } | |
482 // wake up waiters, the return code is ERR_OBJECT_DESTROYED. | |
483 wait_queue_destroy(&rp->wait, true); | |
484 rp->magic = 0; | |
485 | |
486 } else if (rp->magic == PORTGROUP_MAGIC) { | |
487 // dealing with a port group. | |
488 port_group_t* pg = (port_group_t*) port; | |
489 // wake up waiters. | |
490 wait_queue_destroy(&pg->wait, true); | |
491 // remove self from reader ports. | |
492 rp = NULL; | |
493 list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) { | |
494 rp->gport = NULL; | |
495 } | |
496 pg->magic = 0; | |
497 | |
498 } else if (rp->magic == WRITEPORT_MAGIC_W) { | |
499 // dealing with a write port. | |
500 write_port_t* wp = (write_port_t*) port; | |
501 // mark it as closed. Now it can be read but not written to. | |
502 wp->magic = WRITEPORT_MAGIC_X; | |
503 THREAD_UNLOCK(state); | |
504 return NO_ERROR; | |
505 | |
506 } else { | |
507 THREAD_UNLOCK(state); | |
508 return ERR_BAD_HANDLE; | |
509 } | |
510 | |
511 THREAD_UNLOCK(state); | |
512 | |
513 free(buf); | |
514 free(port); | |
515 return NO_ERROR; | |
516 } | |
517 | |
OLD | NEW |