diff options
author | Adam Chlipala <adamc@hcoop.net> | 2009-03-19 16:34:13 -0400 |
---|---|---|
committer | Adam Chlipala <adamc@hcoop.net> | 2009-03-19 16:34:13 -0400 |
commit | 9e730b9a1aa4db311088a355e7e8601c6a998467 (patch) | |
tree | 286b56b8fe24a04c3c2a26f13d6fdc9c845d7452 | |
parent | 1f530c51813cbeee1b166e5e75058eacb313f306 (diff) |
Dummy message delivery to clients
-rw-r--r-- | include/urweb.h | 6 | ||||
-rw-r--r-- | lib/js/urweb.js | 37 | ||||
-rw-r--r-- | src/c/driver.c | 31 | ||||
-rw-r--r-- | src/c/urweb.c | 308 | ||||
-rw-r--r-- | src/cjr_print.sml | 3 | ||||
-rw-r--r-- | src/monoize.sml | 3 |
6 files changed, 334 insertions, 54 deletions
diff --git a/include/urweb.h b/include/urweb.h index 676cc84b..ca0aada2 100644 --- a/include/urweb.h +++ b/include/urweb.h @@ -6,6 +6,11 @@ int uw_really_send(int sock, void *buf, ssize_t len); extern uw_unit uw_unit_v; +void uw_global_init(void); + +void uw_client_connect(size_t id, int pass, int sock); +void uw_prune_clients(time_t timeout); + uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len); void uw_set_db(uw_context, void*); void *uw_get_db(uw_context); @@ -41,6 +46,7 @@ uw_unit uw_Basis_set_client_source(uw_context, uw_Basis_int, uw_Basis_string); void uw_set_script_header(uw_context, const char*); const char *uw_Basis_get_script(uw_context, uw_unit); +const char *uw_Basis_get_listener(uw_context, uw_unit); char *uw_Basis_htmlifyInt(uw_context, uw_Basis_int); char *uw_Basis_htmlifyFloat(uw_context, uw_Basis_float); diff --git a/lib/js/urweb.js b/lib/js/urweb.js index 08d96040..9f93120f 100644 --- a/lib/js/urweb.js +++ b/lib/js/urweb.js @@ -174,3 +174,40 @@ function rc(uri, parse, k) { xhr.open("GET", uri, true); xhr.send(null); } + + +var client_id = 0; +var client_pass = 0; +var url_prefix = "/"; + +function path_join(s1, s2) { + if (s1.length > 0 && s1[s1.length-1] == '/') + return s1 + s2; + else + return s1 + "/" + s2; +} + +function listener() { + var xhr = getXHR(); + + xhr.onreadystatechange = function() { + if (xhr.readyState == 4) { + var isok = false; + + try { + if (xhr.status == 200) + isok = true; + } catch (e) { } + + if (isok) + alert("Messages: " + xhr.responseText); + else { + alert("Error querying remote server for messages!"); + throw "Error querying remote server for messages!"; + } + } + }; + + xhr.open("GET", path_join(url_prefix, ".msgs/" + client_id + "/" + client_pass), true); + xhr.send(null); +} diff --git a/src/c/driver.c b/src/c/driver.c index 34e57a6d..12905ede 100644 --- a/src/c/driver.c +++ b/src/c/driver.c @@ -106,7 +106,7 @@ static void *worker(void *data) { while (1) { char buf[uw_bufsize+1], *back = buf, *s; - int sock; + int sock, dont_close = 0; pthread_mutex_lock(&queue_mutex); while (empty()) @@ -138,6 +138,7 @@ static void *worker(void *data) { if (s = strstr(buf, "\r\n\r\n")) { failure_kind fk; char *cmd, *path, *headers, path_copy[uw_bufsize+1], *inputs; + int id, pass; s[2] = 0; @@ -168,6 +169,12 @@ static void *worker(void *data) { break; } + if (sscanf(path, "/.msgs/%d/%d", &id, &pass) == 2) { + uw_client_connect(id, pass, sock); + dont_close = 1; + break; + } + if (inputs = strchr(path, '?')) { char *name, *value; *inputs++ = 0; @@ -286,11 +293,19 @@ static void *worker(void *data) { } } - close(sock); + if (!dont_close) + close(sock); uw_reset(ctx); } } +static void *client_pruner(void *data) { + while (1) { + uw_prune_clients(5); + sleep(5); + } +} + static void help(char *cmd) { printf("Usage: %s [-p <port>] [-t <thread-count>]\n", cmd); } @@ -377,8 +392,20 @@ int main(int argc, char *argv[]) { sin_size = sizeof their_addr; + uw_global_init(); + printf("Listening on port %d....\n", uw_port); + { + pthread_t thread; + int name; + + if (pthread_create(&thread, NULL, client_pruner, &name)) { + fprintf(stderr, "Error creating pruner thread\n"); + return 1; + } + } + for (i = 0; i < nthreads; ++i) { pthread_t thread; names[i] = i; diff --git a/src/c/urweb.c b/src/c/urweb.c index dd11a2a9..1f4c6fdd 100644 --- a/src/c/urweb.c +++ b/src/c/urweb.c @@ -8,10 +8,234 @@ #include <setjmp.h> #include <stdarg.h> +#include <pthread.h> + #include "types.h" uw_unit uw_unit_v = {}; + +// Socket extras + +int uw_really_send(int sock, const void *buf, size_t len) { + while (len > 0) { + size_t n = send(sock, buf, len, 0); + + if (n < 0) + return n; + + buf += n; + len -= n; + } + + return 0; +} + + +// Buffers + +typedef struct { + char *start, *front, *back; +} buf; + +static void buf_init(buf *b, size_t s) { + b->front = b->start = malloc(s); + b->back = b->front + s; +} + +static void buf_free(buf *b) { + free(b->start); +} + +static void buf_reset(buf *b) { + b->front = b->start; +} + +static void buf_check(buf *b, size_t extra) { + if (b->back - b->front < extra) { + size_t desired = b->front - b->start + extra, next; + char *new_heap; + + next = b->back - b->start; + if (next == 0) + next = 1; + for (; next < desired; next *= 2); + + new_heap = realloc(b->start, next); + b->front = new_heap + (b->front - b->start); + b->back = new_heap + next; + b->start = new_heap; + } +} + +static size_t buf_used(buf *b) { + return b->front - b->start; +} + +static size_t buf_avail(buf *b) { + return b->back - b->start; +} + + +// Cross-request state + +typedef enum { UNUSED, USED } usage; + +typedef struct client { + size_t id; + usage mode; + union { + struct client *next; + struct { + pthread_mutex_t lock; + int pass; + buf msgs; + int sock; + time_t last_contact; + } used; + } data; +} client; + +static client **clients, *clients_free; +static size_t n_clients; + +static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; + +void uw_global_init() { + srand(time(NULL) ^ getpid()); + + clients = malloc(0); +} + +void uw_global_free() { + size_t i; + + for (i = 0; i < n_clients; ++i) + free(clients[i]); + + free(clients); +} + +static client *uw_new_client() { + client *c; + + pthread_mutex_lock(&clients_mutex); + + if (clients_free) { + c = clients_free; + clients_free = clients_free->data.next; + } + else { + ++n_clients; + clients = realloc(clients, sizeof(client) * n_clients); + c = malloc(sizeof(client)); + c->id = n_clients-1; + clients[n_clients-1] = c; + } + + c->mode = USED; + pthread_mutex_init(&c->data.used.lock, NULL); + c->data.used.pass = rand(); + c->data.used.sock = -1; + c->data.used.last_contact = time(NULL); + buf_init(&c->data.used.msgs, 0); + + pthread_mutex_unlock(&clients_mutex); + + return c; +} + +static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n"; + +void uw_client_connect(size_t id, int pass, int sock) { + client *c; + + pthread_mutex_lock(&clients_mutex); + + if (id >= n_clients) { + pthread_mutex_unlock(&clients_mutex); + close(sock); + fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); + return; + } + + c = clients[id]; + + if (c->mode != USED) { + pthread_mutex_unlock(&clients_mutex); + close(sock); + fprintf(stderr, "Client request for unused ID (%d)\n", (int)id); + return; + } + + pthread_mutex_lock(&c->data.used.lock); + + if (pass != c->data.used.pass) { + pthread_mutex_unlock(&c->data.used.lock); + pthread_mutex_unlock(&clients_mutex); + close(sock); + fprintf(stderr, "Wrong client password (%d)\n", (int)id); + return; + } + + if (c->data.used.sock != -1) { + pthread_mutex_unlock(&c->data.used.lock); + pthread_mutex_unlock(&clients_mutex); + close(sock); + fprintf(stderr, "Duplicate client connection (%d)\n", (int)id); + return; + } + + c->data.used.last_contact = time(NULL); + + if (buf_used(&c->data.used.msgs) > 0) { + uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1); + uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs)); + close(sock); + } + else { + uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1); + uw_really_send(sock, "Hi!", 3); + close(sock); + //c->data.used.sock = sock; + } + + pthread_mutex_unlock(&c->data.used.lock); + pthread_mutex_unlock(&clients_mutex); +} + +static void uw_free_client(client *c) { + printf("Freeing client %d\n", c->id); + + if (c->mode == USED && c->data.used.sock != -1) + close(c->data.used.sock); + + pthread_mutex_destroy(&c->data.used.lock); + buf_free(&c->data.used.msgs); + c->mode = UNUSED; + c->data.next = clients_free; + clients_free = c; +} + +void uw_prune_clients(time_t timeout) { + size_t i; + time_t cutoff; + + cutoff = time(NULL) - timeout; + + pthread_mutex_lock(&clients_mutex); + + for (i = 0; i < n_clients; ++i) { + if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff) + uw_free_client(clients[i]); + } + + pthread_mutex_unlock(&clients_mutex); +} + + +// Single-request state + #define ERROR_BUF_LEN 1024 typedef struct regions { @@ -23,10 +247,6 @@ typedef struct { void *arg; } cleanup; -typedef struct { - char *start, *front, *back; -} buf; - struct uw_context { char *headers, *headers_end; @@ -43,18 +263,13 @@ struct uw_context { cleanup *cleanup, *cleanup_front, *cleanup_back; - const char *script_header; + const char *script_header, *url_prefix; char error_message[ERROR_BUF_LEN]; }; extern int uw_inputs_len; -static void buf_init(buf *b, size_t s) { - b->front = b->start = malloc(s); - b->back = b->front + s; -} - uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len) { uw_context ctx = malloc(sizeof(struct uw_context)); @@ -64,6 +279,7 @@ uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, si buf_init(&ctx->page, page_len); buf_init(&ctx->heap, heap_len); buf_init(&ctx->script, script_len); + ctx->script.start[0] = 0; ctx->inputs = calloc(uw_inputs_len, sizeof(char *)); @@ -74,6 +290,7 @@ uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, si ctx->cleanup_front = ctx->cleanup_back = ctx->cleanup = malloc(0); ctx->script_header = ""; + ctx->url_prefix = "/"; ctx->error_message[0] = 0; @@ -90,10 +307,6 @@ void *uw_get_db(uw_context ctx) { return ctx->db; } -static void buf_free(buf *b) { - free(b->front); -} - void uw_free(uw_context ctx) { buf_free(&ctx->outHeaders); buf_free(&ctx->script); @@ -104,13 +317,10 @@ void uw_free(uw_context ctx) { free(ctx); } -static void buf_reset(buf *b) { - b->front = b->start; -} - void uw_reset_keep_error_message(uw_context ctx) { buf_reset(&ctx->outHeaders); buf_reset(&ctx->script); + ctx->script.start[0] = 0; buf_reset(&ctx->page); buf_reset(&ctx->heap); ctx->regions = NULL; @@ -249,7 +459,12 @@ void uw_set_script_header(uw_context ctx, const char *s) { ctx->script_header = s; } -static void buf_check(uw_context ctx, buf *b, size_t extra, const char *desc) { +void uw_set_url_prefix(uw_context ctx, const char *s) { + ctx->url_prefix = s; +} + + +static void buf_check_ctx(uw_context ctx, buf *b, size_t extra, const char *desc) { if (b->back - b->front < extra) { size_t desired = b->front - b->start + extra, next; char *new_heap; @@ -263,7 +478,7 @@ static void buf_check(uw_context ctx, buf *b, size_t extra, const char *desc) { b->front = new_heap + (b->front - b->start); b->back = new_heap + next; - if (desc && new_heap != b->start) { + if (new_heap != b->start) { b->start = new_heap; uw_error(ctx, UNLIMITED_RETRY, "Couldn't allocate new %s contiguously", desc); } @@ -273,7 +488,7 @@ static void buf_check(uw_context ctx, buf *b, size_t extra, const char *desc) { } static void uw_check_heap(uw_context ctx, size_t extra) { - buf_check(ctx, &ctx->heap, extra, "heap chunk"); + buf_check_ctx(ctx, &ctx->heap, extra, "heap chunk"); } void *uw_malloc(uw_context ctx, size_t len) { @@ -307,14 +522,6 @@ void uw_end_region(uw_context ctx) { ctx->regions = r->next; } -static size_t buf_used(buf *b) { - return b->front - b->start; -} - -static size_t buf_avail(buf *b) { - return b->back - b->start; -} - void uw_memstats(uw_context ctx) { printf("Headers: %d/%d\n", buf_used(&ctx->outHeaders), buf_avail(&ctx->outHeaders)); printf("Script: %d/%d\n", buf_used(&ctx->script), buf_avail(&ctx->script)); @@ -322,20 +529,6 @@ void uw_memstats(uw_context ctx) { printf("Heap: %d/%d\n", buf_used(&ctx->heap), buf_avail(&ctx->heap)); } -int uw_really_send(int sock, const void *buf, size_t len) { - while (len > 0) { - size_t n = send(sock, buf, len, 0); - - if (n < 0) - return n; - - buf += n; - len -= n; - } - - return 0; -} - int uw_send(uw_context ctx, int sock) { int n = uw_really_send(sock, ctx->outHeaders.start, ctx->outHeaders.front - ctx->outHeaders.start); @@ -351,7 +544,7 @@ int uw_send(uw_context ctx, int sock) { } static void uw_check_headers(uw_context ctx, size_t extra) { - buf_check(ctx, &ctx->outHeaders, extra, NULL); + buf_check(&ctx->outHeaders, extra); } void uw_write_header(uw_context ctx, uw_Basis_string s) { @@ -363,7 +556,7 @@ void uw_write_header(uw_context ctx, uw_Basis_string s) { } static void uw_check_script(uw_context ctx, size_t extra) { - buf_check(ctx, &ctx->script, extra, NULL); + buf_check(&ctx->script, extra); } void uw_write_script(uw_context ctx, uw_Basis_string s) { @@ -375,16 +568,27 @@ void uw_write_script(uw_context ctx, uw_Basis_string s) { } const char *uw_Basis_get_script(uw_context ctx, uw_unit u) { - if (ctx->script.front == ctx->script.start) { - return ctx->script_header; - } else { - char *r = uw_malloc(ctx, 41 + (ctx->script.front - ctx->script.start) + strlen(ctx->script_header)); + if (ctx->script_header[0] == 0) + return ""; + else { + int pass; + client *c = uw_new_client(&pass); - sprintf(r, "%s<script>%s</script>", ctx->script_header, ctx->script.start); + char *r = uw_malloc(ctx, strlen(ctx->script_header) + 56 + 2 * INTS_MAX + buf_used(&ctx->script) + + strlen(ctx->url_prefix)); + sprintf(r, "%s<script>client_id=%d;client_pass=%d;url_prefix=\"%s\";%s</script>", + ctx->script_header, (int)c->id, c->data.used.pass, ctx->url_prefix, ctx->script.start); return r; } } +const char *uw_Basis_get_listener(uw_context ctx, uw_unit u) { + if (ctx->script_header[0] == 0) + return ""; + else + return " onload='listener()'"; +} + uw_Basis_string uw_Basis_jsifyString(uw_context ctx, uw_Basis_string s) { char *r, *s2; @@ -486,7 +690,7 @@ uw_unit uw_Basis_set_client_source(uw_context ctx, uw_Basis_int n, uw_Basis_stri } static void uw_check(uw_context ctx, size_t extra) { - buf_check(ctx, &ctx->page, extra, NULL); + buf_check(&ctx->page, extra); } static void uw_writec_unsafe(uw_context ctx, char c) { @@ -1370,3 +1574,5 @@ uw_unit uw_Basis_set_cookie(uw_context ctx, uw_Basis_string prefix, uw_Basis_str return uw_unit_v; } + + diff --git a/src/cjr_print.sml b/src/cjr_print.sml index 4b6a56db..c5931616 100644 --- a/src/cjr_print.sml +++ b/src/cjr_print.sml @@ -2355,6 +2355,9 @@ fun p_file env (ds, ps) = ^ "\\\"></script>\\n" | ServerOnly => ""), string "\");", + string "uw_set_url_prefix(ctx, \"", + string (!Monoize.urlPrefix), + string "\");", newline]), box [string "{", newline, diff --git a/src/monoize.sml b/src/monoize.sml index 01f18baf..88abf0c2 100644 --- a/src/monoize.sml +++ b/src/monoize.sml @@ -1924,7 +1924,8 @@ fun monoExp (env, st, fm) (all as (e, loc)) = end in case tag of - "body" => normal ("body", NONE, + "body" => normal ("body", + SOME (L'.EFfiApp ("Basis", "get_listener", [(L'.ERecord [], loc)]), loc), SOME (L'.EFfiApp ("Basis", "get_script", [(L'.ERecord [], loc)]), loc)) | "dyn" => |