Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(36)

Side by Side Diff: kernel/port.c

Issue 1437453002: [kernel][ports] Add basic ports functionality (Closed) Base URL: https://github.com/travisg/lk.git@master
Patch Set: fix Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « kernel/init.c ('k') | kernel/rules.mk » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 * Copyright (c) 2015 Carlos Pizano-Uribe cpu@chromium.org
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining
5 * a copy of this software and associated documentation files
6 * (the "Software"), to deal in the Software without restriction,
7 * including without limitation the rights to use, copy, modify, merge,
8 * publish, distribute, sublicense, and/or sell copies of the Software,
9 * and to permit persons to whom the Software is furnished to do so,
10 * subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be
13 * included in all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 */
23
24 /**
25 * @file
26 * @brief Port object functions
27 * @defgroup event Events
28 *
29 */
30
31 #include <debug.h>
32 #include <list.h>
33 #include <malloc.h>
34 #include <string.h>
35 #include <pow2.h>
36 #include <err.h>
37 #include <kernel/thread.h>
38 #include <kernel/port.h>
39
40 // write ports can be in two states, open and closed, which have a
41 // different magic number.
42
43 #define WRITEPORT_MAGIC_W 'prtw'
44 #define WRITEPORT_MAGIC_X 'prtx'
45
46 #define READPORT_MAGIC 'prtr'
47 #define PORTGROUP_MAGIC 'prtg'
48
49 #define PORT_BUFF_SIZE 8
50 #define PORT_BUFF_SIZE_BIG 64
51
52 #define RESCHEDULE_POLICY 1
53
54 #define MAX_PORT_GROUP_COUNT 256
55
56 typedef struct {
57 uint log2;
58 uint avail;
59 uint head;
60 uint tail;
61 port_packet_t packet[1];
62 } port_buf_t;
63
64 typedef struct {
65 int magic;
66 struct list_node node;
67 port_buf_t* buf;
68 struct list_node rp_list;
69 port_mode_t mode;
70 char name[PORT_NAME_LEN];
71 } write_port_t;
72
73 typedef struct {
74 int magic;
75 wait_queue_t wait;
76 struct list_node rp_list;
77 } port_group_t;
78
79 typedef struct {
80 int magic;
81 struct list_node w_node;
82 struct list_node g_node;
83 port_buf_t* buf;
84 void* ctx;
85 wait_queue_t wait;
86 write_port_t* wport;
87 port_group_t* gport;
88 } read_port_t;
89
90
91 static struct list_node write_port_list;
92
93
94 static port_buf_t* make_buf(uint pk_count)
95 {
96 uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t));
97 port_buf_t* buf = (port_buf_t*) malloc(size);
98 if (!buf)
99 return NULL;
100 buf->log2 = log2_uint(pk_count);
101 buf->head = buf->tail = 0;
102 buf->avail = pk_count;
103 return buf;
104 }
105
106 static status_t buf_write(port_buf_t* buf, const port_packet_t* packets, size_t count)
107 {
108 if (buf->avail < count)
109 return ERR_NOT_ENOUGH_BUFFER;
110
111 for (size_t ix = 0; ix != count; ix++) {
112 buf->packet[buf->tail] = packets[ix];
113 buf->tail = modpow2(++buf->tail, buf->log2);
114 }
115 buf->avail -= count;
116 return NO_ERROR;
117 }
118
119 static status_t buf_read(port_buf_t* buf, port_result_t* pr)
120 {
121 if (buf->avail == valpow2(buf->log2))
122 return ERR_NO_MSG;
123 pr->packet = buf->packet[buf->head];
124 buf->head = modpow2(++buf->head, buf->log2);
125 ++buf->avail;
126 return NO_ERROR;
127 }
128
129 // must be called before any use of ports.
130 void port_init(void)
131 {
132 list_initialize(&write_port_list);
133 }
134
135 status_t port_create(const char* name, port_mode_t mode, port_t* port)
136 {
137 if (!name || !port)
138 return ERR_INVALID_ARGS;
139
140 // only unicast ports can have a large buffer.
141 if (mode & PORT_MODE_BROADCAST) {
142 if (mode & PORT_MODE_BIG_BUFFER)
143 return ERR_INVALID_ARGS;
144 }
145
146 if (strlen(name) >= PORT_NAME_LEN)
147 return ERR_INVALID_ARGS;
148
149 // lookup for existing port, return that if found.
150 write_port_t* wp = NULL;
151 THREAD_LOCK(state1);
152 list_for_every_entry(&write_port_list, wp, write_port_t, node) {
153 if (strcmp(wp->name, name) == 0) {
154 // can't return closed ports.
155 if (wp->magic == WRITEPORT_MAGIC_X)
156 wp = NULL;
157 THREAD_UNLOCK(state1);
158 if (wp) {
159 *port = (void*) wp;
160 return ERR_ALREADY_EXISTS;
161 } else {
162 return ERR_BUSY;
163 }
164 }
165 }
166 THREAD_UNLOCK(state1);
167
168 // not found, create the write port and the circular buffer.
169 wp = calloc(1, sizeof(write_port_t));
170 if (!wp)
171 return ERR_NO_MEMORY;
172
173 wp->magic = WRITEPORT_MAGIC_W;
174 wp->mode = mode;
175 strlcpy(wp->name, name, sizeof(wp->name));
176 list_initialize(&wp->rp_list);
177
178 uint size = (mode & PORT_MODE_BIG_BUFFER) ? PORT_BUFF_SIZE_BIG : PORT_BUFF_ SIZE;
179 wp->buf = make_buf(size);
180 if (!wp->buf) {
181 free(wp);
182 return ERR_NO_MEMORY;
183 }
184
185 // todo: race condtion! a port with the same name could have been created
186 // by another thread at is point.
187 THREAD_LOCK(state2);
188 list_add_tail(&write_port_list, &wp->node);
189 THREAD_UNLOCK(state2);
190
191 *port = (void*)wp;
192 return NO_ERROR;
193 }
194
195 status_t port_open(const char* name, void* ctx, port_t* port)
196 {
197 if (!name || !port)
198 return ERR_INVALID_ARGS;
199
200 // assume success; create the read port and buffer now.
201 read_port_t* rp = calloc(1, sizeof(read_port_t));
202 if (!rp)
203 return ERR_NO_MEMORY;
204
205 rp->magic = READPORT_MAGIC;
206 wait_queue_init(&rp->wait);
207 rp->ctx = ctx;
208
209 // |buf| might not be needed, but we always allocate outside the lock.
210 // this buffer is only needed for broadcast ports, but we don't know
211 // that here.
212 port_buf_t* buf = make_buf(PORT_BUFF_SIZE);
213 if (!buf)
214 return ERR_NO_MEMORY;
215
216 // find the named write port and associate it with read port.
217 status_t rc = ERR_NOT_FOUND;
218
219 THREAD_LOCK(state);
220 write_port_t* wp = NULL;
221 list_for_every_entry(&write_port_list, wp, write_port_t, node) {
222 if (strcmp(wp->name, name) == 0) {
223 // found; add read port to write port list.
224 rp->wport = wp;
225 if (wp->buf) {
226 // this is the first read port; transfer the circular buffer.
227 list_add_tail(&wp->rp_list, &rp->w_node);
228 rp->buf = wp->buf;
229 wp->buf = NULL;
230 rc = NO_ERROR;
231 } else if (buf) {
232 // not first read port.
233 if (wp->mode & PORT_MODE_UNICAST) {
234 // cannot add a second listener.
235 rc = ERR_NOT_ALLOWED;
236 break;
237 }
238 // use the new (small) circular buffer.
239 list_add_tail(&wp->rp_list, &rp->w_node);
240 rp->buf = buf;
241 buf = NULL;
242 rc = NO_ERROR;
243 } else {
244 // |buf| allocation failed and the buffer was needed.
245 rc = ERR_NO_MEMORY;
246 }
247 break;
248 }
249 }
250 THREAD_UNLOCK(state);
251
252 if (buf)
253 free(buf);
254
255 if (rc == NO_ERROR) {
256 *port = (void*)rp;
257 } else {
258 free(rp);
259 }
260 return rc;
261 }
262
263 status_t port_group(port_t* ports, size_t count, port_t* group)
264 {
265 if (count > MAX_PORT_GROUP_COUNT)
266 return ERR_TOO_BIG;
267
268 if (!ports || !group)
269 return ERR_INVALID_ARGS;
270
271 // assume success; create port group now.
272 port_group_t* pg = calloc(1, sizeof(port_group_t));
273 if (!pg)
274 return ERR_NO_MEMORY;
275
276 pg->magic = PORTGROUP_MAGIC;
277 wait_queue_init(&pg->wait);
278 list_initialize(&pg->rp_list);
279
280 status_t rc = NO_ERROR;
281
282 THREAD_LOCK(state);
283 for (size_t ix = 0; ix != count; ix++) {
284 read_port_t* rp = (read_port_t*)ports[ix];
285 if ((rp->magic != READPORT_MAGIC) || rp->gport) {
286 // wrong type of port, or port already part of a group,
287 // in any case, undo the changes to the previous read ports.
288 for (size_t jx = 0; jx != ix; jx++) {
289 ((read_port_t*)ports[jx])->gport = NULL;
290 }
291 rc = ERR_BAD_HANDLE;
292 break;
293 }
294 // link port group and read port.
295 rp->gport = pg;
296 list_add_tail(&pg->rp_list, &rp->g_node);
297 }
298 THREAD_UNLOCK(state);
299
300 if (rc == NO_ERROR) {
301 *group = (port_t*)pg;
302 } else {
303 free(pg);
304 }
305 return rc;
306 }
307
308 status_t port_write(port_t port, const port_packet_t* pk, size_t count)
309 {
310 if (!port || !pk)
311 return ERR_INVALID_ARGS;
312
313 write_port_t* wp = (write_port_t*)port;
314 THREAD_LOCK(state);
315 if (wp->magic != WRITEPORT_MAGIC_W) {
316 // wrong port type.
317 THREAD_UNLOCK(state);
318 return ERR_BAD_HANDLE;
319 }
320
321 status_t status = NO_ERROR;
322 int awake_count = 0;
323
324 if (wp->buf) {
325 // there are no read ports, just write to the buffer.
326 status = buf_write(wp->buf, pk, count);
327 } else {
328 // there are read ports. for each, write and attempt to wake a thread
329 // from the port group or from the read port itself.
330 read_port_t* rp;
331 list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
332 if (buf_write(rp->buf, pk, count) < 0) {
333 // buffer full.
334 status = ERR_PARTIAL_WRITE;
335 continue;
336 }
337
338 int awaken = 0;
339 if (rp->gport) {
340 awaken = wait_queue_wake_one(&rp->gport->wait, false, NO_ERROR);
341 }
342 if (!awaken) {
343 awaken = wait_queue_wake_one(&rp->wait, false, NO_ERROR);
344 }
345
346 awake_count += awaken;
347 }
348 }
349
350 THREAD_UNLOCK(state);
351
352 #if RESCHEDULE_POLICY
353 if (awake_count)
354 thread_yield();
355 #endif
356
357 return status;
358 }
359
360 static inline status_t read_no_lock(read_port_t* rp, lk_time_t timeout, port_res ult_t* result)
361 {
362 status_t status = buf_read(rp->buf, result);
363 result->ctx = rp->ctx;
364
365 if (status != ERR_NO_MSG)
366 return status;
367
368 // early return allows compiler to elide the rest for the group read case.
369 if (!timeout)
370 return ERR_TIMED_OUT;
371
372 status_t wr = wait_queue_block(&rp->wait, timeout);
373 if (wr != NO_ERROR)
374 return wr;
375 // recursive tail call is usually optimized away with a goto.
376 return read_no_lock(rp, timeout, result);
377 }
378
379 status_t port_read(port_t port, lk_time_t timeout, port_result_t* result)
380 {
381 if (!port || !result)
382 return ERR_INVALID_ARGS;
383
384 status_t rc = ERR_GENERIC;
385 read_port_t* rp = (read_port_t*)port;
386
387 THREAD_LOCK(state);
388 if (rp->magic == READPORT_MAGIC) {
389 // dealing with a single port.
390 rc = read_no_lock(rp, timeout, result);
391 } else if (rp->magic == PORTGROUP_MAGIC) {
392 // dealing with a port group.
393 port_group_t* pg = (port_group_t*)port;
394 do {
395 // read each port with no timeout.
396 // todo: this order is fixed, probably a bad thing.
397 list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
398 rc = read_no_lock(rp, 0, result);
399 if (rc != ERR_TIMED_OUT)
400 goto read_exit;
401 }
402 // no data, block on the group waitqueue.
403 rc = wait_queue_block(&pg->wait, timeout);
404 } while (rc == NO_ERROR);
405 } else {
406 // wrong port type.
407 rc = ERR_BAD_HANDLE;
408 }
409
410 read_exit:
411 THREAD_UNLOCK(state);
412 return rc;
413 }
414
415 status_t port_destroy(port_t port)
416 {
417 if (!port)
418 return ERR_INVALID_ARGS;
419
420 write_port_t* wp = (write_port_t*) port;
421 port_buf_t* buf = NULL;
422
423 THREAD_LOCK(state);
424 if (wp->magic != WRITEPORT_MAGIC_X) {
425 // wrong port type.
426 THREAD_UNLOCK(state);
427 return ERR_BAD_HANDLE;
428 }
429 // remove self from global named ports list.
430 list_delete(&wp->node);
431
432 if (wp->buf) {
433 // we have no readers.
434 buf = wp->buf;
435 } else {
436 // for each reader:
437 read_port_t* rp;
438 list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
439 // wake the read and group ports.
440 wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED);
441 if (rp->gport) {
442 wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED);
443 }
444 // remove self from reader ports.
445 rp->wport = NULL;
446 }
447 }
448
449 wp->magic = 0;
450 THREAD_UNLOCK(state);
451
452 free(buf);
453 free(wp);
454 return NO_ERROR;
455 }
456
457 status_t port_close(port_t port)
458 {
459 if (!port)
460 return ERR_INVALID_ARGS;
461
462 read_port_t* rp = (read_port_t*) port;
463 port_buf_t* buf = NULL;
464
465 THREAD_LOCK(state);
466 if (rp->magic == READPORT_MAGIC) {
467 // dealing with a read port.
468 if (rp->wport) {
469 // remove self from write port list and reassign the bufer if last.
470 list_delete(&rp->w_node);
471 if (list_is_empty(&rp->wport->rp_list)) {
472 rp->wport->buf = rp->buf;
473 rp->buf = NULL;
474 } else {
475 buf = rp->buf;
476 }
477 }
478 if (rp->gport) {
479 // remove self from port group list.
480 list_delete(&rp->g_node);
481 }
482 // wake up waiters, the return code is ERR_OBJECT_DESTROYED.
483 wait_queue_destroy(&rp->wait, true);
484 rp->magic = 0;
485
486 } else if (rp->magic == PORTGROUP_MAGIC) {
487 // dealing with a port group.
488 port_group_t* pg = (port_group_t*) port;
489 // wake up waiters.
490 wait_queue_destroy(&pg->wait, true);
491 // remove self from reader ports.
492 rp = NULL;
493 list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
494 rp->gport = NULL;
495 }
496 pg->magic = 0;
497
498 } else if (rp->magic == WRITEPORT_MAGIC_W) {
499 // dealing with a write port.
500 write_port_t* wp = (write_port_t*) port;
501 // mark it as closed. Now it can be read but not written to.
502 wp->magic = WRITEPORT_MAGIC_X;
503 THREAD_UNLOCK(state);
504 return NO_ERROR;
505
506 } else {
507 THREAD_UNLOCK(state);
508 return ERR_BAD_HANDLE;
509 }
510
511 THREAD_UNLOCK(state);
512
513 free(buf);
514 free(port);
515 return NO_ERROR;
516 }
517
OLDNEW
« no previous file with comments | « kernel/init.c ('k') | kernel/rules.mk » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698