| Index: kernel/port.c
|
| diff --git a/kernel/port.c b/kernel/port.c
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..88d7dab562b31bdd8f81279d59977fe78a096460
|
| --- /dev/null
|
| +++ b/kernel/port.c
|
| @@ -0,0 +1,517 @@
|
| +/*
|
| + * Copyright (c) 2015 Carlos Pizano-Uribe cpu@chromium.org
|
| + *
|
| + * Permission is hereby granted, free of charge, to any person obtaining
|
| + * a copy of this software and associated documentation files
|
| + * (the "Software"), to deal in the Software without restriction,
|
| + * including without limitation the rights to use, copy, modify, merge,
|
| + * publish, distribute, sublicense, and/or sell copies of the Software,
|
| + * and to permit persons to whom the Software is furnished to do so,
|
| + * subject to the following conditions:
|
| + *
|
| + * The above copyright notice and this permission notice shall be
|
| + * included in all copies or substantial portions of the Software.
|
| + *
|
| + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
| + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
| + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
| + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
|
| + * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
|
| + * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
| + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
| + */
|
| +
|
| +/**
|
| + * @file
|
| + * @brief Port object functions
|
| + * @defgroup event Events
|
| + *
|
| + */
|
| +
|
| +#include <debug.h>
|
| +#include <list.h>
|
| +#include <malloc.h>
|
| +#include <string.h>
|
| +#include <pow2.h>
|
| +#include <err.h>
|
| +#include <kernel/thread.h>
|
| +#include <kernel/port.h>
|
| +
|
| +// write ports can be in two states, open and closed, which have a
|
| +// different magic number.
|
| +
|
| +#define WRITEPORT_MAGIC_W 'prtw'
|
| +#define WRITEPORT_MAGIC_X 'prtx'
|
| +
|
| +#define READPORT_MAGIC 'prtr'
|
| +#define PORTGROUP_MAGIC 'prtg'
|
| +
|
| +#define PORT_BUFF_SIZE 8
|
| +#define PORT_BUFF_SIZE_BIG 64
|
| +
|
| +#define RESCHEDULE_POLICY 1
|
| +
|
| +#define MAX_PORT_GROUP_COUNT 256
|
| +
|
| +typedef struct {
|
| + uint log2;
|
| + uint avail;
|
| + uint head;
|
| + uint tail;
|
| + port_packet_t packet[1];
|
| +} port_buf_t;
|
| +
|
| +typedef struct {
|
| + int magic;
|
| + struct list_node node;
|
| + port_buf_t* buf;
|
| + struct list_node rp_list;
|
| + port_mode_t mode;
|
| + char name[PORT_NAME_LEN];
|
| +} write_port_t;
|
| +
|
| +typedef struct {
|
| + int magic;
|
| + wait_queue_t wait;
|
| + struct list_node rp_list;
|
| +} port_group_t;
|
| +
|
| +typedef struct {
|
| + int magic;
|
| + struct list_node w_node;
|
| + struct list_node g_node;
|
| + port_buf_t* buf;
|
| + void* ctx;
|
| + wait_queue_t wait;
|
| + write_port_t* wport;
|
| + port_group_t* gport;
|
| +} read_port_t;
|
| +
|
| +
|
| +static struct list_node write_port_list;
|
| +
|
| +
|
| +static port_buf_t* make_buf(uint pk_count)
|
| +{
|
| + uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t));
|
| + port_buf_t* buf = (port_buf_t*) malloc(size);
|
| + if (!buf)
|
| + return NULL;
|
| + buf->log2 = log2_uint(pk_count);
|
| + buf->head = buf->tail = 0;
|
| + buf->avail = pk_count;
|
| + return buf;
|
| +}
|
| +
|
| +static status_t buf_write(port_buf_t* buf, const port_packet_t* packets, size_t count)
|
| +{
|
| + if (buf->avail < count)
|
| + return ERR_NOT_ENOUGH_BUFFER;
|
| +
|
| + for (size_t ix = 0; ix != count; ix++) {
|
| + buf->packet[buf->tail] = packets[ix];
|
| + buf->tail = modpow2(++buf->tail, buf->log2);
|
| + }
|
| + buf->avail -= count;
|
| + return NO_ERROR;
|
| +}
|
| +
|
| +static status_t buf_read(port_buf_t* buf, port_result_t* pr)
|
| +{
|
| + if (buf->avail == valpow2(buf->log2))
|
| + return ERR_NO_MSG;
|
| + pr->packet = buf->packet[buf->head];
|
| + buf->head = modpow2(++buf->head, buf->log2);
|
| + ++buf->avail;
|
| + return NO_ERROR;
|
| +}
|
| +
|
| +// must be called before any use of ports.
|
| +void port_init(void)
|
| +{
|
| + list_initialize(&write_port_list);
|
| +}
|
| +
|
| +status_t port_create(const char* name, port_mode_t mode, port_t* port)
|
| +{
|
| + if (!name || !port)
|
| + return ERR_INVALID_ARGS;
|
| +
|
| + // only unicast ports can have a large buffer.
|
| + if (mode & PORT_MODE_BROADCAST) {
|
| + if (mode & PORT_MODE_BIG_BUFFER)
|
| + return ERR_INVALID_ARGS;
|
| + }
|
| +
|
| + if (strlen(name) >= PORT_NAME_LEN)
|
| + return ERR_INVALID_ARGS;
|
| +
|
| + // lookup for existing port, return that if found.
|
| + write_port_t* wp = NULL;
|
| + THREAD_LOCK(state1);
|
| + list_for_every_entry(&write_port_list, wp, write_port_t, node) {
|
| + if (strcmp(wp->name, name) == 0) {
|
| + // can't return closed ports.
|
| + if (wp->magic == WRITEPORT_MAGIC_X)
|
| + wp = NULL;
|
| + THREAD_UNLOCK(state1);
|
| + if (wp) {
|
| + *port = (void*) wp;
|
| + return ERR_ALREADY_EXISTS;
|
| + } else {
|
| + return ERR_BUSY;
|
| + }
|
| + }
|
| + }
|
| + THREAD_UNLOCK(state1);
|
| +
|
| + // not found, create the write port and the circular buffer.
|
| + wp = calloc(1, sizeof(write_port_t));
|
| + if (!wp)
|
| + return ERR_NO_MEMORY;
|
| +
|
| + wp->magic = WRITEPORT_MAGIC_W;
|
| + wp->mode = mode;
|
| + strlcpy(wp->name, name, sizeof(wp->name));
|
| + list_initialize(&wp->rp_list);
|
| +
|
| + uint size = (mode & PORT_MODE_BIG_BUFFER) ? PORT_BUFF_SIZE_BIG : PORT_BUFF_SIZE;
|
| + wp->buf = make_buf(size);
|
| + if (!wp->buf) {
|
| + free(wp);
|
| + return ERR_NO_MEMORY;
|
| + }
|
| +
|
| + // todo: race condtion! a port with the same name could have been created
|
| + // by another thread at is point.
|
| + THREAD_LOCK(state2);
|
| + list_add_tail(&write_port_list, &wp->node);
|
| + THREAD_UNLOCK(state2);
|
| +
|
| + *port = (void*)wp;
|
| + return NO_ERROR;
|
| +}
|
| +
|
| +status_t port_open(const char* name, void* ctx, port_t* port)
|
| +{
|
| + if (!name || !port)
|
| + return ERR_INVALID_ARGS;
|
| +
|
| + // assume success; create the read port and buffer now.
|
| + read_port_t* rp = calloc(1, sizeof(read_port_t));
|
| + if (!rp)
|
| + return ERR_NO_MEMORY;
|
| +
|
| + rp->magic = READPORT_MAGIC;
|
| + wait_queue_init(&rp->wait);
|
| + rp->ctx = ctx;
|
| +
|
| + // |buf| might not be needed, but we always allocate outside the lock.
|
| + // this buffer is only needed for broadcast ports, but we don't know
|
| + // that here.
|
| + port_buf_t* buf = make_buf(PORT_BUFF_SIZE);
|
| + if (!buf)
|
| + return ERR_NO_MEMORY;
|
| +
|
| + // find the named write port and associate it with read port.
|
| + status_t rc = ERR_NOT_FOUND;
|
| +
|
| + THREAD_LOCK(state);
|
| + write_port_t* wp = NULL;
|
| + list_for_every_entry(&write_port_list, wp, write_port_t, node) {
|
| + if (strcmp(wp->name, name) == 0) {
|
| + // found; add read port to write port list.
|
| + rp->wport = wp;
|
| + if (wp->buf) {
|
| + // this is the first read port; transfer the circular buffer.
|
| + list_add_tail(&wp->rp_list, &rp->w_node);
|
| + rp->buf = wp->buf;
|
| + wp->buf = NULL;
|
| + rc = NO_ERROR;
|
| + } else if (buf) {
|
| + // not first read port.
|
| + if (wp->mode & PORT_MODE_UNICAST) {
|
| + // cannot add a second listener.
|
| + rc = ERR_NOT_ALLOWED;
|
| + break;
|
| + }
|
| + // use the new (small) circular buffer.
|
| + list_add_tail(&wp->rp_list, &rp->w_node);
|
| + rp->buf = buf;
|
| + buf = NULL;
|
| + rc = NO_ERROR;
|
| + } else {
|
| + // |buf| allocation failed and the buffer was needed.
|
| + rc = ERR_NO_MEMORY;
|
| + }
|
| + break;
|
| + }
|
| + }
|
| + THREAD_UNLOCK(state);
|
| +
|
| + if (buf)
|
| + free(buf);
|
| +
|
| + if (rc == NO_ERROR) {
|
| + *port = (void*)rp;
|
| + } else {
|
| + free(rp);
|
| + }
|
| + return rc;
|
| +}
|
| +
|
| +status_t port_group(port_t* ports, size_t count, port_t* group)
|
| +{
|
| + if (count > MAX_PORT_GROUP_COUNT)
|
| + return ERR_TOO_BIG;
|
| +
|
| + if (!ports || !group)
|
| + return ERR_INVALID_ARGS;
|
| +
|
| + // assume success; create port group now.
|
| + port_group_t* pg = calloc(1, sizeof(port_group_t));
|
| + if (!pg)
|
| + return ERR_NO_MEMORY;
|
| +
|
| + pg->magic = PORTGROUP_MAGIC;
|
| + wait_queue_init(&pg->wait);
|
| + list_initialize(&pg->rp_list);
|
| +
|
| + status_t rc = NO_ERROR;
|
| +
|
| + THREAD_LOCK(state);
|
| + for (size_t ix = 0; ix != count; ix++) {
|
| + read_port_t* rp = (read_port_t*)ports[ix];
|
| + if ((rp->magic != READPORT_MAGIC) || rp->gport) {
|
| + // wrong type of port, or port already part of a group,
|
| + // in any case, undo the changes to the previous read ports.
|
| + for (size_t jx = 0; jx != ix; jx++) {
|
| + ((read_port_t*)ports[jx])->gport = NULL;
|
| + }
|
| + rc = ERR_BAD_HANDLE;
|
| + break;
|
| + }
|
| + // link port group and read port.
|
| + rp->gport = pg;
|
| + list_add_tail(&pg->rp_list, &rp->g_node);
|
| + }
|
| + THREAD_UNLOCK(state);
|
| +
|
| + if (rc == NO_ERROR) {
|
| + *group = (port_t*)pg;
|
| + } else {
|
| + free(pg);
|
| + }
|
| + return rc;
|
| +}
|
| +
|
| +status_t port_write(port_t port, const port_packet_t* pk, size_t count)
|
| +{
|
| + if (!port || !pk)
|
| + return ERR_INVALID_ARGS;
|
| +
|
| + write_port_t* wp = (write_port_t*)port;
|
| + THREAD_LOCK(state);
|
| + if (wp->magic != WRITEPORT_MAGIC_W) {
|
| + // wrong port type.
|
| + THREAD_UNLOCK(state);
|
| + return ERR_BAD_HANDLE;
|
| + }
|
| +
|
| + status_t status = NO_ERROR;
|
| + int awake_count = 0;
|
| +
|
| + if (wp->buf) {
|
| + // there are no read ports, just write to the buffer.
|
| + status = buf_write(wp->buf, pk, count);
|
| + } else {
|
| + // there are read ports. for each, write and attempt to wake a thread
|
| + // from the port group or from the read port itself.
|
| + read_port_t* rp;
|
| + list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
|
| + if (buf_write(rp->buf, pk, count) < 0) {
|
| + // buffer full.
|
| + status = ERR_PARTIAL_WRITE;
|
| + continue;
|
| + }
|
| +
|
| + int awaken = 0;
|
| + if (rp->gport) {
|
| + awaken = wait_queue_wake_one(&rp->gport->wait, false, NO_ERROR);
|
| + }
|
| + if (!awaken) {
|
| + awaken = wait_queue_wake_one(&rp->wait, false, NO_ERROR);
|
| + }
|
| +
|
| + awake_count += awaken;
|
| + }
|
| + }
|
| +
|
| + THREAD_UNLOCK(state);
|
| +
|
| +#if RESCHEDULE_POLICY
|
| + if (awake_count)
|
| + thread_yield();
|
| +#endif
|
| +
|
| + return status;
|
| +}
|
| +
|
| +static inline status_t read_no_lock(read_port_t* rp, lk_time_t timeout, port_result_t* result)
|
| +{
|
| + status_t status = buf_read(rp->buf, result);
|
| + result->ctx = rp->ctx;
|
| +
|
| + if (status != ERR_NO_MSG)
|
| + return status;
|
| +
|
| + // early return allows compiler to elide the rest for the group read case.
|
| + if (!timeout)
|
| + return ERR_TIMED_OUT;
|
| +
|
| + status_t wr = wait_queue_block(&rp->wait, timeout);
|
| + if (wr != NO_ERROR)
|
| + return wr;
|
| + // recursive tail call is usually optimized away with a goto.
|
| + return read_no_lock(rp, timeout, result);
|
| +}
|
| +
|
| +status_t port_read(port_t port, lk_time_t timeout, port_result_t* result)
|
| +{
|
| + if (!port || !result)
|
| + return ERR_INVALID_ARGS;
|
| +
|
| + status_t rc = ERR_GENERIC;
|
| + read_port_t* rp = (read_port_t*)port;
|
| +
|
| + THREAD_LOCK(state);
|
| + if (rp->magic == READPORT_MAGIC) {
|
| + // dealing with a single port.
|
| + rc = read_no_lock(rp, timeout, result);
|
| + } else if (rp->magic == PORTGROUP_MAGIC) {
|
| + // dealing with a port group.
|
| + port_group_t* pg = (port_group_t*)port;
|
| + do {
|
| + // read each port with no timeout.
|
| + // todo: this order is fixed, probably a bad thing.
|
| + list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
|
| + rc = read_no_lock(rp, 0, result);
|
| + if (rc != ERR_TIMED_OUT)
|
| + goto read_exit;
|
| + }
|
| + // no data, block on the group waitqueue.
|
| + rc = wait_queue_block(&pg->wait, timeout);
|
| + } while (rc == NO_ERROR);
|
| + } else {
|
| + // wrong port type.
|
| + rc = ERR_BAD_HANDLE;
|
| + }
|
| +
|
| +read_exit:
|
| + THREAD_UNLOCK(state);
|
| + return rc;
|
| +}
|
| +
|
| +status_t port_destroy(port_t port)
|
| +{
|
| + if (!port)
|
| + return ERR_INVALID_ARGS;
|
| +
|
| + write_port_t* wp = (write_port_t*) port;
|
| + port_buf_t* buf = NULL;
|
| +
|
| + THREAD_LOCK(state);
|
| + if (wp->magic != WRITEPORT_MAGIC_X) {
|
| + // wrong port type.
|
| + THREAD_UNLOCK(state);
|
| + return ERR_BAD_HANDLE;
|
| + }
|
| + // remove self from global named ports list.
|
| + list_delete(&wp->node);
|
| +
|
| + if (wp->buf) {
|
| + // we have no readers.
|
| + buf = wp->buf;
|
| + } else {
|
| + // for each reader:
|
| + read_port_t* rp;
|
| + list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
|
| + // wake the read and group ports.
|
| + wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED);
|
| + if (rp->gport) {
|
| + wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED);
|
| + }
|
| + // remove self from reader ports.
|
| + rp->wport = NULL;
|
| + }
|
| + }
|
| +
|
| + wp->magic = 0;
|
| + THREAD_UNLOCK(state);
|
| +
|
| + free(buf);
|
| + free(wp);
|
| + return NO_ERROR;
|
| +}
|
| +
|
| +status_t port_close(port_t port)
|
| +{
|
| + if (!port)
|
| + return ERR_INVALID_ARGS;
|
| +
|
| + read_port_t* rp = (read_port_t*) port;
|
| + port_buf_t* buf = NULL;
|
| +
|
| + THREAD_LOCK(state);
|
| + if (rp->magic == READPORT_MAGIC) {
|
| + // dealing with a read port.
|
| + if (rp->wport) {
|
| + // remove self from write port list and reassign the bufer if last.
|
| + list_delete(&rp->w_node);
|
| + if (list_is_empty(&rp->wport->rp_list)) {
|
| + rp->wport->buf = rp->buf;
|
| + rp->buf = NULL;
|
| + } else {
|
| + buf = rp->buf;
|
| + }
|
| + }
|
| + if (rp->gport) {
|
| + // remove self from port group list.
|
| + list_delete(&rp->g_node);
|
| + }
|
| + // wake up waiters, the return code is ERR_OBJECT_DESTROYED.
|
| + wait_queue_destroy(&rp->wait, true);
|
| + rp->magic = 0;
|
| +
|
| + } else if (rp->magic == PORTGROUP_MAGIC) {
|
| + // dealing with a port group.
|
| + port_group_t* pg = (port_group_t*) port;
|
| + // wake up waiters.
|
| + wait_queue_destroy(&pg->wait, true);
|
| + // remove self from reader ports.
|
| + rp = NULL;
|
| + list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
|
| + rp->gport = NULL;
|
| + }
|
| + pg->magic = 0;
|
| +
|
| + } else if (rp->magic == WRITEPORT_MAGIC_W) {
|
| + // dealing with a write port.
|
| + write_port_t* wp = (write_port_t*) port;
|
| + // mark it as closed. Now it can be read but not written to.
|
| + wp->magic = WRITEPORT_MAGIC_X;
|
| + THREAD_UNLOCK(state);
|
| + return NO_ERROR;
|
| +
|
| + } else {
|
| + THREAD_UNLOCK(state);
|
| + return ERR_BAD_HANDLE;
|
| + }
|
| +
|
| + THREAD_UNLOCK(state);
|
| +
|
| + free(buf);
|
| + free(port);
|
| + return NO_ERROR;
|
| +}
|
| +
|
|
|