diff options
author | Adam Chlipala <adamc@hcoop.net> | 2009-03-29 11:37:29 -0400 |
---|---|---|
committer | Adam Chlipala <adamc@hcoop.net> | 2009-03-29 11:37:29 -0400 |
commit | 843fcc973f4cf7b149d4f57732fb66f812115320 (patch) | |
tree | bae33dd5ebd8393e6dd1b30f7d1a2b75241c9956 /src | |
parent | 9f3c3a0215d3f23c8e51fa4824d21dfeaa08ede0 (diff) |
Redo channels, making them single-client
Diffstat (limited to 'src')
-rw-r--r-- | src/c/driver.c | 20 | ||||
-rw-r--r-- | src/c/urweb.c | 789 | ||||
-rw-r--r-- | src/cjr_print.sml | 14 | ||||
-rw-r--r-- | src/jscomp.sml | 5 | ||||
-rw-r--r-- | src/monoize.sml | 12 | ||||
-rw-r--r-- | src/prepare.sml | 2 |
6 files changed, 330 insertions, 512 deletions
diff --git a/src/c/driver.c b/src/c/driver.c index 4fa5db2f..1b616636 100644 --- a/src/c/driver.c +++ b/src/c/driver.c @@ -171,10 +171,10 @@ static void *worker(void *data) { char *pass = uw_Basis_requestHeader(ctx, "UrWeb-Pass"); if (id && pass) { - size_t idn = atoi(id); + unsigned idn = atoi(id); uw_client_connect(idn, atoi(pass), sock); dont_close = 1; - fprintf(stderr, "Processed request for messages by client %d\n\n", (int)idn); + fprintf(stderr, "Processed request for messages by client %u\n\n", idn); } break; } @@ -217,6 +217,8 @@ static void *worker(void *data) { else { printf("Fatal error (out of retries): %s\n", uw_error_message(ctx)); + try_rollback(ctx); + uw_reset_keep_error_message(ctx); uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r"); uw_write_header(ctx, "Content-type: text/plain\r\n"); @@ -224,8 +226,6 @@ static void *worker(void *data) { uw_write(ctx, uw_error_message(ctx)); uw_write(ctx, "\n"); - try_rollback(ctx); - break; } } else if (fk == UNLIMITED_RETRY) @@ -233,6 +233,8 @@ static void *worker(void *data) { else if (fk == FATAL) { printf("Fatal error: %s\n", uw_error_message(ctx)); + try_rollback(ctx); + uw_reset_keep_error_message(ctx); uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\r\n"); uw_write_header(ctx, "Content-type: text/html\r\n"); @@ -241,26 +243,24 @@ static void *worker(void *data) { uw_write(ctx, uw_error_message(ctx)); uw_write(ctx, "\n</body></html>"); - try_rollback(ctx); - break; } else { printf("Unknown uw_handle return code!\n"); + try_rollback(ctx); + uw_reset_keep_request(ctx); uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r"); uw_write_header(ctx, "Content-type: text/plain\r\n"); uw_write(ctx, "Unknown uw_handle return code!\n"); - try_rollback(ctx); - break; } - uw_reset_keep_request(ctx); - if (try_rollback(ctx)) break; + + uw_reset_keep_request(ctx); } uw_send(ctx, sock); diff --git a/src/c/urweb.c b/src/c/urweb.c index 7c0cdb25..0194c226 100644 --- a/src/c/urweb.c +++ b/src/c/urweb.c @@ -7,6 +7,7 @@ #include <ctype.h> #include <setjmp.h> #include <stdarg.h> +#include <assert.h> #include <pthread.h> @@ -88,79 +89,56 @@ static void buf_append(buf *b, const char *s, size_t len) { typedef enum { UNUSED, USED } usage; -typedef struct channel_list { - struct channel *data; - struct channel_list *next; -} channel_list; - typedef struct client { - size_t id; + unsigned id; usage mode; - union { - struct client *next; - struct { - pthread_mutex_t lock; - int pass; - buf msgs; - int sock; - time_t last_contact; - unsigned refcount; - channel_list *channels; - } used; - } data; + int pass; + struct client *next; + pthread_mutex_t lock; + buf msgs; + int sock; + time_t last_contact; + unsigned n_channels; } client; -typedef struct client_list { - client *data; - struct client_list *next; -} client_list; - -typedef struct channel { - size_t id; - usage mode; - union { - struct channel *next; - struct { - pthread_mutex_t lock; - client_list *clients; - unsigned refcount; - } used; - } data; -} channel; - // Persistent client state -static client **clients, *clients_free; -static size_t n_clients; +static client **clients, *clients_free, *clients_used; +static unsigned n_clients; static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; -static client *uw_new_client() { +static client *new_client() { client *c; pthread_mutex_lock(&clients_mutex); if (clients_free) { c = clients_free; - clients_free = clients_free->data.next; + clients_free = clients_free->next; } else { ++n_clients; clients = realloc(clients, sizeof(client) * n_clients); c = malloc(sizeof(client)); c->id = n_clients-1; + pthread_mutex_init(&c->lock, NULL); + buf_init(&c->msgs, 0); clients[n_clients-1] = c; } + pthread_mutex_lock(&c->lock); c->mode = USED; - pthread_mutex_init(&c->data.used.lock, NULL); - c->data.used.pass = rand(); - c->data.used.sock = -1; - c->data.used.last_contact = time(NULL); - buf_init(&c->data.used.msgs, 0); - c->data.used.refcount = 0; - c->data.used.channels = NULL; + c->pass = rand(); + c->sock = -1; + c->last_contact = time(NULL); + buf_reset(&c->msgs); + c->n_channels = 0; + pthread_mutex_unlock(&c->lock); + + c->next = clients_used; + clients_used = c; pthread_mutex_unlock(&clients_mutex); @@ -169,7 +147,7 @@ static client *uw_new_client() { static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n"; -static client *uw_find_client(size_t id) { +static client *find_client(unsigned id) { client *c; pthread_mutex_lock(&clients_mutex); @@ -181,282 +159,111 @@ static client *uw_find_client(size_t id) { c = clients[id]; - if (c->mode != USED) { - pthread_mutex_unlock(&clients_mutex); - return NULL; - } - - pthread_mutex_lock(&c->data.used.lock); - ++c->data.used.refcount; - pthread_mutex_unlock(&c->data.used.lock); pthread_mutex_unlock(&clients_mutex); return c; } -static void uw_release_client(client *c) { - pthread_mutex_lock(&c->data.used.lock); - --c->data.used.refcount; - pthread_mutex_unlock(&c->data.used.lock); -} - -void uw_client_connect(size_t id, int pass, int sock) { - client *c = uw_find_client(id); +void uw_client_connect(unsigned id, int pass, int sock) { + client *c = find_client(id); if (c == NULL) { close(sock); - fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); + fprintf(stderr, "Out-of-bounds client request (%u)\n", id); return; } - uw_release_client(c); + pthread_mutex_lock(&c->lock); - pthread_mutex_lock(&c->data.used.lock); + if (c->mode != USED) { + pthread_mutex_unlock(&c->lock); + close(sock); + fprintf(stderr, "Client request for unused slot (%u)\n", id); + return; + } - if (pass != c->data.used.pass) { - pthread_mutex_unlock(&c->data.used.lock); + if (pass != c->pass) { + pthread_mutex_unlock(&c->lock); close(sock); - fprintf(stderr, "Wrong client password (%d)\n", (int)id); + fprintf(stderr, "Wrong client password (%u, %d)\n", id, pass); return; } - if (c->data.used.sock != -1) { - close(c->data.used.sock); - c->data.used.sock = -1; + if (c->sock != -1) { + close(c->sock); + c->sock = -1; } - c->data.used.last_contact = time(NULL); + c->last_contact = time(NULL); - if (buf_used(&c->data.used.msgs) > 0) { + if (buf_used(&c->msgs) > 0) { uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1); - uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs)); - buf_reset(&c->data.used.msgs); + uw_really_send(sock, c->msgs.start, buf_used(&c->msgs)); + buf_reset(&c->msgs); close(sock); } else - c->data.used.sock = sock; + c->sock = sock; - pthread_mutex_unlock(&c->data.used.lock); + pthread_mutex_unlock(&c->lock); } - -static void uw_free_client(client *c) { - channel_list *chs; - +static void free_client(client *c) { printf("Freeing client %d\n", c->id); - if (c->mode == USED) { - pthread_mutex_lock(&c->data.used.lock); - - for (chs = c->data.used.channels; chs; ) { - client_list *prev, *cs; - - channel *ch = chs->data; - channel_list *tmp = chs->next; - free(chs); - chs = tmp; - - pthread_mutex_lock(&ch->data.used.lock); - for (prev = NULL, cs = ch->data.used.clients; cs; ) { - if (cs->data == c) { - client_list *tmp = cs->next; - free(cs); - cs = tmp; - if (prev) - prev->next = cs; - else - ch->data.used.clients = cs; - } - else { - prev = cs; - cs = cs->next; - } - } - pthread_mutex_unlock(&ch->data.used.lock); - } + c->mode = UNUSED; + c->pass = -1; - if (c->data.used.sock != -1) - close(c->data.used.sock); - - pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_destroy(&c->data.used.lock); - buf_free(&c->data.used.msgs); - c->mode = UNUSED; - - c->data.next = clients_free; - clients_free = c; - } + c->next = clients_free; + clients_free = c; } extern int uw_timeout; void uw_prune_clients() { - size_t i; + client *c, *next, *prev = NULL; time_t cutoff; cutoff = time(NULL) - uw_timeout; pthread_mutex_lock(&clients_mutex); - for (i = 0; i < n_clients; ++i) { - if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff - && clients[i]->data.used.refcount == 0) - uw_free_client(clients[i]); - } - - pthread_mutex_unlock(&clients_mutex); -} - - -// Persistent channel state - - -static channel **channels, *channels_free; -static size_t n_channels; - -static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER; - -static channel *uw_new_channel() { - channel *ch; - - pthread_mutex_lock(&channels_mutex); - - if (channels_free) { - ch = channels_free; - channels_free = channels_free->data.next; - } - else { - ++n_channels; - channels = realloc(channels, sizeof(channels) * n_channels); - ch = malloc(sizeof(channel)); - ch->id = n_channels-1; - channels[n_channels-1] = ch; - } - - ch->mode = USED; - pthread_mutex_init(&ch->data.used.lock, NULL); - ch->data.used.clients = NULL; - ch->data.used.refcount = 0; - - pthread_mutex_unlock(&channels_mutex); - - return ch; -} - -static void uw_free_channel(channel *ch) { - if (ch->mode == USED) { - client_list *cs; - - for (cs = ch->data.used.clients; cs; ) { - client_list *tmp = cs->next; - free(cs); - cs = tmp; - } - pthread_mutex_destroy(&ch->data.used.lock); - ch->mode = UNUSED; - ch->data.next = channels_free; - channels_free = ch; - } -} - -static channel *uw_find_channel(size_t id) { - channel *ch = NULL; - - pthread_mutex_lock(&channels_mutex); - - if (id < n_channels && channels[id]->mode == USED) { - ch = channels[id]; - - pthread_mutex_lock(&ch->data.used.lock); - ++ch->data.used.refcount; - pthread_mutex_unlock(&ch->data.used.lock); - } - - pthread_mutex_unlock(&channels_mutex); - - return ch; -} - -static void uw_release_channel(channel *ch) { - pthread_mutex_lock(&ch->data.used.lock); - ++ch->data.used.refcount; - pthread_mutex_unlock(&ch->data.used.lock); -} - -static void uw_subscribe(channel *ch, client *c) { - client_list *cs; - - pthread_mutex_lock(&ch->data.used.lock); - - for (cs = ch->data.used.clients; cs; cs = cs->next) - if (cs->data == c) { - pthread_mutex_unlock(&ch->data.used.lock); - return; - } - - cs = malloc(sizeof(client_list)); - cs->data = c; - cs->next = ch->data.used.clients; - ch->data.used.clients = cs; - - pthread_mutex_unlock(&ch->data.used.lock); -} - -static void uw_unsubscribe(channel *ch, client *c) { - client_list *prev, *cur, *tmp; - - pthread_mutex_lock(&ch->data.used.lock); - - for (prev = NULL, cur = ch->data.used.clients; cur; ) { - if (cur->data == c) { + for (c = clients_used; c; c = next) { + next = c->next; + pthread_mutex_lock(&c->lock); + if (c->last_contact < cutoff) { if (prev) - prev->next = cur->next; + prev->next = next; else - ch->data.used.clients = cur->next; - tmp = cur; - cur = cur->next; - free(tmp); - } - else { - prev = cur; - cur = cur->next; + clients_used = next; + free_client(c); } + else + prev = c; + pthread_mutex_unlock(&c->lock); } - pthread_mutex_unlock(&ch->data.used.lock); + pthread_mutex_unlock(&clients_mutex); } -static void uw_channel_send(channel *ch, const char *msg) { - size_t len = strlen(msg), preLen; - char pre[INTS_MAX + 2]; - client_list *cs; - - sprintf(pre, "%d\n", (int)ch->id); - preLen = strlen(pre); - - pthread_mutex_lock(&ch->data.used.lock); - - for (cs = ch->data.used.clients; cs; cs = cs->next) { - client *c = cs->data; - - pthread_mutex_lock(&c->data.used.lock); +static uw_Basis_channel new_channel(client *c) { + uw_Basis_channel ch = {c->id, c->n_channels++}; + return ch; +} - if (c->data.used.sock != -1) { - uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); - uw_really_send(c->data.used.sock, pre, preLen); - uw_really_send(c->data.used.sock, msg, len); - uw_really_send(c->data.used.sock, "\n", 1); - close(c->data.used.sock); - c->data.used.sock = -1; - } else { - buf_append(&c->data.used.msgs, pre, preLen); - buf_append(&c->data.used.msgs, msg, len); - buf_append(&c->data.used.msgs, "\n", 1); - } +static void client_send(int already_locked, client *c, buf *msg) { + if (!already_locked) + pthread_mutex_lock(&c->lock); - pthread_mutex_unlock(&c->data.used.lock); - } + if (c->sock != -1) { + uw_really_send(c->sock, begin_msgs, sizeof(begin_msgs) - 1); + uw_really_send(c->sock, msg->start, buf_used(msg)); + close(c->sock); + c->sock = -1; + } else + buf_append(&c->msgs, msg->start, buf_used(msg)); - pthread_mutex_unlock(&ch->data.used.lock); + if (!already_locked) + pthread_mutex_unlock(&c->lock); } @@ -466,7 +273,6 @@ void uw_global_init() { srand(time(NULL) ^ getpid()); clients = malloc(0); - channels = malloc(0); } @@ -484,15 +290,9 @@ typedef struct { } cleanup; typedef struct { - usage mode; - channel *ch; - enum { OLD, NEW } newness; - - size_t n_subscribed; - client **subscribed; - + unsigned client; buf msgs; -} channel_delta; +} delta; struct uw_context { char *headers, *headers_end; @@ -512,11 +312,13 @@ struct uw_context { const char *script_header, *url_prefix; - size_t n_deltas; - channel_delta *deltas; + size_t n_deltas, used_deltas; + delta *deltas; int timeout; + client *client; + char error_message[ERROR_BUF_LEN]; }; @@ -548,7 +350,7 @@ uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, si ctx->source_count = 0; - ctx->n_deltas = 0; + ctx->n_deltas = ctx->used_deltas = 0; ctx->deltas = malloc(0); ctx->timeout = uw_timeout; @@ -574,10 +376,8 @@ void uw_free(uw_context ctx) { free(ctx->inputs); free(ctx->cleanup); - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { - free(ctx->deltas[i].subscribed); + for (i = 0; i < ctx->n_deltas; ++i) buf_free(&ctx->deltas[i].msgs); - } free(ctx); } @@ -591,8 +391,8 @@ void uw_reset_keep_error_message(uw_context ctx) { ctx->regions = NULL; ctx->cleanup_front = ctx->cleanup; ctx->source_count = 0; - if (ctx->n_deltas > 0) - ctx->deltas[0].mode = UNUSED; + ctx->used_deltas = 0; + ctx->client = NULL; } void uw_reset_keep_request(uw_context ctx) { @@ -669,18 +469,73 @@ void uw_push_cleanup(uw_context ctx, void (*func)(void *), void *arg) { ++ctx->cleanup_front; } +uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) { + int len = strlen(h); + char *s = ctx->headers, *p; + + while (p = strchr(s, ':')) { + if (p - s == len && !strncasecmp(s, h, len)) { + return p + 2; + } else { + if ((s = strchr(p, 0)) && s < ctx->headers_end) + s += 2; + else + return NULL; + } + } + + return NULL; +} + +void uw_login(uw_context ctx) { + if (ctx->script_header[0]) { + char *id_s, *pass_s; + + if ((id_s = uw_Basis_requestHeader(ctx, "UrWeb-Client")) + && (pass_s = uw_Basis_requestHeader(ctx, "UrWeb-Pass"))) { + unsigned id = atoi(id_s); + int pass = atoi(pass_s); + client *c = find_client(id); + + if (c == NULL) + uw_error(ctx, FATAL, "Unknown client ID in HTTP headers (%s, %s)", id_s, pass_s); + else { + pthread_mutex_lock(&c->lock); + ctx->client = c; + + if (c->mode != USED) + uw_error(ctx, FATAL, "Stale client ID (%u) in subscription request", id); + if (c->pass != pass) + uw_error(ctx, FATAL, "Wrong client password (%u, %d) in subscription request", id, pass); + } + } else { + client *c = new_client(); + pthread_mutex_lock(&c->lock); + ctx->client = c; + } + } +} + failure_kind uw_begin(uw_context ctx, char *path) { int r = setjmp(ctx->jmp_buf); if (r == 0) { if (uw_db_begin(ctx)) uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN"); + uw_handle(ctx, path); } return r; } +uw_Basis_client uw_Basis_self(uw_context ctx, uw_unit u) { + if (ctx->client == NULL) + uw_error(ctx, FATAL, "Call to Basis.self() from page that has only server-side code"); + + return ctx->client->id; +} + void uw_pop_cleanup(uw_context ctx) { if (ctx->cleanup_front == ctx->cleanup) uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack"); @@ -843,9 +698,6 @@ const char *uw_Basis_get_script(uw_context ctx, uw_unit u) { if (ctx->script_header[0] == 0) return ""; else { - int pass; - client *c = uw_new_client(&pass); - char *r = uw_malloc(ctx, strlen(ctx->script_header) + 18 + buf_used(&ctx->script)); sprintf(r, "%s<script>%s</script>", ctx->script_header, @@ -855,16 +707,13 @@ const char *uw_Basis_get_script(uw_context ctx, uw_unit u) { } const char *uw_Basis_get_settings(uw_context ctx, uw_Basis_string onload) { - if (ctx->script_header[0] == 0) + if (ctx->client == NULL) return ""; else { - int pass; - client *c = uw_new_client(&pass); - char *r = uw_malloc(ctx, 52 + 3 * INTS_MAX + strlen(ctx->url_prefix) + strlen(onload)); - sprintf(r, " onload='client_id=%d;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'", - (int)c->id, - c->data.used.pass, + sprintf(r, " onload='client_id=%u;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'", + ctx->client->id, + ctx->client->pass, ctx->url_prefix, ctx->timeout, onload); @@ -942,6 +791,21 @@ uw_Basis_string uw_Basis_jsifyString_ws(uw_context ctx, uw_Basis_string s) { return r; } +char *uw_Basis_jsifyChannel(uw_context ctx, uw_Basis_channel chn) { + if (ctx->client == NULL || chn.cli != ctx->client->id) + return "null"; + else { + int len; + char *r; + + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%u%n", chn.chn, &len); + ctx->heap.front += len+1; + return r; + } +} + uw_Basis_int uw_Basis_new_client_source(uw_context ctx, uw_Basis_string s) { int len; size_t s_len = strlen(s); @@ -1007,16 +871,6 @@ char *uw_Basis_attrifyInt(uw_context ctx, uw_Basis_int n) { return result; } -char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel n) { - char *result; - int len; - uw_check_heap(ctx, INTS_MAX); - result = ctx->heap.front; - sprintf(result, "%lld%n", (long long)n, &len); - ctx->heap.front += len+1; - return result; -} - char *uw_Basis_attrifyFloat(uw_context ctx, uw_Basis_float n) { char *result; int len; @@ -1116,15 +970,19 @@ char *uw_Basis_urlifyInt(uw_context ctx, uw_Basis_int n) { return r; } -char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel n) { - int len; - char *r; +char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel chn) { + if (ctx->client == NULL || chn.cli != ctx->client->id) + return ""; + else { + int len; + char *r; - uw_check_heap(ctx, INTS_MAX); - r = ctx->heap.front; - sprintf(r, "%lld%n", (long long)n, &len); - ctx->heap.front += len+1; - return r; + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%u%n", chn.chn, &len); + ctx->heap.front += len+1; + return r; + } } char *uw_Basis_urlifyFloat(uw_context ctx, uw_Basis_float n) { @@ -1182,13 +1040,15 @@ uw_unit uw_Basis_urlifyInt_w(uw_context ctx, uw_Basis_int n) { return uw_unit_v; } -uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel n) { - int len; - - uw_check(ctx, INTS_MAX); - sprintf(ctx->page.front, "%lld%n", (long long)n, &len); - ctx->page.front += len; - +uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel chn) { + if (ctx->client != NULL && chn.cli == ctx->client->id) { + int len; + + uw_check(ctx, INTS_MAX + 1); + sprintf(ctx->page.front, "%u%n", chn.chn, &len); + ctx->page.front += len; + } + return uw_unit_v; } @@ -1255,10 +1115,6 @@ uw_Basis_int uw_Basis_unurlifyInt(uw_context ctx, char **s) { return r; } -uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) { - return uw_Basis_unurlifyInt(ctx, s); -} - uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) { char *new_s = uw_unurlify_advance(*s); uw_Basis_float r; @@ -1350,10 +1206,6 @@ uw_unit uw_Basis_htmlifyInt_w(uw_context ctx, uw_Basis_int n) { return uw_unit_v; } -char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) { - return uw_Basis_htmlifyInt(ctx, ch); -} - char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) { int len; char *r; @@ -1541,17 +1393,6 @@ char *uw_Basis_sqlifyInt(uw_context ctx, uw_Basis_int n) { return r; } -char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel n) { - int len; - char *r; - - uw_check_heap(ctx, INTS_MAX + 6); - r = ctx->heap.front; - sprintf(r, "%lld::int4%n", (long long)n, &len); - ctx->heap.front += len+1; - return r; -} - char *uw_Basis_sqlifyIntN(uw_context ctx, uw_Basis_int *n) { if (n == NULL) return "NULL"; @@ -1614,6 +1455,52 @@ uw_Basis_string uw_Basis_sqlifyString(uw_context ctx, uw_Basis_string s) { return r; } +char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel chn) { + int len; + char *r; + unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn; + + uw_check_heap(ctx, INTS_MAX + 7); + r = ctx->heap.front; + sprintf(r, "%lld::int8%n", combo, &len); + ctx->heap.front += len+1; + return r; +} + +char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel chn) { + int len; + char *r; + unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn; + + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%lld%n", combo, &len); + ctx->heap.front += len+1; + return r; +} + +char *uw_Basis_sqlifyClient(uw_context ctx, uw_Basis_client cli) { + int len; + char *r; + + uw_check_heap(ctx, INTS_MAX + 7); + r = ctx->heap.front; + sprintf(r, "%u::int4%n", cli, &len); + ctx->heap.front += len+1; + return r; +} + +char *uw_Basis_attrifyClient(uw_context ctx, uw_Basis_client cli) { + int len; + char *r; + + uw_check_heap(ctx, INTS_MAX + 1); + r = ctx->heap.front; + sprintf(r, "%u%n", cli, &len); + ctx->heap.front += len+1; + return r; +} + uw_Basis_string uw_Basis_sqlifyStringN(uw_context ctx, uw_Basis_string s) { if (s == NULL) return "NULL"; @@ -1637,6 +1524,21 @@ char *uw_Basis_sqlifyBoolN(uw_context ctx, uw_Basis_bool *b) { char *uw_Basis_sqlifyTime(uw_context ctx, uw_Basis_time t) { size_t len; + char *r, *s; + struct tm stm; + + if (localtime_r(&t, &stm)) { + s = uw_malloc(ctx, TIMES_MAX); + len = strftime(s, TIMES_MAX, TIME_FMT, &stm); + r = uw_malloc(ctx, len + 14); + sprintf(r, "'%s'::timestamp", s); + return r; + } else + return "<Invalid time>"; +} + +char *uw_Basis_attrifyTime(uw_context ctx, uw_Basis_time t) { + size_t len; char *r; struct tm stm; @@ -1723,18 +1625,6 @@ uw_Basis_int *uw_Basis_stringToInt(uw_context ctx, uw_Basis_string s) { return NULL; } -uw_Basis_channel *uw_Basis_stringToChannel(uw_context ctx, uw_Basis_string s) { - char *endptr; - uw_Basis_channel n = strtoll(s, &endptr, 10); - - if (*s != '\0' && *endptr == '\0') { - uw_Basis_channel *r = uw_malloc(ctx, sizeof(uw_Basis_channel)); - *r = n; - return r; - } else - return NULL; -} - uw_Basis_float *uw_Basis_stringToFloat(uw_context ctx, uw_Basis_string s) { char *endptr; uw_Basis_float n = strtod(s, &endptr); @@ -1802,14 +1692,27 @@ uw_Basis_int uw_Basis_stringToInt_error(uw_context ctx, uw_Basis_string s) { uw_error(ctx, FATAL, "Can't parse int: %s", s); } +#include <errno.h> + uw_Basis_channel uw_Basis_stringToChannel_error(uw_context ctx, uw_Basis_string s) { + unsigned long long n; + + if (sscanf(s, "%llu", &n) < 1) + uw_error(ctx, FATAL, "Can't parse channel: %s", s); + else { + uw_Basis_channel ch = {n >> 32, n & ((1ull << 32) - 1)}; + return ch; + } +} + +uw_Basis_client uw_Basis_stringToClient_error(uw_context ctx, uw_Basis_string s) { char *endptr; - uw_Basis_channel n = strtoll(s, &endptr, 10); + unsigned long n = strtoul(s, &endptr, 10); if (*s != '\0' && *endptr == '\0') return n; else - uw_error(ctx, FATAL, "Can't parse channel int: %s", s); + uw_error(ctx, FATAL, "Can't parse client: %s", s); } uw_Basis_float uw_Basis_stringToFloat_error(uw_context ctx, uw_Basis_string s) { @@ -1856,22 +1759,6 @@ uw_Basis_time uw_Basis_stringToTime_error(uw_context ctx, uw_Basis_string s) { } } -uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) { - int len = strlen(h); - char *s = ctx->headers, *p; - - while (p = strchr(s, ':')) { - if (p - s == len && !strncasecmp(s, h, len)) { - return p + 2; - } else { - if ((s = strchr(p, 0)) && s < ctx->headers_end) - s += 2; - else - return NULL; - } - } -} - uw_Basis_string uw_Basis_get_cookie(uw_context ctx, uw_Basis_string c) { int len = strlen(c); char *s = ctx->headers, *p = ctx->outHeaders.start; @@ -1930,100 +1817,45 @@ uw_unit uw_Basis_set_cookie(uw_context ctx, uw_Basis_string prefix, uw_Basis_str return uw_unit_v; } -static channel_delta *allocate_delta(uw_context ctx, channel *ch) { - size_t i; - channel_delta *cd; +static delta *allocate_delta(uw_context ctx, unsigned client) { + unsigned i; + delta *d; - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i); + for (i = 0; i < ctx->used_deltas; ++i) + if (ctx->deltas[i].client == client) + return &ctx->deltas[i]; - if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch) - return &ctx->deltas[i]; - - if (i < ctx->n_deltas) - cd = &ctx->deltas[i]; - else { - ++ctx->n_deltas; - ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas); - cd = &ctx->deltas[ctx->n_deltas-1]; - cd->n_subscribed = 0; - cd->subscribed = malloc(0); - buf_init(&cd->msgs, 0); + if (ctx->used_deltas >= ctx->n_deltas) { + ctx->deltas = realloc(ctx->deltas, sizeof(delta) * ++ctx->n_deltas); + buf_init(&ctx->deltas[ctx->n_deltas-1].msgs, 0); } - - cd->mode = USED; - cd->newness = OLD; - cd->ch = ch; - if (cd->n_subscribed > 0) - cd->subscribed[0] = NULL; - buf_reset(&cd->msgs); - return cd; + + d = &ctx->deltas[ctx->used_deltas++]; + d->client = client; + buf_reset(&d->msgs); + return d; } uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { - size_t i; - channel *ch = uw_new_channel(); - ++ch->data.used.refcount; - channel_delta *cd = allocate_delta(ctx, ch); + if (ctx->client == NULL) + uw_error(ctx, FATAL, "Attempt to create channel on request not associated with a persistent connection"); - cd->newness = NEW; - - return ch->id; -} - -static int delta_used(channel_delta *cd) { - return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]); + return new_channel(ctx->client); } -uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { - channel *ch = uw_find_channel(chn); - - if (ch == NULL) - uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); - else { - size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client")); - int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); - client *c = uw_find_client(id); - - if (c == NULL) { - uw_release_channel(ch); - uw_error(ctx, FATAL, "Unknown client ID in subscription request"); - } else if (c->data.used.pass != pass) { - uw_release_channel(ch); - uw_release_client(c); - uw_error(ctx, FATAL, "Wrong client password (%d) in subscription request", pass); - } else { - size_t i; - channel_delta *cd = allocate_delta(ctx, ch); +uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { + delta *d = allocate_delta(ctx, chn.cli); + size_t len; + int preLen; + char pre[INTS_MAX + 2]; - if (delta_used(cd)) - uw_release_channel(ch); + len = strlen(msg); - for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i); + sprintf(pre, "%u\n%n", chn.chn, &preLen); - if (i < cd->n_subscribed) - cd->subscribed[i] = c; - else { - ++cd->n_subscribed; - cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed); - cd->subscribed[cd->n_subscribed-1] = c; - } - } - } - - return uw_unit_v; -} - -uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { - channel *ch = uw_find_channel(chn); - - if (ch == NULL) - uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); - else { - channel_delta *cd = allocate_delta(ctx, ch); - if (delta_used(cd)) - uw_release_channel(ch); - buf_append(&cd->msgs, msg, strlen(msg)); - } + buf_append(&d->msgs, pre, preLen); + buf_append(&d->msgs, msg, len); + buf_append(&d->msgs, "\n", 1); return uw_unit_v; } @@ -2032,46 +1864,27 @@ int uw_db_commit(uw_context); int uw_db_rollback(uw_context); void uw_commit(uw_context ctx) { - size_t i, j; + unsigned i; - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { - channel *ch = ctx->deltas[i].ch; - - for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { - client *c = ctx->deltas[i].subscribed[j]; + if (uw_db_commit(ctx)) + uw_error(ctx, FATAL, "Error running SQL COMMIT"); - uw_subscribe(ch, c); - uw_release_client(c); - } + for (i = 0; i < ctx->used_deltas; ++i) { + delta *d = &ctx->deltas[i]; + client *c = find_client(d->client); - if (buf_used(&ctx->deltas[i].msgs) > 0) { - uw_channel_send(ch, ctx->deltas[i].msgs.start); - } + assert (c != NULL && c->mode == USED); - uw_release_channel(ch); + client_send(c == ctx->client, c, &d->msgs); } - if (uw_db_commit(ctx)) - uw_error(ctx, FATAL, "Error running SQL COMMIT"); + if (ctx->client) + pthread_mutex_unlock(&ctx->client->lock); } int uw_rollback(uw_context ctx) { - size_t i, j; - - for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { - channel *ch = ctx->deltas[i].ch; - - for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { - client *c = ctx->deltas[i].subscribed[j]; - - uw_release_client(c); - } - - if (ctx->deltas[i].newness == NEW) - uw_free_channel(ch); - else - uw_release_channel(ch); - } + if (ctx->client) + pthread_mutex_unlock(&ctx->client->lock); return uw_db_rollback(ctx); } diff --git a/src/cjr_print.sml b/src/cjr_print.sml index a17a0416..351180b7 100644 --- a/src/cjr_print.sml +++ b/src/cjr_print.sml @@ -404,6 +404,7 @@ fun p_unsql wontLeakStrings env (tAll as (t, loc)) e = | TFfi ("Basis", "bool") => box [string "uw_Basis_stringToBool_error(ctx, ", e, string ")"] | TFfi ("Basis", "time") => box [string "uw_Basis_stringToTime_error(ctx, ", e, string ")"] | TFfi ("Basis", "channel") => box [string "uw_Basis_stringToChannel_error(ctx, ", e, string ")"] + | TFfi ("Basis", "client") => box [string "uw_Basis_stringToClient_error(ctx, ", e, string ")"] | _ => (ErrorMsg.errorAt loc "Don't know how to unmarshal type from SQL"; Print.eprefaces' [("Type", p_typ env tAll)]; @@ -447,6 +448,7 @@ datatype sql_type = | Bool | Time | Channel + | Client | Nullable of sql_type fun p_sql_type' t = @@ -457,6 +459,7 @@ fun p_sql_type' t = | Bool => "uw_Basis_bool" | Time => "uw_Basis_time" | Channel => "uw_Basis_channel" + | Client => "uw_Basis_client" | Nullable String => "uw_Basis_string" | Nullable t => p_sql_type' t ^ "*" @@ -473,6 +476,7 @@ fun getPargs (e, _) = | EFfiApp ("Basis", "sqlifyBool", [e]) => [(e, Bool)] | EFfiApp ("Basis", "sqlifyTime", [e]) => [(e, Time)] | EFfiApp ("Basis", "sqlifyChannel", [e]) => [(e, Channel)] + | EFfiApp ("Basis", "sqlifyClient", [e]) => [(e, Client)] | ECase (e, [((PNone _, _), @@ -496,8 +500,9 @@ fun p_ensql t e = | Float => box [string "uw_Basis_attrifyFloat(ctx, ", e, string ")"] | String => e | Bool => box [string "(", e, string " ? \"TRUE\" : \"FALSE\")"] - | Time => box [string "uw_Basis_sqlifyTime(ctx, ", e, string ")"] + | Time => box [string "uw_Basis_attrifyTime(ctx, ", e, string ")"] | Channel => box [string "uw_Basis_attrifyChannel(ctx, ", e, string ")"] + | Client => box [string "uw_Basis_attrifyClient(ctx, ", e, string ")"] | Nullable String => e | Nullable t => box [string "(", e, @@ -1982,7 +1987,7 @@ fun p_decl env (dAll as (d, _) : decl) = newline, string "PGconn *conn = uw_get_db(ctx);", newline, - string "PGresult *res = PQexec(conn, \"BEGIN\");", + string "PGresult *res = PQexec(conn, \"BEGIN ISOLATION LEVEL SERIALIZABLE\");", newline, newline, string "if (res == NULL) return 1;", @@ -2108,7 +2113,8 @@ fun p_sqltype'' env (tAll as (t, loc)) = | TFfi ("Basis", "string") => "text" | TFfi ("Basis", "bool") => "bool" | TFfi ("Basis", "time") => "timestamp" - | TFfi ("Basis", "channel") => "int4" + | TFfi ("Basis", "channel") => "int8" + | TFfi ("Basis", "client") => "int4" | _ => (ErrorMsg.errorAt loc "Don't know SQL equivalent of type"; Print.eprefaces' [("Type", p_typ env tAll)]; "ERROR") @@ -2368,6 +2374,8 @@ fun p_file env (ds, ps) = string (!Monoize.urlPrefix), string "\");", newline]), + string "uw_login(ctx);", + newline, box [string "{", newline, box (ListUtil.mapi (fn (i, t) => box [p_typ env t, diff --git a/src/jscomp.sml b/src/jscomp.sml index f5627e24..a487eeab 100644 --- a/src/jscomp.sml +++ b/src/jscomp.sml @@ -49,7 +49,6 @@ val funcs = [(("Basis", "alert"), "alert"), (("Basis", "urlifyInt"), "ts"), (("Basis", "urlifyFloat"), "ts"), (("Basis", "urlifyString"), "uf"), - (("Basis", "urlifyChannel"), "ts"), (("Basis", "recv"), "rv")] structure FM = BinaryMapFn(struct @@ -220,7 +219,7 @@ fun process file = | TFfi ("Basis", "string") => ((EFfiApp ("Basis", "jsifyString", [e]), loc), st) | TFfi ("Basis", "int") => ((EFfiApp ("Basis", "htmlifyInt", [e]), loc), st) | TFfi ("Basis", "float") => ((EFfiApp ("Basis", "htmlifyFloat", [e]), loc), st) - | TFfi ("Basis", "channel") => ((EFfiApp ("Basis", "htmlifyChannel", [e]), loc), st) + | TFfi ("Basis", "channel") => ((EFfiApp ("Basis", "jsifyChannel", [e]), loc), st) | TFfi ("Basis", "bool") => ((ECase (e, [((PCon (Enum, PConFfi {mod = "Basis", @@ -348,7 +347,7 @@ fun process file = | TFfi ("Basis", "string") => ("uu(t[i++])", st) | TFfi ("Basis", "int") => ("parseInt(t[i++])", st) | TFfi ("Basis", "float") => ("parseFloat(t[i++])", st) - | TFfi ("Basis", "channel") => ("parseInt(t[i++])", st) + | TFfi ("Basis", "channel") => ("(t[i++].length > 0 ? parseInt(t[i]) : null)", st) | TFfi ("Basis", "bool") => ("t[i++] == \"True\"", st) diff --git a/src/monoize.sml b/src/monoize.sml index 03ce6311..5701cc0c 100644 --- a/src/monoize.sml +++ b/src/monoize.sml @@ -1110,14 +1110,6 @@ fun monoExp (env, st, fm) (all as (e, loc)) = ((L'.EAbs ("_", (L'.TRecord [], loc), (L'.TFfi ("Basis", "channel"), loc), (L'.EFfiApp ("Basis", "new_channel", [(L'.ERecord [], loc)]), loc)), loc), fm) - | L.ECApp ((L.EFfi ("Basis", "subscribe"), _), t) => - ((L'.EAbs ("ch", (L'.TFfi ("Basis", "channel"), loc), - (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc), - (L'.EAbs ("_", (L'.TRecord [], loc), (L'.TRecord [], loc), - (L'.EFfiApp ("Basis", "subscribe", - [(L'.ERel 1, loc)]), - loc)), loc)), loc), - fm) | L.ECApp ((L.EFfi ("Basis", "send"), _), t) => let val t = monoType env t @@ -1431,6 +1423,10 @@ fun monoExp (env, st, fm) (all as (e, loc)) = ((L'.EAbs ("x", (L'.TFfi ("Basis", "channel"), loc), (L'.TFfi ("Basis", "string"), loc), (L'.EFfiApp ("Basis", "sqlifyChannel", [(L'.ERel 0, loc)]), loc)), loc), fm) + | L.EFfi ("Basis", "sql_client") => + ((L'.EAbs ("x", (L'.TFfi ("Basis", "client"), loc), (L'.TFfi ("Basis", "string"), loc), + (L'.EFfiApp ("Basis", "sqlifyClient", [(L'.ERel 0, loc)]), loc)), loc), + fm) | L.ECApp ((L.EFfi ("Basis", "sql_prim"), _), t) => let val t = monoType env t diff --git a/src/prepare.sml b/src/prepare.sml index 1f3f323a..258b9dcf 100644 --- a/src/prepare.sml +++ b/src/prepare.sml @@ -48,6 +48,8 @@ fun prepString (e, ss, n) = | EFfiApp ("Basis", "sqlifyTime", [e]) => SOME ("$" ^ Int.toString (n + 1) ^ "::timestamp" :: ss, n + 1) | EFfiApp ("Basis", "sqlifyChannel", [e]) => + SOME ("$" ^ Int.toString (n + 1) ^ "::int8" :: ss, n + 1) + | EFfiApp ("Basis", "sqlifyClient", [e]) => SOME ("$" ^ Int.toString (n + 1) ^ "::int4" :: ss, n + 1) | ECase (e, |