diff options
Diffstat (limited to 'src/c/urweb.c')
-rw-r--r-- | src/c/urweb.c | 789 |
1 files changed, 301 insertions, 488 deletions
diff --git a/src/c/urweb.c b/src/c/urweb.c index 7c0cdb25..0194c226 100644 --- a/src/c/urweb.c +++ b/src/c/urweb.c @@ -7,6 +7,7 @@ #include <ctype.h> #include <setjmp.h> #include <stdarg.h> +#include <assert.h> #include <pthread.h> @@ -88,79 +89,56 @@ static void buf_append(buf *b, const char *s, size_t len) { typedef enum { UNUSED, USED } usage; -typedef struct channel_list { - struct channel *data; - struct channel_list *next; -} channel_list; - typedef struct client { - size_t id; + unsigned id; usage mode; - union { - struct client *next; - struct { - pthread_mutex_t lock; - int pass; - buf msgs; - int sock; - time_t last_contact; - unsigned refcount; - channel_list *channels; - } used; - } data; + int pass; + struct client *next; + pthread_mutex_t lock; + buf msgs; + int sock; + time_t last_contact; + unsigned n_channels; } client; -typedef struct client_list { - client *data; - struct client_list *next; -} client_list; - -typedef struct channel { - size_t id; - usage mode; - union { - struct channel *next; - struct { - pthread_mutex_t lock; - client_list *clients; - unsigned refcount; - } used; - } data; -} channel; - // Persistent client state -static client **clients, *clients_free; -static size_t n_clients; +static client **clients, *clients_free, *clients_used; +static unsigned n_clients; static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; -static client *uw_new_client() { +static client *new_client() { client *c; pthread_mutex_lock(&clients_mutex); if (clients_free) { c = clients_free; - clients_free = clients_free->data.next; + clients_free = clients_free->next; } else { ++n_clients; clients = realloc(clients, sizeof(client) * n_clients); c = malloc(sizeof(client)); c->id = n_clients-1; + pthread_mutex_init(&c->lock, NULL); + buf_init(&c->msgs, 0); clients[n_clients-1] = c; } + pthread_mutex_lock(&c->lock); c->mode = USED; - pthread_mutex_init(&c->data.used.lock, NULL); - c->data.used.pass = rand(); - c->data.used.sock = -1; - c->data.used.last_contact = time(NULL); - buf_init(&c->data.used.msgs, 0); - c->data.used.refcount = 0; - c->data.used.channels = NULL; + c->pass = rand(); + c->sock = -1; + c->last_contact = time(NULL); + buf_reset(&c->msgs); + c->n_channels = 0; + pthread_mutex_unlock(&c->lock); + + c->next = clients_used; + clients_used = c; pthread_mutex_unlock(&clients_mutex); @@ -169,7 +147,7 @@ static client *uw_new_client() { static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n"; -static client *uw_find_client(size_t id) { +static client *find_client(unsigned id) { client *c; pthread_mutex_lock(&clients_mutex); @@ -181,282 +159,111 @@ static client *uw_find_client(size_t id) { c = clients[id]; - if (c->mode != USED) { - pthread_mutex_unlock(&clients_mutex); - return NULL; - } - - pthread_mutex_lock(&c->data.used.lock); - ++c->data.used.refcount; - pthread_mutex_unlock(&c->data.used.lock); pthread_mutex_unlock(&clients_mutex); return c; } -static void uw_release_client(client *c) { - pthread_mutex_lock(&c->data.used.lock); - --c->data.used.refcount; - pthread_mutex_unlock(&c->data.used.lock); -} - -void uw_client_connect(size_t id, int pass, int sock) { - client *c = uw_find_client(id); +void uw_client_connect(unsigned id, int pass, int sock) { + client *c = find_client(id); if (c == NULL) { close(sock); - fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); + fprintf(stderr, "Out-of-bounds client request (%u)\n", id); return; } - uw_release_client(c); + pthread_mutex_lock(&c->lock); - pthread_mutex_lock(&c->data.used.lock); + if (c->mode != USED) { + pthread_mutex_unlock(&c->lock); + close(sock); + fprintf(stderr, "Client request for unused slot (%u)\n", id); + return; + } - if (pass != c->data.used.pass) { - pthread_mutex_unlock(&c->data.used.lock); + if (pass != c->pass) { + pthread_mutex_unlock(&c->lock); close(sock); - fprintf(stderr, "Wrong client password (%d)\n", (int)id); + fprintf(stderr, "Wrong client password (%u, %d)\n", id, pass); return; } - if (c->data.used.sock != -1) { - close(c->data.used.sock); - c->data.used.sock = -1; + if (c->sock != -1) { + close(c->sock); + c->sock = -1; } - c->data.used.last_contact = time(NULL); + c->last_contact = time(NULL); - if (buf_used(&c->data.used.msgs) > 0) { + if (buf_used(&c->msgs) > 0) { uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1); - uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs)); - buf_reset(&c->data.used.msgs); + uw_really_send(sock, c->msgs.start, buf_used(&c->msgs)); + buf_reset(&c->msgs); close(sock); } else - c->data.used.sock = sock; + c->sock = sock; - pthread_mutex_unlock(&c->data.used.lock); + pthread_mutex_unlock(&c->lock); } - -static void uw_free_client(client *c) { - channel_list *chs; - +static void free_client(client *c) { printf("Freeing client %d\n", c->id); - if (c->mode == USED) { - pthread_mutex_lock(&c->data.used.lock); - - for (chs = c->data.used.channels; chs; ) { - client_list *prev, *cs; - - channel *ch = chs->data; - channel_list *tmp = chs->next; - free(chs); - chs = tmp; - - pthread_mutex_lock(&ch->data.used.lock); - for (prev = NULL, cs = ch->data.used.clients; cs; ) { - if (cs->data == c) { - client_list *tmp = cs->next; - free(cs); - cs = tmp; - if (prev) - prev->next = cs; - else - ch->data.used.clients = cs; - } - else { - prev = cs; - cs = cs->next; - } - } - pthread_mutex_unlock(&ch->data.used.lock); - } + c->mode = UNUSED; + c->pass = -1; - if (c->data.used.sock != -1) - close(c->data.used.sock); - - pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_destroy(&c->data.used.lock); - buf_free(&c->data.used.msgs); - c->mode = UNUSED; - - c->data.next = clients_free; - clients_free = c; - } + c->next = clients_free; + clients_free = c; } extern int uw_timeout; void uw_prune_clients() { - size_t i; + client *c, *next, *prev = NULL; time_t cutoff; cutoff = time(NULL) - uw_timeout; pthread_mutex_lock(&clients_mutex); - for (i = 0; i < n_clients; ++i) { - if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff - && clients[i]->data.used.refcount == 0) - uw_free_client(clients[i]); - } - - pthread_mutex_unlock(&clients_mutex); -} - - -// Persistent channel state - - -static channel **channels, *channels_free; -static size_t n_channels; - -static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER; - -static channel *uw_new_channel() { - channel *ch; - - pthread_mutex_lock(&channels_mutex); - - if (channels_free) { - ch = channels_free; - channels_free = channels_free->data.next; - } - else { - ++n_channels; - channels = realloc(channels, sizeof(channels) * n_channels); - ch = malloc(sizeof(channel)); - ch->id = n_channels-1; - channels[n_channels-1] = ch; - } - - ch->mode = USED; - pthread_mutex_init(&ch->data.used.lock, NULL); - ch->data.used.clients = NULL; - ch->data.used.refcount = 0; - - pthread_mutex_unlock(&channels_mutex); - - return ch; -} - -static void uw_free_channel(channel *ch) { - if (ch->mode == USED) { - client_list *cs; - - for (cs = ch->data.used.clients; cs; ) { - client_list *tmp = cs->next; - free(cs); - cs = tmp; - } - pthread_mutex_destroy(&ch->data.used.lock); - ch->mode = UNUSED; - ch->data.next = channels_free; - channels_free = ch; - } -} - -static channel *uw_find_channel(size_t id) { - channel *ch = NULL; - - pthread_mutex_lock(&channels_mutex); - - if (id < n_channels && channels[id]->mode == USED) { - ch = channels[id]; - - pthread_mutex_lock(&ch->data.used.lock); - ++ch->data.used.refcount; - pthread_mutex_unlock(&ch->data.used.lock); - } - - pthread_mutex_unlock(&channels_mutex); - - return ch; -} - -static void uw_release_channel(channel *ch) { - pthread_mutex_lock(&ch->data.used.lock); - ++ch->data.used.refcount; - pthread_mutex_unlock(&ch->data.used.lock); -} - -static void uw_subscribe(channel *ch, client *c) { - client_list *cs; - - pthread_mutex_lock(&ch->data.used.lock); - - for (cs = ch->data.used.clients; cs; cs = cs->next) - if (cs->data == c) { - pthread_mutex_unlock(&ch->data.used.lock); - return; - } - - cs = malloc(sizeof(client_list)); - cs->data = c; - cs->next = ch->data.used.clients; - ch->data.used.clients = cs; - - pthread_mutex_unlock(&ch->data.used.lock); -} - -static void uw_unsubscribe(channel *ch, client *c) { - client_list *prev, *cur, *tmp; - - pthread_mutex_lock(&ch->data.used.lock); - - for (prev = NULL, cur = ch->data.used.clients; cur; ) { - if (cur->data == c) { + for (c = clients_used; c; c = next) { + next = c->next; + pthread_mutex_lock(&c->lock); + if (c->last_contact < cutoff) { if (prev) - prev->next = cur->next; + prev->next = next; else - ch->data.used.clients = cur->next; - tmp = cur; - cur = cur->next; - free(tmp); - } - else { - prev = cur; - cur = cur->next; + clients_used = next; + free_client(c); } + else + prev = c; + pthread_mutex_unlock(&c->lock); } - pthread_mutex_unlock(&ch->data.used.lock); + pthread_mutex_unlock(&clients_mutex); } -static void uw_channel_send(channel *ch, const char *msg) { - size_t len = strlen(msg), preLen; - char pre[INTS_MAX + 2]; - client_list *cs; - - sprintf(pre, "%d\n", (int)ch->id); - preLen = strlen(pre); - - pthread_mutex_lock(&ch->data.used.lock); - - for (cs = ch->data.used.clients; cs; cs = cs->next) { - client *c = cs->data; - - pthread_mutex_lock(&c->data.used.lock); +static uw_Basis_channel new_channel(client *c) { + uw_Basis_channel ch = {c->id, c->n_channels++}; + return ch; +} - if (c->data.used.sock != -1) { - uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); - uw_really_send(c->data.used.sock, pre, preLen); - uw_really_send(c->data.used.sock, msg, len); - uw_really_send(c->data.used.sock, "\n", 1); - close(c->data.used.sock); - c->data.used.sock = -1; - } else { - buf_append(&c->data.used.msgs, pre, preLen); - buf_append(&c->data.used.msgs, msg, len); - buf_append(&c->data.used.msgs, "\n", 1); - } +static void client_send(int already_locked, client *c, buf *msg) { + if (!already_locked) + pthread_mutex_lock(&c->lock); - pthread_mutex_unlock(&c->data.used.lock); - } + if (c->sock != -1) { + uw_really_send(c->sock, begin_msgs, sizeof(begin_msgs) - 1); + uw_really_send(c->sock, msg->start, buf_used(msg)); + close(c->sock); + c->sock = -1; + } else + buf_append(&c->msgs, msg->start, buf_used(msg)); - pthread_mutex_unlock(&ch->data.used.lock); + if (!already_locked) + pthread_mutex_unlock(&c->lock); } @@ -466,7 +273,6 @@ void uw_global_init() { srand(time(NULL) ^ getpid()); clients = malloc(0); - channels = malloc(0); } @@ -484,15 +290,9 @@ typedef struct { } cleanup; typedef struct { - usage mode; - channel *ch; - enum { OLD, NEW } newness; - - size_t n_subscribed; - client **subscribed; - + unsigned client; buf msgs; -} channel_delta; +} delta; struct uw_context { char *headers, *headers_end; @@ -512,11 +312,13 @@ struct uw_context { const char *script_header, *url_prefix; - size_t n_deltas; - channel_delta *deltas; + size_t n_deltas, used_deltas; + delta *deltas; int timeout; + client *client; + char error_message[ERROR_BUF_LEN]; }; @@ -548,7 +350,7 @@ uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, si ctx->source_count = 0; - ctx->n_deltas = 0; + ctx->n_deltas = ctx->used_deltas = 0; ctx->deltas = malloc(0); ctx->timeout = uw_timeout; @@ -574,10 +376,8 @@ void uw_free(uw_context ctx) { free(ctx->inputs); free(ctx->cleanup); - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { - free(ctx->deltas[i].subscribed); + for (i = 0; i < ctx->n_deltas; ++i) buf_free(&ctx->deltas[i].msgs); - } free(ctx); } @@ -591,8 +391,8 @@ void uw_reset_keep_error_message(uw_context ctx) { ctx->regions = NULL; ctx->cleanup_front = ctx->cleanup; ctx->source_count = 0; - if (ctx->n_deltas > 0) - ctx->deltas[0].mode = UNUSED; + ctx->used_deltas = 0; + ctx->client = NULL; } void uw_reset_keep_request(uw_context ctx) { @@ -669,18 +469,73 @@ void uw_push_cleanup(uw_context ctx, void (*func)(void *), void *arg) { ++ctx->cleanup_front; } +uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) { + int len = strlen(h); + char *s = ctx->headers, *p; + + while (p = strchr(s, ':')) { + if (p - s == len && !strncasecmp(s, h, len)) { + return p + 2; + } else { + if ((s = strchr(p, 0)) && s < ctx->headers_end) + s += 2; + else + return NULL; + } + } + + return NULL; +} + +void uw_login(uw_context ctx) { + if (ctx->script_header[0]) { + char *id_s, *pass_s; + + if ((id_s = uw_Basis_requestHeader(ctx, "UrWeb-Client")) + && (pass_s = uw_Basis_requestHeader(ctx, "UrWeb-Pass"))) { + unsigned id = atoi(id_s); + int pass = atoi(pass_s); + client *c = find_client(id); + + if (c == NULL) + uw_error(ctx, FATAL, "Unknown client ID in HTTP headers (%s, %s)", id_s, pass_s); + else { + pthread_mutex_lock(&c->lock); + ctx->client = c; + + if (c->mode != USED) + uw_error(ctx, FATAL, "Stale client ID (%u) in subscription request", id); + if (c->pass != pass) + uw_error(ctx, FATAL, "Wrong client password (%u, %d) in subscription request", id, pass); + } + } else { + client *c = new_client(); + pthread_mutex_lock(&c->lock); + ctx->client = c; + } + } +} + failure_kind uw_begin(uw_context ctx, char *path) { int r = setjmp(ctx->jmp_buf); if (r == 0) { if (uw_db_begin(ctx)) uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN"); + uw_handle(ctx, path); } return r; } +uw_Basis_client uw_Basis_self(uw_context ctx, uw_unit u) { + if (ctx->client == NULL) + uw_error(ctx, FATAL, "Call to Basis.self() from page that has only server-side code"); + + return ctx->client->id; +} + void uw_pop_cleanup(uw_context ctx) { if (ctx->cleanup_front == ctx->cleanup) uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack"); @@ -843,9 +698,6 @@ const char *uw_Basis_get_script(uw_context ctx, uw_unit u) { if (ctx->script_header[0] == 0) return ""; else { - int pass; - client *c = uw_new_client(&pass); - char *r = uw_malloc(ctx, strlen(ctx->script_header) + 18 + buf_used(&ctx->script)); sprintf(r, "%s<script>%s</script>", ctx->script_header, @@ -855,16 +707,13 @@ const char *uw_Basis_get_script(uw_context ctx, uw_unit u) { } const char *uw_Basis_get_settings(uw_context ctx, uw_Basis_string onload) { - if (ctx->script_header[0] == 0) + if (ctx->client == NULL) return ""; else { - int pass; - client *c = uw_new_client(&pass); - char *r = uw_malloc(ctx, 52 + 3 * INTS_MAX + strlen(ctx->url_prefix) + strlen(onload)); - sprintf(r, " onload='client_id=%d;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'", - (int)c->id, - c->data.used.pass, + sprintf(r, " onload='client_id=%u;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'", + ctx->client->id, + ctx->client->pass, ctx->url_prefix, ctx->timeout, onload); @@ -942,6 +791,21 @@ uw_Basis_string uw_Basis_jsifyString_ws(uw_context ctx, uw_Basis_string s) { return r; } +char *uw_Basis_jsifyChannel(uw_context ctx, uw_Basis_channel chn) { + if (ctx->client == NULL || chn.cli != ctx->client->id) + return "null"; + else { + int len; + char *r; + + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%u%n", chn.chn, &len); + ctx->heap.front += len+1; + return r; + } +} + uw_Basis_int uw_Basis_new_client_source(uw_context ctx, uw_Basis_string s) { int len; size_t s_len = strlen(s); @@ -1007,16 +871,6 @@ char *uw_Basis_attrifyInt(uw_context ctx, uw_Basis_int n) { return result; } -char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel n) { - char *result; - int len; - uw_check_heap(ctx, INTS_MAX); - result = ctx->heap.front; - sprintf(result, "%lld%n", (long long)n, &len); - ctx->heap.front += len+1; - return result; -} - char *uw_Basis_attrifyFloat(uw_context ctx, uw_Basis_float n) { char *result; int len; @@ -1116,15 +970,19 @@ char *uw_Basis_urlifyInt(uw_context ctx, uw_Basis_int n) { return r; } -char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel n) { - int len; - char *r; +char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel chn) { + if (ctx->client == NULL || chn.cli != ctx->client->id) + return ""; + else { + int len; + char *r; - uw_check_heap(ctx, INTS_MAX); - r = ctx->heap.front; - sprintf(r, "%lld%n", (long long)n, &len); - ctx->heap.front += len+1; - return r; + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%u%n", chn.chn, &len); + ctx->heap.front += len+1; + return r; + } } char *uw_Basis_urlifyFloat(uw_context ctx, uw_Basis_float n) { @@ -1182,13 +1040,15 @@ uw_unit uw_Basis_urlifyInt_w(uw_context ctx, uw_Basis_int n) { return uw_unit_v; } -uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel n) { - int len; - - uw_check(ctx, INTS_MAX); - sprintf(ctx->page.front, "%lld%n", (long long)n, &len); - ctx->page.front += len; - +uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel chn) { + if (ctx->client != NULL && chn.cli == ctx->client->id) { + int len; + + uw_check(ctx, INTS_MAX + 1); + sprintf(ctx->page.front, "%u%n", chn.chn, &len); + ctx->page.front += len; + } + return uw_unit_v; } @@ -1255,10 +1115,6 @@ uw_Basis_int uw_Basis_unurlifyInt(uw_context ctx, char **s) { return r; } -uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) { - return uw_Basis_unurlifyInt(ctx, s); -} - uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) { char *new_s = uw_unurlify_advance(*s); uw_Basis_float r; @@ -1350,10 +1206,6 @@ uw_unit uw_Basis_htmlifyInt_w(uw_context ctx, uw_Basis_int n) { return uw_unit_v; } -char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) { - return uw_Basis_htmlifyInt(ctx, ch); -} - char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) { int len; char *r; @@ -1541,17 +1393,6 @@ char *uw_Basis_sqlifyInt(uw_context ctx, uw_Basis_int n) { return r; } -char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel n) { - int len; - char *r; - - uw_check_heap(ctx, INTS_MAX + 6); - r = ctx->heap.front; - sprintf(r, "%lld::int4%n", (long long)n, &len); - ctx->heap.front += len+1; - return r; -} - char *uw_Basis_sqlifyIntN(uw_context ctx, uw_Basis_int *n) { if (n == NULL) return "NULL"; @@ -1614,6 +1455,52 @@ uw_Basis_string uw_Basis_sqlifyString(uw_context ctx, uw_Basis_string s) { return r; } +char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel chn) { + int len; + char *r; + unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn; + + uw_check_heap(ctx, INTS_MAX + 7); + r = ctx->heap.front; + sprintf(r, "%lld::int8%n", combo, &len); + ctx->heap.front += len+1; + return r; +} + +char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel chn) { + int len; + char *r; + unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn; + + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%lld%n", combo, &len); + ctx->heap.front += len+1; + return r; +} + +char *uw_Basis_sqlifyClient(uw_context ctx, uw_Basis_client cli) { + int len; + char *r; + + uw_check_heap(ctx, INTS_MAX + 7); + r = ctx->heap.front; + sprintf(r, "%u::int4%n", cli, &len); + ctx->heap.front += len+1; + return r; +} + +char *uw_Basis_attrifyClient(uw_context ctx, uw_Basis_client cli) { + int len; + char *r; + + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%u%n", cli, &len); + ctx->heap.front += len+1; + return r; +} + uw_Basis_string uw_Basis_sqlifyStringN(uw_context ctx, uw_Basis_string s) { if (s == NULL) return "NULL"; @@ -1637,6 +1524,21 @@ char *uw_Basis_sqlifyBoolN(uw_context ctx, uw_Basis_bool *b) { char *uw_Basis_sqlifyTime(uw_context ctx, uw_Basis_time t) { size_t len; + char *r, *s; + struct tm stm; + + if (localtime_r(&t, &stm)) { + s = uw_malloc(ctx, TIMES_MAX); + len = strftime(s, TIMES_MAX, TIME_FMT, &stm); + r = uw_malloc(ctx, len + 14); + sprintf(r, "'%s'::timestamp", s); + return r; + } else + return "<Invalid time>"; +} + +char *uw_Basis_attrifyTime(uw_context ctx, uw_Basis_time t) { + size_t len; char *r; struct tm stm; @@ -1723,18 +1625,6 @@ uw_Basis_int *uw_Basis_stringToInt(uw_context ctx, uw_Basis_string s) { return NULL; } -uw_Basis_channel *uw_Basis_stringToChannel(uw_context ctx, uw_Basis_string s) { - char *endptr; - uw_Basis_channel n = strtoll(s, &endptr, 10); - - if (*s != '\0' && *endptr == '\0') { - uw_Basis_channel *r = uw_malloc(ctx, sizeof(uw_Basis_channel)); - *r = n; - return r; - } else - return NULL; -} - uw_Basis_float *uw_Basis_stringToFloat(uw_context ctx, uw_Basis_string s) { char *endptr; uw_Basis_float n = strtod(s, &endptr); @@ -1802,14 +1692,27 @@ uw_Basis_int uw_Basis_stringToInt_error(uw_context ctx, uw_Basis_string s) { uw_error(ctx, FATAL, "Can't parse int: %s", s); } +#include <errno.h> + uw_Basis_channel uw_Basis_stringToChannel_error(uw_context ctx, uw_Basis_string s) { + unsigned long long n; + + if (sscanf(s, "%llu", &n) < 1) + uw_error(ctx, FATAL, "Can't parse channel: %s", s); + else { + uw_Basis_channel ch = {n >> 32, n & ((1ull << 32) - 1)}; + return ch; + } +} + +uw_Basis_client uw_Basis_stringToClient_error(uw_context ctx, uw_Basis_string s) { char *endptr; - uw_Basis_channel n = strtoll(s, &endptr, 10); + unsigned long n = strtoul(s, &endptr, 10); if (*s != '\0' && *endptr == '\0') return n; else - uw_error(ctx, FATAL, "Can't parse channel int: %s", s); + uw_error(ctx, FATAL, "Can't parse client: %s", s); } uw_Basis_float uw_Basis_stringToFloat_error(uw_context ctx, uw_Basis_string s) { @@ -1856,22 +1759,6 @@ uw_Basis_time uw_Basis_stringToTime_error(uw_context ctx, uw_Basis_string s) { } } -uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) { - int len = strlen(h); - char *s = ctx->headers, *p; - - while (p = strchr(s, ':')) { - if (p - s == len && !strncasecmp(s, h, len)) { - return p + 2; - } else { - if ((s = strchr(p, 0)) && s < ctx->headers_end) - s += 2; - else - return NULL; - } - } -} - uw_Basis_string uw_Basis_get_cookie(uw_context ctx, uw_Basis_string c) { int len = strlen(c); char *s = ctx->headers, *p = ctx->outHeaders.start; @@ -1930,100 +1817,45 @@ uw_unit uw_Basis_set_cookie(uw_context ctx, uw_Basis_string prefix, uw_Basis_str return uw_unit_v; } -static channel_delta *allocate_delta(uw_context ctx, channel *ch) { - size_t i; - channel_delta *cd; +static delta *allocate_delta(uw_context ctx, unsigned client) { + unsigned i; + delta *d; - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i); + for (i = 0; i < ctx->used_deltas; ++i) + if (ctx->deltas[i].client == client) + return &ctx->deltas[i]; - if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch) - return &ctx->deltas[i]; - - if (i < ctx->n_deltas) - cd = &ctx->deltas[i]; - else { - ++ctx->n_deltas; - ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas); - cd = &ctx->deltas[ctx->n_deltas-1]; - cd->n_subscribed = 0; - cd->subscribed = malloc(0); - buf_init(&cd->msgs, 0); + if (ctx->used_deltas >= ctx->n_deltas) { + ctx->deltas = realloc(ctx->deltas, sizeof(delta) * ++ctx->n_deltas); + buf_init(&ctx->deltas[ctx->n_deltas-1].msgs, 0); } - - cd->mode = USED; - cd->newness = OLD; - cd->ch = ch; - if (cd->n_subscribed > 0) - cd->subscribed[0] = NULL; - buf_reset(&cd->msgs); - return cd; + + d = &ctx->deltas[ctx->used_deltas++]; + d->client = client; + buf_reset(&d->msgs); + return d; } uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { - size_t i; - channel *ch = uw_new_channel(); - ++ch->data.used.refcount; - channel_delta *cd = allocate_delta(ctx, ch); + if (ctx->client == NULL) + uw_error(ctx, FATAL, "Attempt to create channel on request not associated with a persistent connection"); - cd->newness = NEW; - - return ch->id; -} - -static int delta_used(channel_delta *cd) { - return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]); + return new_channel(ctx->client); } -uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { - channel *ch = uw_find_channel(chn); - - if (ch == NULL) - uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); - else { - size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client")); - int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); - client *c = uw_find_client(id); - - if (c == NULL) { - uw_release_channel(ch); - uw_error(ctx, FATAL, "Unknown client ID in subscription request"); - } else if (c->data.used.pass != pass) { - uw_release_channel(ch); - uw_release_client(c); - uw_error(ctx, FATAL, "Wrong client password (%d) in subscription request", pass); - } else { - size_t i; - channel_delta *cd = allocate_delta(ctx, ch); +uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { + delta *d = allocate_delta(ctx, chn.cli); + size_t len; + int preLen; + char pre[INTS_MAX + 2]; - if (delta_used(cd)) - uw_release_channel(ch); + len = strlen(msg); - for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i); + sprintf(pre, "%u\n%n", chn.chn, &preLen); - if (i < cd->n_subscribed) - cd->subscribed[i] = c; - else { - ++cd->n_subscribed; - cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed); - cd->subscribed[cd->n_subscribed-1] = c; - } - } - } - - return uw_unit_v; -} - -uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { - channel *ch = uw_find_channel(chn); - - if (ch == NULL) - uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); - else { - channel_delta *cd = allocate_delta(ctx, ch); - if (delta_used(cd)) - uw_release_channel(ch); - buf_append(&cd->msgs, msg, strlen(msg)); - } + buf_append(&d->msgs, pre, preLen); + buf_append(&d->msgs, msg, len); + buf_append(&d->msgs, "\n", 1); return uw_unit_v; } @@ -2032,46 +1864,27 @@ int uw_db_commit(uw_context); int uw_db_rollback(uw_context); void uw_commit(uw_context ctx) { - size_t i, j; + unsigned i; - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { - channel *ch = ctx->deltas[i].ch; - - for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { - client *c = ctx->deltas[i].subscribed[j]; + if (uw_db_commit(ctx)) + uw_error(ctx, FATAL, "Error running SQL COMMIT"); - uw_subscribe(ch, c); - uw_release_client(c); - } + for (i = 0; i < ctx->used_deltas; ++i) { + delta *d = &ctx->deltas[i]; + client *c = find_client(d->client); - if (buf_used(&ctx->deltas[i].msgs) > 0) { - uw_channel_send(ch, ctx->deltas[i].msgs.start); - } + assert (c != NULL && c->mode == USED); - uw_release_channel(ch); + client_send(c == ctx->client, c, &d->msgs); } - if (uw_db_commit(ctx)) - uw_error(ctx, FATAL, "Error running SQL COMMIT"); + if (ctx->client) + pthread_mutex_unlock(&ctx->client->lock); } int uw_rollback(uw_context ctx) { - size_t i, j; - - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { - channel *ch = ctx->deltas[i].ch; - - for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { - client *c = ctx->deltas[i].subscribed[j]; - - uw_release_client(c); - } - - if (ctx->deltas[i].newness == NEW) - uw_free_channel(ch); - else - uw_release_channel(ch); - } + if (ctx->client) + pthread_mutex_unlock(&ctx->client->lock); return uw_db_rollback(ctx); } |