summaryrefslogtreecommitdiff
path: root/src/c/urweb.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/c/urweb.c')
-rw-r--r--src/c/urweb.c245
1 files changed, 213 insertions, 32 deletions
diff --git a/src/c/urweb.c b/src/c/urweb.c
index 1f4c6fdd..abe08221 100644
--- a/src/c/urweb.c
+++ b/src/c/urweb.c
@@ -76,8 +76,14 @@ static size_t buf_avail(buf *b) {
return b->back - b->start;
}
+static void buf_append(buf *b, const char *s, size_t len) {
+ buf_check(b, len);
+ memcpy(b->front, s, len);
+ b->front += len;
+}
+
-// Cross-request state
+// Persistent client state
typedef enum { UNUSED, USED } usage;
@@ -101,21 +107,6 @@ static size_t n_clients;
static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
-void uw_global_init() {
- srand(time(NULL) ^ getpid());
-
- clients = malloc(0);
-}
-
-void uw_global_free() {
- size_t i;
-
- for (i = 0; i < n_clients; ++i)
- free(clients[i]);
-
- free(clients);
-}
-
static client *uw_new_client() {
client *c;
@@ -147,24 +138,33 @@ static client *uw_new_client() {
static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n";
-void uw_client_connect(size_t id, int pass, int sock) {
+static client *uw_find_client(size_t id) {
client *c;
pthread_mutex_lock(&clients_mutex);
if (id >= n_clients) {
pthread_mutex_unlock(&clients_mutex);
- close(sock);
- fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
- return;
+ return NULL;
}
c = clients[id];
if (c->mode != USED) {
pthread_mutex_unlock(&clients_mutex);
+ return NULL;
+ }
+
+ pthread_mutex_unlock(&clients_mutex);
+ return c;
+}
+
+void uw_client_connect(size_t id, int pass, int sock) {
+ client *c = uw_find_client(id);
+
+ if (c == NULL) {
close(sock);
- fprintf(stderr, "Client request for unused ID (%d)\n", (int)id);
+ fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
return;
}
@@ -172,7 +172,6 @@ void uw_client_connect(size_t id, int pass, int sock) {
if (pass != c->data.used.pass) {
pthread_mutex_unlock(&c->data.used.lock);
- pthread_mutex_unlock(&clients_mutex);
close(sock);
fprintf(stderr, "Wrong client password (%d)\n", (int)id);
return;
@@ -180,7 +179,6 @@ void uw_client_connect(size_t id, int pass, int sock) {
if (c->data.used.sock != -1) {
pthread_mutex_unlock(&c->data.used.lock);
- pthread_mutex_unlock(&clients_mutex);
close(sock);
fprintf(stderr, "Duplicate client connection (%d)\n", (int)id);
return;
@@ -193,15 +191,10 @@ void uw_client_connect(size_t id, int pass, int sock) {
uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs));
close(sock);
}
- else {
- uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
- uw_really_send(sock, "Hi!", 3);
- close(sock);
- //c->data.used.sock = sock;
- }
+ else
+ c->data.used.sock = sock;
pthread_mutex_unlock(&c->data.used.lock);
- pthread_mutex_unlock(&clients_mutex);
}
static void uw_free_client(client *c) {
@@ -234,6 +227,150 @@ void uw_prune_clients(time_t timeout) {
}
+// Persistent channel state
+
+typedef struct client_list {
+ client *data;
+ struct client_list *next;
+} client_list;
+
+typedef struct channel {
+ size_t id;
+ usage mode;
+ union {
+ struct channel *next;
+ struct {
+ pthread_mutex_t lock;
+ client_list *clients;
+ } used;
+ } data;
+} channel;
+
+static channel **channels, *channels_free;
+static size_t n_channels;
+
+static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static channel *uw_new_channel() {
+ channel *ch;
+
+ pthread_mutex_lock(&channels_mutex);
+
+ if (channels_free) {
+ ch = channels_free;
+ channels_free = channels_free->data.next;
+ }
+ else {
+ ++n_channels;
+ channels = realloc(channels, sizeof(channels) * n_channels);
+ ch = malloc(sizeof(channel));
+ ch->id = n_channels-1;
+ channels[n_channels-1] = ch;
+ }
+
+ ch->mode = USED;
+ pthread_mutex_init(&ch->data.used.lock, NULL);
+ ch->data.used.clients = NULL;
+
+ pthread_mutex_unlock(&channels_mutex);
+
+ return ch;
+}
+
+static channel *uw_find_channel(size_t id) {
+ channel *ch = NULL;
+
+ pthread_mutex_lock(&channels_mutex);
+
+ if (id < n_channels && channels[id]->mode == USED)
+ ch = channels[id];
+
+ pthread_mutex_unlock(&channels_mutex);
+
+ return ch;
+}
+
+static void uw_subscribe(channel *ch, client *c) {
+ client_list *cs = malloc(sizeof(client_list));
+
+ pthread_mutex_lock(&ch->data.used.lock);
+
+ cs->data = c;
+ cs->next = ch->data.used.clients;
+ ch->data.used.clients = cs;
+
+ pthread_mutex_unlock(&ch->data.used.lock);
+}
+
+static void uw_unsubscribe(channel *ch, client *c) {
+ client_list *prev, *cur, *tmp;
+
+ pthread_mutex_lock(&ch->data.used.lock);
+
+ for (prev = NULL, cur = ch->data.used.clients; cur; ) {
+ if (cur->data == c) {
+ if (prev)
+ prev->next = cur->next;
+ else
+ ch->data.used.clients = cur->next;
+ tmp = cur;
+ cur = cur->next;
+ free(tmp);
+ }
+ else {
+ prev = cur;
+ cur = cur->next;
+ }
+ }
+
+ pthread_mutex_unlock(&ch->data.used.lock);
+}
+
+static void uw_channel_send(channel *ch, const char *msg) {
+ size_t len = strlen(msg), preLen;
+ char pre[INTS_MAX + 2];
+ client_list *cs;
+
+ sprintf(pre, "%d\n", (int)ch->id);
+ preLen = strlen(pre);
+
+ pthread_mutex_lock(&ch->data.used.lock);
+
+ for (cs = ch->data.used.clients; cs; cs = cs->next) {
+ client *c = cs->data;
+
+ pthread_mutex_lock(&c->data.used.lock);
+
+ if (c->data.used.sock != -1) {
+ uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1);
+ uw_really_send(c->data.used.sock, pre, preLen);
+ uw_really_send(c->data.used.sock, msg, len);
+ uw_really_send(c->data.used.sock, "\n", 1);
+ close(c->data.used.sock);
+ c->data.used.sock = -1;
+ } else {
+ buf_append(&c->data.used.msgs, pre, preLen);
+ buf_append(&c->data.used.msgs, msg, len);
+ buf_append(&c->data.used.msgs, "\n", 1);
+ }
+
+ pthread_mutex_unlock(&c->data.used.lock);
+ }
+
+ pthread_mutex_unlock(&ch->data.used.lock);
+}
+
+
+// Global entry points
+
+void uw_global_init() {
+ srand(time(NULL) ^ getpid());
+
+ clients = malloc(0);
+ channels = malloc(0);
+}
+
+
// Single-request state
#define ERROR_BUF_LEN 1024
@@ -582,11 +719,17 @@ const char *uw_Basis_get_script(uw_context ctx, uw_unit u) {
}
}
-const char *uw_Basis_get_listener(uw_context ctx, uw_unit u) {
+const char *uw_Basis_get_listener(uw_context ctx, uw_Basis_string onload) {
if (ctx->script_header[0] == 0)
return "";
- else
+ else if (onload[0] == 0)
return " onload='listener()'";
+ else {
+ uw_Basis_string s = uw_malloc(ctx, strlen(onload) + 22);
+
+ sprintf(s, " onload='listener();%s'", onload);
+ return s;
+ }
}
uw_Basis_string uw_Basis_jsifyString(uw_context ctx, uw_Basis_string s) {
@@ -941,6 +1084,10 @@ uw_Basis_int uw_Basis_unurlifyInt(uw_context ctx, char **s) {
return r;
}
+uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) {
+ return uw_Basis_unurlifyInt(ctx, s);
+}
+
uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) {
char *new_s = uw_unurlify_advance(*s);
uw_Basis_float r;
@@ -1032,6 +1179,10 @@ uw_unit uw_Basis_htmlifyInt_w(uw_context ctx, uw_Basis_int n) {
return uw_unit_v;
}
+char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) {
+ return uw_Basis_htmlifyInt(ctx, ch);
+}
+
char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) {
int len;
char *r;
@@ -1575,4 +1726,34 @@ uw_unit uw_Basis_set_cookie(uw_context ctx, uw_Basis_string prefix, uw_Basis_str
return uw_unit_v;
}
+uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) {
+ return uw_new_channel()->id;
+}
+
+uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
+ channel *ch = uw_find_channel(chn);
+ if (ch == NULL)
+ uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
+ else {
+ size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client"));
+ int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass"));
+ client *c = uw_find_client(id);
+
+ if (c == NULL)
+ uw_error(ctx, FATAL, "Unknown client ID in subscription request");
+ else if (c->data.used.pass != pass)
+ uw_error(ctx, FATAL, "Wrong client password in subscription request");
+ else
+ uw_subscribe(ch, c);
+ }
+}
+
+uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
+ channel *ch = uw_find_channel(chn);
+
+ if (ch == NULL)
+ uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
+ else
+ uw_channel_send(ch, msg);
+}