OLD | NEW |
---|---|
(Empty) | |
1 /* | |
2 * Copyright (c) 2015 Carlos Pizano-Uribe cpu@chromium.org | |
3 * | |
4 * Permission is hereby granted, free of charge, to any person obtaining | |
5 * a copy of this software and associated documentation files | |
6 * (the "Software"), to deal in the Software without restriction, | |
7 * including without limitation the rights to use, copy, modify, merge, | |
8 * publish, distribute, sublicense, and/or sell copies of the Software, | |
9 * and to permit persons to whom the Software is furnished to do so, | |
10 * subject to the following conditions: | |
11 * | |
12 * The above copyright notice and this permission notice shall be | |
13 * included in all copies or substantial portions of the Software. | |
14 * | |
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
17 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. | |
18 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY | |
19 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, | |
20 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE | |
21 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
22 */ | |
23 | |
24 /** | |
25 * @file | |
26 * @brief Port object functions | |
27 * @defgroup event Events | |
28 * | |
29 */ | |
30 | |
31 #include <debug.h> | |
32 #include <list.h> | |
33 #include <malloc.h> | |
34 #include <string.h> | |
35 #include <pow2.h> | |
36 #include <err.h> | |
37 #include <kernel/thread.h> | |
38 #include <kernel/port.h> | |
39 | |
40 // write ports can be in two states, open and closed, which have a | |
41 // different magic number. | |
42 | |
43 #define WRITEPORT_MAGIC_W 'prtw' | |
44 #define WRITEPORT_MAGIC_X 'prtx' | |
45 | |
46 #define READPORT_MAGIC 'prtr' | |
47 #define PORTGROUP_MAGIC 'prtg' | |
48 | |
49 #define PORT_BUFF_SIZE 8 | |
50 #define PORT_BUFF_SIZE_BIG 64 | |
51 | |
52 #define RESCHEDULE_POLICY true | |
53 | |
54 typedef struct { | |
55 uint log2; | |
56 uint avail; | |
57 uint head; | |
58 uint tail; | |
59 port_packet_t packet[1]; | |
60 } port_buf_t; | |
61 | |
62 typedef struct { | |
63 int magic; | |
64 struct list_node node; | |
65 port_buf_t* buf; | |
66 struct list_node rp_list; | |
67 port_mode_t mode; | |
68 char name[PORT_NAME_LEN]; | |
69 } write_port_t; | |
70 | |
71 typedef struct { | |
72 int magic; | |
73 wait_queue_t wait; | |
74 struct list_node rp_list; | |
75 } port_group_t; | |
76 | |
77 typedef struct { | |
78 int magic; | |
79 struct list_node w_node; | |
80 struct list_node g_node; | |
81 port_buf_t* buf; | |
82 void* ctx; | |
83 wait_queue_t wait; | |
84 write_port_t* wport; | |
85 port_group_t* gport; | |
86 } read_port_t; | |
87 | |
88 | |
89 static struct list_node write_port_list; | |
90 | |
91 | |
92 static port_buf_t* make_buf(uint pk_count) | |
93 { | |
94 uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t)); | |
95 port_buf_t* buf = (port_buf_t*) malloc(size); | |
96 if (!buf) | |
97 return NULL; | |
98 buf->log2 = log2_uint(pk_count); | |
99 buf->head = buf->tail = 0; | |
100 buf->avail = pk_count; | |
101 return buf; | |
102 } | |
103 | |
104 static status_t buf_write(port_buf_t* buf, const port_packet_t* packets, size_t count) | |
105 { | |
106 if (buf->avail < count) | |
107 return ERR_NOT_ENOUGH_BUFFER; | |
108 | |
109 for (size_t ix = 0; ix != count; ix++) { | |
110 buf->packet[buf->tail] = packets[ix]; | |
111 buf->tail = modpow2(++buf->tail, buf->log2); | |
112 } | |
113 buf->avail -= count; | |
114 return NO_ERROR; | |
115 } | |
116 | |
117 static status_t buf_read(port_buf_t* buf, port_result_t* pr) | |
118 { | |
119 if (buf->avail == valpow2(buf->log2)) | |
120 return ERR_NO_MSG; | |
121 pr->packet = buf->packet[buf->head]; | |
122 buf->head = modpow2(++buf->head, buf->log2); | |
123 ++buf->avail; | |
124 return NO_ERROR; | |
125 } | |
126 | |
127 // must be called before any use of ports. | |
128 void port_init(void) | |
129 { | |
130 list_initialize(&write_port_list); | |
travisg
2015/11/19 21:26:10
if you want, can statically initialize the list wi
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:46
I am a bit afraid of the ordering in global static
travisg
2015/11/20 22:24:24
They're not ordered. It is simply pre-initialized
| |
131 } | |
132 | |
133 status_t port_create(const char* name, port_mode_t mode, port_t* port) | |
134 { | |
135 if (!name || !port) | |
136 return ERR_INVALID_ARGS; | |
137 | |
138 // only unicast ports can have a large buffer. | |
139 if (mode & PORT_MODE_BROADCAST) { | |
140 if (mode & PORT_MODE_BIG_BUFFER) | |
141 return ERR_INVALID_ARGS; | |
142 } | |
143 | |
144 // lookup for existing port, return that if found. | |
145 write_port_t* wp = NULL; | |
146 THREAD_LOCK(state1); | |
147 list_for_every_entry(&write_port_list, wp, write_port_t, node) { | |
148 if (strcmp(wp->name, name) == 0) { | |
149 // can't return closed ports. | |
150 if (wp->magic == WRITEPORT_MAGIC_X) | |
151 wp = NULL; | |
152 THREAD_UNLOCK(state1); | |
153 if (wp) { | |
154 *port = (void*) wp; | |
155 return ERR_ALREADY_EXISTS; | |
156 } else { | |
157 return ERR_BUSY; | |
158 } | |
159 } | |
160 } | |
161 THREAD_UNLOCK(state1); | |
162 | |
163 // not found, create the write port and the circular buffer. | |
164 wp = malloc(sizeof(write_port_t)); | |
165 if (!wp) | |
166 return ERR_NO_MEMORY; | |
167 | |
168 memset(wp, 0, sizeof(write_port_t)); | |
169 wp->magic = WRITEPORT_MAGIC_W; | |
170 wp->mode = mode; | |
171 strlcpy(wp->name, name, sizeof(wp->name)); | |
travisg
2015/11/19 21:26:10
to avoid a problem here, might want to strlen name
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:45
Done.
| |
172 list_initialize(&wp->rp_list); | |
173 | |
174 uint size = mode & PORT_MODE_BIG_BUFFER ? PORT_BUFF_SIZE_BIG : PORT_BUFF_SI ZE; | |
travisg
2015/11/19 21:26:10
order of ops here? I always forget but the ternary
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:45
Done. I usually do put (), kind of surprised I for
| |
175 wp->buf = make_buf(size); | |
176 if (!wp->buf) { | |
177 free(wp); | |
178 return ERR_NO_MEMORY; | |
179 } | |
180 | |
181 // todo: race condtion! a port with the same name could have been created | |
182 // by another thread at is point. | |
183 THREAD_LOCK(state2); | |
184 list_add_tail(&write_port_list, &wp->node); | |
185 THREAD_UNLOCK(state2); | |
186 | |
187 *port = (void*)wp; | |
188 return NO_ERROR; | |
189 } | |
190 | |
191 status_t port_open(const char* name, void* ctx, port_t* port) | |
192 { | |
193 if (!name || !port) | |
194 return ERR_INVALID_ARGS; | |
195 | |
196 // assume success; create the read port and buffer now. | |
197 read_port_t* rp = malloc(sizeof(read_port_t)); | |
198 if (!rp) | |
199 return ERR_NO_MEMORY; | |
200 | |
201 memset(rp, 0, sizeof(read_port_t)); | |
travisg
2015/11/19 21:26:10
can use calloc for slightly smaller code
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:46
Done.
| |
202 rp->magic = READPORT_MAGIC; | |
203 wait_queue_init(&rp->wait); | |
204 rp->ctx = ctx; | |
205 | |
206 // |buf| might not be needed, but we always allocate outside the lock. | |
207 // this buffer is only needed for broadcast ports, but we don't know | |
208 // that here. | |
209 port_buf_t* buf = make_buf(PORT_BUFF_SIZE); | |
travisg
2015/11/19 21:26:10
check for null on buf
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:45
Done.
| |
210 | |
211 // find the named write port and associate it with read port. | |
212 status_t rc = ERR_NOT_FOUND; | |
213 | |
214 THREAD_LOCK(state); | |
215 write_port_t* wp = NULL; | |
216 list_for_every_entry(&write_port_list, wp, write_port_t, node) { | |
217 if (strcmp(wp->name, name) == 0) { | |
218 // found; add read port to write port list. | |
219 rp->wport = wp; | |
220 if (wp->buf) { | |
221 // this is the first read port; transfer the circular buffer. | |
222 list_add_tail(&wp->rp_list, &rp->w_node); | |
223 rp->buf = wp->buf; | |
224 wp->buf = NULL; | |
225 rc = NO_ERROR; | |
226 } else if (buf) { | |
227 // not first read port. | |
228 if (wp->mode & PORT_MODE_UNICAST) { | |
229 // cannot add a second listener. | |
230 rc = ERR_NOT_ALLOWED; | |
231 break; | |
232 } | |
233 // use the new (small) circular buffer. | |
234 list_add_tail(&wp->rp_list, &rp->w_node); | |
235 rp->buf = buf; | |
236 buf = NULL; | |
237 rc = NO_ERROR; | |
238 } else { | |
239 // |buf| allocation failed and the buffer was needed. | |
240 rc = ERR_NO_MEMORY; | |
241 } | |
242 break; | |
243 } | |
244 } | |
245 THREAD_UNLOCK(state); | |
246 | |
247 if (buf) | |
248 free(buf); | |
249 | |
250 if (rc == NO_ERROR) { | |
251 *port = (void*)rp; | |
252 } else { | |
253 free(rp); | |
254 } | |
255 return rc; | |
256 } | |
257 | |
258 status_t port_group(port_t* ports, size_t count, port_t* group) | |
259 { | |
travisg
2015/11/19 21:26:10
check for !ports, !group, and count is some reason
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:46
Done.
| |
260 // assume success; create port group now. | |
261 port_group_t* pg = malloc(sizeof(port_group_t)); | |
262 if (!pg) | |
263 return ERR_NO_MEMORY; | |
264 | |
265 memset(pg, 0, sizeof(port_group_t)); | |
travisg
2015/11/19 21:26:10
same re: calloc
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:46
done for all.
| |
266 pg->magic = PORTGROUP_MAGIC; | |
267 wait_queue_init(&pg->wait); | |
268 list_initialize(&pg->rp_list); | |
269 | |
270 status_t rc = NO_ERROR; | |
271 | |
272 THREAD_LOCK(state); | |
273 for (size_t ix = 0; ix != count; ix++) { | |
274 read_port_t* rp = (read_port_t*)ports[ix]; | |
275 if ((rp->magic != READPORT_MAGIC) || rp->gport) { | |
276 // wrong type of port, or port already part of a group, | |
277 // in any case, undo the changes to the previous read ports. | |
278 for (size_t jx = 0; jx != ix; jx++) { | |
279 ((read_port_t*)ports[jx])->gport = NULL; | |
280 } | |
281 rc = ERR_BAD_HANDLE; | |
282 break; | |
283 } | |
284 // link port group and read port. | |
285 rp->gport = pg; | |
286 list_add_tail(&pg->rp_list, &rp->g_node); | |
287 } | |
288 THREAD_UNLOCK(state); | |
289 | |
290 if (rc == NO_ERROR) { | |
291 *group = (port_t*)pg; | |
292 } else { | |
293 free(pg); | |
294 } | |
295 return rc; | |
296 } | |
297 | |
298 status_t port_write(port_t port, const port_packet_t* pk, size_t count) | |
299 { | |
travisg
2015/11/19 21:26:10
check for !pk and !port
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:45
Done.
| |
300 write_port_t* wp = (write_port_t*)port; | |
301 THREAD_LOCK(state); | |
302 if (wp->magic != WRITEPORT_MAGIC_W) { | |
303 // wrong port type. | |
304 THREAD_UNLOCK(state); | |
305 return ERR_BAD_HANDLE; | |
306 } | |
307 | |
308 status_t status = NO_ERROR; | |
309 | |
310 if (wp->buf) { | |
311 // there are no read ports, just write to the buffer. | |
312 status = buf_write(wp->buf, pk, count); | |
313 } else { | |
314 // there are read ports. for each, write and attempt to wake a thread | |
315 // from the port group or from the read port itself. | |
316 read_port_t* rp; | |
317 list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) { | |
318 if (buf_write(rp->buf, pk, count) < 0) { | |
319 // buffer full. | |
320 status = ERR_PARTIAL_WRITE; | |
321 continue; | |
322 } | |
323 | |
324 int awake_count = 0; | |
325 if (rp->gport) { | |
326 awake_count = wait_queue_wake_one(&rp->gport->wait, RESCHEDULE_P OLICY, NO_ERROR); | |
travisg
2015/11/19 21:26:10
potential hazard: the new thread will wake up and
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:46
Done via yield()
| |
327 } | |
328 if (!awake_count) { | |
329 wait_queue_wake_one(&rp->wait, RESCHEDULE_POLICY, NO_ERROR); | |
330 } | |
331 } | |
332 } | |
333 | |
334 THREAD_UNLOCK(state); | |
335 return status; | |
336 } | |
337 | |
338 static inline status_t read_no_lock(read_port_t* rp, lk_time_t timeout, port_res ult_t* result) | |
339 { | |
340 status_t status = buf_read(rp->buf, result); | |
341 result->ctx = rp->ctx; | |
342 | |
343 if (status != ERR_NO_MSG) | |
344 return status; | |
345 | |
346 // early return allows compiler to elide the rest for the group read case. | |
347 if (!timeout) | |
348 return ERR_TIMED_OUT; | |
349 | |
350 status_t wr = wait_queue_block(&rp->wait, timeout); | |
351 if (wr != NO_ERROR) | |
352 return wr; | |
353 // recursive tail call is usually optimized away with a goto. | |
354 return read_no_lock(rp, timeout, result); | |
355 } | |
356 | |
357 status_t port_read(port_t port, lk_time_t timeout, port_result_t* result) | |
358 { | |
travisg
2015/11/19 21:26:10
test for !port and !result
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:45
Done.
| |
359 status_t rc = ERR_GENERIC; | |
360 read_port_t* rp = (read_port_t*)port; | |
361 | |
362 THREAD_LOCK(state); | |
363 if (rp->magic == READPORT_MAGIC) { | |
364 // dealing with a single port. | |
365 rc = read_no_lock(rp, timeout, result); | |
366 } else if (rp->magic == PORTGROUP_MAGIC) { | |
367 // dealing with a port group. | |
368 port_group_t* pg = (port_group_t*)port; | |
369 do { | |
370 // read each port with no timeout. | |
371 // todo: this order is fixed, probably a bad thing. | |
372 list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) { | |
373 rc = read_no_lock(rp, 0, result); | |
374 if (rc != ERR_TIMED_OUT) | |
375 goto read_exit; | |
376 } | |
377 // no data, block on the group waitqueue. | |
378 rc = wait_queue_block(&pg->wait, timeout); | |
379 } while (rc == NO_ERROR); | |
380 } else { | |
381 // wrong port type. | |
382 rc = ERR_BAD_HANDLE; | |
383 } | |
384 | |
385 read_exit: | |
386 THREAD_UNLOCK(state); | |
387 return rc; | |
388 } | |
389 | |
390 status_t port_destroy(port_t port) | |
391 { | |
travisg
2015/11/19 21:26:10
test for !port
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:46
Done.
| |
392 write_port_t* wp = (write_port_t*) port; | |
393 port_buf_t* buf = NULL; | |
394 | |
395 THREAD_LOCK(state); | |
396 if (wp->magic != WRITEPORT_MAGIC_X) { | |
397 // wrong port type. | |
398 THREAD_UNLOCK(state); | |
399 return ERR_BAD_HANDLE; | |
400 } | |
401 // remove self from global named ports list. | |
402 list_delete(&wp->node); | |
403 | |
404 if (wp->buf) { | |
405 // we have no readers. | |
406 buf = wp->buf; | |
407 } else { | |
408 // for each reader: | |
409 read_port_t* rp; | |
410 list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) { | |
411 // wake the read and group ports. | |
412 wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED); | |
413 if (rp->gport) { | |
414 wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED); | |
travisg
2015/11/19 21:26:10
same resched stuff as before. want to not reschedu
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:46
Acknowledged.
| |
415 } | |
416 // remove self from reader ports. | |
417 rp->wport = NULL; | |
418 } | |
419 } | |
420 | |
421 wp->magic = 0; | |
422 THREAD_UNLOCK(state); | |
423 | |
424 free(buf); | |
425 free(wp); | |
426 return NO_ERROR; | |
427 } | |
428 | |
429 status_t port_close(port_t port) | |
430 { | |
travisg
2015/11/19 21:26:10
test for !port
cpu_(ooo_6.6-7.5)
2015/11/20 22:18:45
Done.
| |
431 read_port_t* rp = (read_port_t*) port; | |
432 port_buf_t* buf = NULL; | |
433 | |
434 THREAD_LOCK(state); | |
435 if (rp->magic == READPORT_MAGIC) { | |
436 // dealing with a read port. | |
437 if (rp->wport) { | |
438 // remove self from write port list and reassign the bufer if last. | |
439 list_delete(&rp->w_node); | |
440 if (list_is_empty(&rp->wport->rp_list)) { | |
441 rp->wport->buf = rp->buf; | |
442 rp->buf = NULL; | |
443 } else { | |
444 buf = rp->buf; | |
445 } | |
446 } | |
447 if (rp->gport) { | |
448 // remove self from port group list. | |
449 list_delete(&rp->g_node); | |
450 } | |
451 // wake up waiters, the return code is ERR_OBJECT_DESTROYED. | |
452 wait_queue_destroy(&rp->wait, true); | |
453 rp->magic = 0; | |
454 | |
455 } else if (rp->magic == PORTGROUP_MAGIC) { | |
456 // dealing with a port group. | |
457 port_group_t* pg = (port_group_t*) port; | |
458 // wake up waiters. | |
459 wait_queue_destroy(&pg->wait, true); | |
460 // remove self from reader ports. | |
461 rp = NULL; | |
462 list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) { | |
463 rp->gport = NULL; | |
464 } | |
465 pg->magic = 0; | |
466 | |
467 } else if (rp->magic == WRITEPORT_MAGIC_W) { | |
468 // dealing with a write port. | |
469 write_port_t* wp = (write_port_t*) port; | |
470 // mark it as closed. Now it can be read but not written to. | |
471 wp->magic = WRITEPORT_MAGIC_X; | |
472 THREAD_UNLOCK(state); | |
473 return NO_ERROR; | |
474 | |
475 } else { | |
476 THREAD_UNLOCK(state); | |
477 return ERR_BAD_HANDLE; | |
478 } | |
479 | |
480 THREAD_UNLOCK(state); | |
481 | |
482 free(buf); | |
483 free(port); | |
484 return NO_ERROR; | |
485 } | |
486 | |
OLD | NEW |