Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(299)

Side by Side Diff: kernel/port.c

Issue 1437453002: [kernel][ports] Add basic ports functionality (Closed) Base URL: https://github.com/travisg/lk.git@master
Patch Set: more tests Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright (c) 2015 Carlos Pizano-Uribe cpu@chromium.org
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining
5 * a copy of this software and associated documentation files
6 * (the "Software"), to deal in the Software without restriction,
7 * including without limitation the rights to use, copy, modify, merge,
8 * publish, distribute, sublicense, and/or sell copies of the Software,
9 * and to permit persons to whom the Software is furnished to do so,
10 * subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be
13 * included in all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 */
23
24 /**
25 * @file
26 * @brief Port object functions
27 * @defgroup event Events
28 *
29 */
30
31 #include <debug.h>
32 #include <list.h>
33 #include <malloc.h>
34 #include <string.h>
35 #include <pow2.h>
36 #include <err.h>
37 #include <kernel/thread.h>
38 #include <kernel/port.h>
39
40 // write ports can be in two states, open and closed, which have a
41 // different magic number.
42
43 #define WRITEPORT_MAGIC_W 'prtw'
44 #define WRITEPORT_MAGIC_X 'prtx'
45
46 #define READPORT_MAGIC 'prtr'
47 #define PORTGROUP_MAGIC 'prtg'
48
49 #define PORT_BUFF_SIZE 8
50 #define PORT_BUFF_SIZE_BIG 64
51
52 #define RESCHEDULE_POLICY true
53
54 typedef struct {
55 uint log2;
56 uint avail;
57 uint head;
58 uint tail;
59 port_packet_t packet[1];
60 } port_buf_t;
61
62 typedef struct {
63 int magic;
64 struct list_node node;
65 port_buf_t* buf;
66 struct list_node rp_list;
67 port_mode_t mode;
68 char name[PORT_NAME_LEN];
69 } write_port_t;
70
71 typedef struct {
72 int magic;
73 wait_queue_t wait;
74 struct list_node rp_list;
75 } port_group_t;
76
77 typedef struct {
78 int magic;
79 struct list_node w_node;
80 struct list_node g_node;
81 port_buf_t* buf;
82 void* ctx;
83 wait_queue_t wait;
84 write_port_t* wport;
85 port_group_t* gport;
86 } read_port_t;
87
88
89 static struct list_node write_port_list;
90
91
92 static port_buf_t* make_buf(uint pk_count)
93 {
94 uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t));
95 port_buf_t* buf = (port_buf_t*) malloc(size);
96 if (!buf)
97 return NULL;
98 buf->log2 = log2_uint(pk_count);
99 buf->head = buf->tail = 0;
100 buf->avail = pk_count;
101 return buf;
102 }
103
104 static status_t buf_write(port_buf_t* buf, const port_packet_t* packets, size_t count)
105 {
106 if (buf->avail < count)
107 return ERR_NOT_ENOUGH_BUFFER;
108
109 for (size_t ix = 0; ix != count; ix++) {
110 buf->packet[buf->tail] = packets[ix];
111 buf->tail = modpow2(++buf->tail, buf->log2);
112 }
113 buf->avail -= count;
114 return NO_ERROR;
115 }
116
117 static status_t buf_read(port_buf_t* buf, port_result_t* pr)
118 {
119 if (buf->avail == valpow2(buf->log2))
120 return ERR_NO_MSG;
121 pr->packet = buf->packet[buf->head];
122 buf->head = modpow2(++buf->head, buf->log2);
123 ++buf->avail;
124 return NO_ERROR;
125 }
126
127 // must be called before any use of ports.
128 void port_init(void)
129 {
130 list_initialize(&write_port_list);
travisg 2015/11/19 21:26:10 if you want, can statically initialize the list wi
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:46 I am a bit afraid of the ordering in global static
travisg 2015/11/20 22:24:24 They're not ordered. It is simply pre-initialized
131 }
132
133 status_t port_create(const char* name, port_mode_t mode, port_t* port)
134 {
135 if (!name || !port)
136 return ERR_INVALID_ARGS;
137
138 // only unicast ports can have a large buffer.
139 if (mode & PORT_MODE_BROADCAST) {
140 if (mode & PORT_MODE_BIG_BUFFER)
141 return ERR_INVALID_ARGS;
142 }
143
144 // lookup for existing port, return that if found.
145 write_port_t* wp = NULL;
146 THREAD_LOCK(state1);
147 list_for_every_entry(&write_port_list, wp, write_port_t, node) {
148 if (strcmp(wp->name, name) == 0) {
149 // can't return closed ports.
150 if (wp->magic == WRITEPORT_MAGIC_X)
151 wp = NULL;
152 THREAD_UNLOCK(state1);
153 if (wp) {
154 *port = (void*) wp;
155 return ERR_ALREADY_EXISTS;
156 } else {
157 return ERR_BUSY;
158 }
159 }
160 }
161 THREAD_UNLOCK(state1);
162
163 // not found, create the write port and the circular buffer.
164 wp = malloc(sizeof(write_port_t));
165 if (!wp)
166 return ERR_NO_MEMORY;
167
168 memset(wp, 0, sizeof(write_port_t));
169 wp->magic = WRITEPORT_MAGIC_W;
170 wp->mode = mode;
171 strlcpy(wp->name, name, sizeof(wp->name));
travisg 2015/11/19 21:26:10 to avoid a problem here, might want to strlen name
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:45 Done.
172 list_initialize(&wp->rp_list);
173
174 uint size = mode & PORT_MODE_BIG_BUFFER ? PORT_BUFF_SIZE_BIG : PORT_BUFF_SI ZE;
travisg 2015/11/19 21:26:10 order of ops here? I always forget but the ternary
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:45 Done. I usually do put (), kind of surprised I for
175 wp->buf = make_buf(size);
176 if (!wp->buf) {
177 free(wp);
178 return ERR_NO_MEMORY;
179 }
180
181 // todo: race condtion! a port with the same name could have been created
182 // by another thread at is point.
183 THREAD_LOCK(state2);
184 list_add_tail(&write_port_list, &wp->node);
185 THREAD_UNLOCK(state2);
186
187 *port = (void*)wp;
188 return NO_ERROR;
189 }
190
191 status_t port_open(const char* name, void* ctx, port_t* port)
192 {
193 if (!name || !port)
194 return ERR_INVALID_ARGS;
195
196 // assume success; create the read port and buffer now.
197 read_port_t* rp = malloc(sizeof(read_port_t));
198 if (!rp)
199 return ERR_NO_MEMORY;
200
201 memset(rp, 0, sizeof(read_port_t));
travisg 2015/11/19 21:26:10 can use calloc for slightly smaller code
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:46 Done.
202 rp->magic = READPORT_MAGIC;
203 wait_queue_init(&rp->wait);
204 rp->ctx = ctx;
205
206 // |buf| might not be needed, but we always allocate outside the lock.
207 // this buffer is only needed for broadcast ports, but we don't know
208 // that here.
209 port_buf_t* buf = make_buf(PORT_BUFF_SIZE);
travisg 2015/11/19 21:26:10 check for null on buf
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:45 Done.
210
211 // find the named write port and associate it with read port.
212 status_t rc = ERR_NOT_FOUND;
213
214 THREAD_LOCK(state);
215 write_port_t* wp = NULL;
216 list_for_every_entry(&write_port_list, wp, write_port_t, node) {
217 if (strcmp(wp->name, name) == 0) {
218 // found; add read port to write port list.
219 rp->wport = wp;
220 if (wp->buf) {
221 // this is the first read port; transfer the circular buffer.
222 list_add_tail(&wp->rp_list, &rp->w_node);
223 rp->buf = wp->buf;
224 wp->buf = NULL;
225 rc = NO_ERROR;
226 } else if (buf) {
227 // not first read port.
228 if (wp->mode & PORT_MODE_UNICAST) {
229 // cannot add a second listener.
230 rc = ERR_NOT_ALLOWED;
231 break;
232 }
233 // use the new (small) circular buffer.
234 list_add_tail(&wp->rp_list, &rp->w_node);
235 rp->buf = buf;
236 buf = NULL;
237 rc = NO_ERROR;
238 } else {
239 // |buf| allocation failed and the buffer was needed.
240 rc = ERR_NO_MEMORY;
241 }
242 break;
243 }
244 }
245 THREAD_UNLOCK(state);
246
247 if (buf)
248 free(buf);
249
250 if (rc == NO_ERROR) {
251 *port = (void*)rp;
252 } else {
253 free(rp);
254 }
255 return rc;
256 }
257
258 status_t port_group(port_t* ports, size_t count, port_t* group)
259 {
travisg 2015/11/19 21:26:10 check for !ports, !group, and count is some reason
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:46 Done.
260 // assume success; create port group now.
261 port_group_t* pg = malloc(sizeof(port_group_t));
262 if (!pg)
263 return ERR_NO_MEMORY;
264
265 memset(pg, 0, sizeof(port_group_t));
travisg 2015/11/19 21:26:10 same re: calloc
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:46 done for all.
266 pg->magic = PORTGROUP_MAGIC;
267 wait_queue_init(&pg->wait);
268 list_initialize(&pg->rp_list);
269
270 status_t rc = NO_ERROR;
271
272 THREAD_LOCK(state);
273 for (size_t ix = 0; ix != count; ix++) {
274 read_port_t* rp = (read_port_t*)ports[ix];
275 if ((rp->magic != READPORT_MAGIC) || rp->gport) {
276 // wrong type of port, or port already part of a group,
277 // in any case, undo the changes to the previous read ports.
278 for (size_t jx = 0; jx != ix; jx++) {
279 ((read_port_t*)ports[jx])->gport = NULL;
280 }
281 rc = ERR_BAD_HANDLE;
282 break;
283 }
284 // link port group and read port.
285 rp->gport = pg;
286 list_add_tail(&pg->rp_list, &rp->g_node);
287 }
288 THREAD_UNLOCK(state);
289
290 if (rc == NO_ERROR) {
291 *group = (port_t*)pg;
292 } else {
293 free(pg);
294 }
295 return rc;
296 }
297
298 status_t port_write(port_t port, const port_packet_t* pk, size_t count)
299 {
travisg 2015/11/19 21:26:10 check for !pk and !port
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:45 Done.
300 write_port_t* wp = (write_port_t*)port;
301 THREAD_LOCK(state);
302 if (wp->magic != WRITEPORT_MAGIC_W) {
303 // wrong port type.
304 THREAD_UNLOCK(state);
305 return ERR_BAD_HANDLE;
306 }
307
308 status_t status = NO_ERROR;
309
310 if (wp->buf) {
311 // there are no read ports, just write to the buffer.
312 status = buf_write(wp->buf, pk, count);
313 } else {
314 // there are read ports. for each, write and attempt to wake a thread
315 // from the port group or from the read port itself.
316 read_port_t* rp;
317 list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
318 if (buf_write(rp->buf, pk, count) < 0) {
319 // buffer full.
320 status = ERR_PARTIAL_WRITE;
321 continue;
322 }
323
324 int awake_count = 0;
325 if (rp->gport) {
326 awake_count = wait_queue_wake_one(&rp->gport->wait, RESCHEDULE_P OLICY, NO_ERROR);
travisg 2015/11/19 21:26:10 potential hazard: the new thread will wake up and
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:46 Done via yield()
327 }
328 if (!awake_count) {
329 wait_queue_wake_one(&rp->wait, RESCHEDULE_POLICY, NO_ERROR);
330 }
331 }
332 }
333
334 THREAD_UNLOCK(state);
335 return status;
336 }
337
338 static inline status_t read_no_lock(read_port_t* rp, lk_time_t timeout, port_res ult_t* result)
339 {
340 status_t status = buf_read(rp->buf, result);
341 result->ctx = rp->ctx;
342
343 if (status != ERR_NO_MSG)
344 return status;
345
346 // early return allows compiler to elide the rest for the group read case.
347 if (!timeout)
348 return ERR_TIMED_OUT;
349
350 status_t wr = wait_queue_block(&rp->wait, timeout);
351 if (wr != NO_ERROR)
352 return wr;
353 // recursive tail call is usually optimized away with a goto.
354 return read_no_lock(rp, timeout, result);
355 }
356
357 status_t port_read(port_t port, lk_time_t timeout, port_result_t* result)
358 {
travisg 2015/11/19 21:26:10 test for !port and !result
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:45 Done.
359 status_t rc = ERR_GENERIC;
360 read_port_t* rp = (read_port_t*)port;
361
362 THREAD_LOCK(state);
363 if (rp->magic == READPORT_MAGIC) {
364 // dealing with a single port.
365 rc = read_no_lock(rp, timeout, result);
366 } else if (rp->magic == PORTGROUP_MAGIC) {
367 // dealing with a port group.
368 port_group_t* pg = (port_group_t*)port;
369 do {
370 // read each port with no timeout.
371 // todo: this order is fixed, probably a bad thing.
372 list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
373 rc = read_no_lock(rp, 0, result);
374 if (rc != ERR_TIMED_OUT)
375 goto read_exit;
376 }
377 // no data, block on the group waitqueue.
378 rc = wait_queue_block(&pg->wait, timeout);
379 } while (rc == NO_ERROR);
380 } else {
381 // wrong port type.
382 rc = ERR_BAD_HANDLE;
383 }
384
385 read_exit:
386 THREAD_UNLOCK(state);
387 return rc;
388 }
389
390 status_t port_destroy(port_t port)
391 {
travisg 2015/11/19 21:26:10 test for !port
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:46 Done.
392 write_port_t* wp = (write_port_t*) port;
393 port_buf_t* buf = NULL;
394
395 THREAD_LOCK(state);
396 if (wp->magic != WRITEPORT_MAGIC_X) {
397 // wrong port type.
398 THREAD_UNLOCK(state);
399 return ERR_BAD_HANDLE;
400 }
401 // remove self from global named ports list.
402 list_delete(&wp->node);
403
404 if (wp->buf) {
405 // we have no readers.
406 buf = wp->buf;
407 } else {
408 // for each reader:
409 read_port_t* rp;
410 list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
411 // wake the read and group ports.
412 wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED);
413 if (rp->gport) {
414 wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED);
travisg 2015/11/19 21:26:10 same resched stuff as before. want to not reschedu
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:46 Acknowledged.
415 }
416 // remove self from reader ports.
417 rp->wport = NULL;
418 }
419 }
420
421 wp->magic = 0;
422 THREAD_UNLOCK(state);
423
424 free(buf);
425 free(wp);
426 return NO_ERROR;
427 }
428
429 status_t port_close(port_t port)
430 {
travisg 2015/11/19 21:26:10 test for !port
cpu_(ooo_6.6-7.5) 2015/11/20 22:18:45 Done.
431 read_port_t* rp = (read_port_t*) port;
432 port_buf_t* buf = NULL;
433
434 THREAD_LOCK(state);
435 if (rp->magic == READPORT_MAGIC) {
436 // dealing with a read port.
437 if (rp->wport) {
438 // remove self from write port list and reassign the bufer if last.
439 list_delete(&rp->w_node);
440 if (list_is_empty(&rp->wport->rp_list)) {
441 rp->wport->buf = rp->buf;
442 rp->buf = NULL;
443 } else {
444 buf = rp->buf;
445 }
446 }
447 if (rp->gport) {
448 // remove self from port group list.
449 list_delete(&rp->g_node);
450 }
451 // wake up waiters, the return code is ERR_OBJECT_DESTROYED.
452 wait_queue_destroy(&rp->wait, true);
453 rp->magic = 0;
454
455 } else if (rp->magic == PORTGROUP_MAGIC) {
456 // dealing with a port group.
457 port_group_t* pg = (port_group_t*) port;
458 // wake up waiters.
459 wait_queue_destroy(&pg->wait, true);
460 // remove self from reader ports.
461 rp = NULL;
462 list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
463 rp->gport = NULL;
464 }
465 pg->magic = 0;
466
467 } else if (rp->magic == WRITEPORT_MAGIC_W) {
468 // dealing with a write port.
469 write_port_t* wp = (write_port_t*) port;
470 // mark it as closed. Now it can be read but not written to.
471 wp->magic = WRITEPORT_MAGIC_X;
472 THREAD_UNLOCK(state);
473 return NO_ERROR;
474
475 } else {
476 THREAD_UNLOCK(state);
477 return ERR_BAD_HANDLE;
478 }
479
480 THREAD_UNLOCK(state);
481
482 free(buf);
483 free(port);
484 return NO_ERROR;
485 }
486
OLDNEW
« no previous file with comments | « include/kernel/port.h ('k') | kernel/rules.mk » ('j') | top/main.c » ('J')

Powered by Google App Engine
This is Rietveld 408576698