From 2627ab6f5cfd96df06016a88b7e5a245e3ab9d8f Mon Sep 17 00:00:00 2001 From: Adam Chlipala Date: Sun, 22 Mar 2009 15:05:07 -0400 Subject: First message send delivered, but not interpreted --- src/c/driver.c | 18 ++-- src/c/urweb.c | 245 +++++++++++++++++++++++++++++++++++++++++++++------- src/jscomp.sml | 4 +- src/mono_reduce.sml | 16 +++- src/monoize.sml | 57 +++++++++++- src/rpcify.sml | 5 +- src/scriptcheck.sml | 5 +- 7 files changed, 302 insertions(+), 48 deletions(-) (limited to 'src') diff --git a/src/c/driver.c b/src/c/driver.c index 12905ede..fe8d3aa5 100644 --- a/src/c/driver.c +++ b/src/c/driver.c @@ -138,7 +138,6 @@ static void *worker(void *data) { if (s = strstr(buf, "\r\n\r\n")) { failure_kind fk; char *cmd, *path, *headers, path_copy[uw_bufsize+1], *inputs; - int id, pass; s[2] = 0; @@ -169,9 +168,18 @@ static void *worker(void *data) { break; } - if (sscanf(path, "/.msgs/%d/%d", &id, &pass) == 2) { - uw_client_connect(id, pass, sock); - dont_close = 1; + uw_set_headers(ctx, headers); + + if (!strcmp(path, "/.msgs")) { + char *id = uw_Basis_requestHeader(ctx, "UrWeb-Client"); + char *pass = uw_Basis_requestHeader(ctx, "UrWeb-Pass"); + + if (id && pass) { + size_t idn = atoi(id); + uw_client_connect(idn, atoi(pass), sock); + dont_close = 1; + fprintf(stderr, "Processed request for messages by client %d\n\n", (int)idn); + } break; } @@ -197,8 +205,6 @@ static void *worker(void *data) { printf("Serving URI %s....\n", path); - uw_set_headers(ctx, headers); - while (1) { if (uw_db_begin(ctx)) { printf("Error running SQL BEGIN\n"); diff --git a/src/c/urweb.c b/src/c/urweb.c index 1f4c6fdd..abe08221 100644 --- a/src/c/urweb.c +++ b/src/c/urweb.c @@ -76,8 +76,14 @@ static size_t buf_avail(buf *b) { return b->back - b->start; } +static void buf_append(buf *b, const char *s, size_t len) { + buf_check(b, len); + memcpy(b->front, s, len); + b->front += len; +} + -// Cross-request state +// Persistent client state typedef enum { UNUSED, USED } usage; @@ -101,21 +107,6 @@ static size_t n_clients; static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; -void uw_global_init() { - srand(time(NULL) ^ getpid()); - - clients = malloc(0); -} - -void uw_global_free() { - size_t i; - - for (i = 0; i < n_clients; ++i) - free(clients[i]); - - free(clients); -} - static client *uw_new_client() { client *c; @@ -147,24 +138,33 @@ static client *uw_new_client() { static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n"; -void uw_client_connect(size_t id, int pass, int sock) { +static client *uw_find_client(size_t id) { client *c; pthread_mutex_lock(&clients_mutex); if (id >= n_clients) { pthread_mutex_unlock(&clients_mutex); - close(sock); - fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); - return; + return NULL; } c = clients[id]; if (c->mode != USED) { pthread_mutex_unlock(&clients_mutex); + return NULL; + } + + pthread_mutex_unlock(&clients_mutex); + return c; +} + +void uw_client_connect(size_t id, int pass, int sock) { + client *c = uw_find_client(id); + + if (c == NULL) { close(sock); - fprintf(stderr, "Client request for unused ID (%d)\n", (int)id); + fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id); return; } @@ -172,7 +172,6 @@ void uw_client_connect(size_t id, int pass, int sock) { if (pass != c->data.used.pass) { pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_unlock(&clients_mutex); close(sock); fprintf(stderr, "Wrong client password (%d)\n", (int)id); return; @@ -180,7 +179,6 @@ void uw_client_connect(size_t id, int pass, int sock) { if (c->data.used.sock != -1) { pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_unlock(&clients_mutex); close(sock); fprintf(stderr, "Duplicate client connection (%d)\n", (int)id); return; @@ -193,15 +191,10 @@ void uw_client_connect(size_t id, int pass, int sock) { uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs)); close(sock); } - else { - uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1); - uw_really_send(sock, "Hi!", 3); - close(sock); - //c->data.used.sock = sock; - } + else + c->data.used.sock = sock; pthread_mutex_unlock(&c->data.used.lock); - pthread_mutex_unlock(&clients_mutex); } static void uw_free_client(client *c) { @@ -234,6 +227,150 @@ void uw_prune_clients(time_t timeout) { } +// Persistent channel state + +typedef struct client_list { + client *data; + struct client_list *next; +} client_list; + +typedef struct channel { + size_t id; + usage mode; + union { + struct channel *next; + struct { + pthread_mutex_t lock; + client_list *clients; + } used; + } data; +} channel; + +static channel **channels, *channels_free; +static size_t n_channels; + +static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER; + +static channel *uw_new_channel() { + channel *ch; + + pthread_mutex_lock(&channels_mutex); + + if (channels_free) { + ch = channels_free; + channels_free = channels_free->data.next; + } + else { + ++n_channels; + channels = realloc(channels, sizeof(channels) * n_channels); + ch = malloc(sizeof(channel)); + ch->id = n_channels-1; + channels[n_channels-1] = ch; + } + + ch->mode = USED; + pthread_mutex_init(&ch->data.used.lock, NULL); + ch->data.used.clients = NULL; + + pthread_mutex_unlock(&channels_mutex); + + return ch; +} + +static channel *uw_find_channel(size_t id) { + channel *ch = NULL; + + pthread_mutex_lock(&channels_mutex); + + if (id < n_channels && channels[id]->mode == USED) + ch = channels[id]; + + pthread_mutex_unlock(&channels_mutex); + + return ch; +} + +static void uw_subscribe(channel *ch, client *c) { + client_list *cs = malloc(sizeof(client_list)); + + pthread_mutex_lock(&ch->data.used.lock); + + cs->data = c; + cs->next = ch->data.used.clients; + ch->data.used.clients = cs; + + pthread_mutex_unlock(&ch->data.used.lock); +} + +static void uw_unsubscribe(channel *ch, client *c) { + client_list *prev, *cur, *tmp; + + pthread_mutex_lock(&ch->data.used.lock); + + for (prev = NULL, cur = ch->data.used.clients; cur; ) { + if (cur->data == c) { + if (prev) + prev->next = cur->next; + else + ch->data.used.clients = cur->next; + tmp = cur; + cur = cur->next; + free(tmp); + } + else { + prev = cur; + cur = cur->next; + } + } + + pthread_mutex_unlock(&ch->data.used.lock); +} + +static void uw_channel_send(channel *ch, const char *msg) { + size_t len = strlen(msg), preLen; + char pre[INTS_MAX + 2]; + client_list *cs; + + sprintf(pre, "%d\n", (int)ch->id); + preLen = strlen(pre); + + pthread_mutex_lock(&ch->data.used.lock); + + for (cs = ch->data.used.clients; cs; cs = cs->next) { + client *c = cs->data; + + pthread_mutex_lock(&c->data.used.lock); + + if (c->data.used.sock != -1) { + uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); + uw_really_send(c->data.used.sock, pre, preLen); + uw_really_send(c->data.used.sock, msg, len); + uw_really_send(c->data.used.sock, "\n", 1); + close(c->data.used.sock); + c->data.used.sock = -1; + } else { + buf_append(&c->data.used.msgs, pre, preLen); + buf_append(&c->data.used.msgs, msg, len); + buf_append(&c->data.used.msgs, "\n", 1); + } + + pthread_mutex_unlock(&c->data.used.lock); + } + + pthread_mutex_unlock(&ch->data.used.lock); +} + + +// Global entry points + +void uw_global_init() { + srand(time(NULL) ^ getpid()); + + clients = malloc(0); + channels = malloc(0); +} + + // Single-request state #define ERROR_BUF_LEN 1024 @@ -582,11 +719,17 @@ const char *uw_Basis_get_script(uw_context ctx, uw_unit u) { } } -const char *uw_Basis_get_listener(uw_context ctx, uw_unit u) { +const char *uw_Basis_get_listener(uw_context ctx, uw_Basis_string onload) { if (ctx->script_header[0] == 0) return ""; - else + else if (onload[0] == 0) return " onload='listener()'"; + else { + uw_Basis_string s = uw_malloc(ctx, strlen(onload) + 22); + + sprintf(s, " onload='listener();%s'", onload); + return s; + } } uw_Basis_string uw_Basis_jsifyString(uw_context ctx, uw_Basis_string s) { @@ -941,6 +1084,10 @@ uw_Basis_int uw_Basis_unurlifyInt(uw_context ctx, char **s) { return r; } +uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) { + return uw_Basis_unurlifyInt(ctx, s); +} + uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) { char *new_s = uw_unurlify_advance(*s); uw_Basis_float r; @@ -1032,6 +1179,10 @@ uw_unit uw_Basis_htmlifyInt_w(uw_context ctx, uw_Basis_int n) { return uw_unit_v; } +char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) { + return uw_Basis_htmlifyInt(ctx, ch); +} + char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) { int len; char *r; @@ -1575,4 +1726,34 @@ uw_unit uw_Basis_set_cookie(uw_context ctx, uw_Basis_string prefix, uw_Basis_str return uw_unit_v; } +uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { + return uw_new_channel()->id; +} + +uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { + channel *ch = uw_find_channel(chn); + if (ch == NULL) + uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); + else { + size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client")); + int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); + client *c = uw_find_client(id); + + if (c == NULL) + uw_error(ctx, FATAL, "Unknown client ID in subscription request"); + else if (c->data.used.pass != pass) + uw_error(ctx, FATAL, "Wrong client password in subscription request"); + else + uw_subscribe(ch, c); + } +} + +uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { + channel *ch = uw_find_channel(chn); + + if (ch == NULL) + uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); + else + uw_channel_send(ch, msg); +} diff --git a/src/jscomp.sml b/src/jscomp.sml index 00048458..be227035 100644 --- a/src/jscomp.sml +++ b/src/jscomp.sml @@ -48,7 +48,8 @@ val funcs = [(("Basis", "alert"), "alert"), (("Basis", "stringToInt_error"), "pi"), (("Basis", "urlifyInt"), "ts"), (("Basis", "urlifyFloat"), "ts"), - (("Basis", "urlifyString"), "escape")] + (("Basis", "urlifyString"), "escape"), + (("Basis", "urlifyChannel"), "ts")] structure FM = BinaryMapFn(struct type ord_key = string * string @@ -216,6 +217,7 @@ fun process file = | TFfi ("Basis", "string") => ((EFfiApp ("Basis", "jsifyString", [e]), loc), st) | TFfi ("Basis", "int") => ((EFfiApp ("Basis", "htmlifyInt", [e]), loc), st) | TFfi ("Basis", "float") => ((EFfiApp ("Basis", "htmlifyFloat", [e]), loc), st) + | TFfi ("Basis", "channel") => ((EFfiApp ("Basis", "htmlifyChannel", [e]), loc), st) | TFfi ("Basis", "bool") => ((ECase (e, [((PCon (Enum, PConFfi {mod = "Basis", diff --git a/src/mono_reduce.sml b/src/mono_reduce.sml index 1f640004..b789e05f 100644 --- a/src/mono_reduce.sml +++ b/src/mono_reduce.sml @@ -57,6 +57,9 @@ fun impure (e, _) = | EFfiApp ("Basis", "new_client_source", _) => true | EFfiApp ("Basis", "set_client_source", _) => true | EFfiApp ("Basis", "alert", _) => true + | EFfiApp ("Basis", "new_channel", _) => true + | EFfiApp ("Basis", "subscribe", _) => true + | EFfiApp ("Basis", "send", _) => true | EFfiApp _ => false | EApp ((EFfi _, _), _) => false | EApp _ => true @@ -256,6 +259,8 @@ fun reduce file = fun summarize d (e, _) = let + fun ffi es = List.concat (map (summarize d) es) @ [Unsure] + val s = case e of EPrim _ => [] @@ -266,10 +271,13 @@ fun reduce file = | ENone _ => [] | ESome (_, e) => summarize d e | EFfi _ => [] - | EFfiApp ("Basis", "set_cookie", es) => List.concat (map (summarize d) es) @ [Unsure] - | EFfiApp ("Basis", "new_client_source", es) => List.concat (map (summarize d) es) @ [Unsure] - | EFfiApp ("Basis", "set_client_source", es) => List.concat (map (summarize d) es) @ [Unsure] - | EFfiApp ("Basis", "alert", es) => List.concat (map (summarize d) es) @ [Unsure] + | EFfiApp ("Basis", "set_cookie", es) => ffi es + | EFfiApp ("Basis", "new_client_source", es) => ffi es + | EFfiApp ("Basis", "set_client_source", es) => ffi es + | EFfiApp ("Basis", "alert", es) => ffi es + | EFfiApp ("Basis", "new_channel", es) => ffi es + | EFfiApp ("Basis", "subscribe", es) => ffi es + | EFfiApp ("Basis", "send", es) => ffi es | EFfiApp (_, _, es) => List.concat (map (summarize d) es) | EApp ((EFfi _, _), e) => summarize d e | EApp _ => diff --git a/src/monoize.sml b/src/monoize.sml index 88abf0c2..d6b5ae15 100644 --- a/src/monoize.sml +++ b/src/monoize.sml @@ -180,6 +180,9 @@ fun monoType env = | L.CApp ((L.CFfi ("Basis", "sql_nfunc"), _), _) => (L'.TFfi ("Basis", "string"), loc) + | L.CApp ((L.CFfi ("Basis", "channel"), _), _) => + (L'.TFfi ("Basis", "channel"), loc) + | L.CRel _ => poly () | L.CNamed n => (case IM.find (dtmap, n) of @@ -1081,6 +1084,34 @@ fun monoExp (env, st, fm) (all as (e, loc)) = fm) end + | L.ECApp ((L.EFfi ("Basis", "channel"), _), t) => + ((L'.EAbs ("_", (L'.TRecord [], loc), (L'.TFfi ("Basis", "channel"), loc), + (L'.EFfiApp ("Basis", "new_channel", [(L'.ERecord [], loc)]), loc)), loc), + fm) + | L.ECApp ((L.EFfi ("Basis", "subscribe"), _), t) => + ((L'.EAbs ("ch", (L'.TFfi ("Basis", "channel"), loc), + (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc), + (L'.EAbs ("_", (L'.TRecord [], loc), (L'.TRecord [], loc), + (L'.EFfiApp ("Basis", "subscribe", + [(L'.ERel 1, loc)]), + loc)), loc)), loc), + fm) + | L.ECApp ((L.EFfi ("Basis", "send"), _), t) => + let + val t = monoType env t + val (e, fm) = urlifyExp env fm ((L'.ERel 1, loc), t) + in + ((L'.EAbs ("ch", (L'.TFfi ("Basis", "channel"), loc), + (L'.TFun (t, (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc)), loc), + (L'.EAbs ("v", t, (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc), + (L'.EAbs ("_", (L'.TRecord [], loc), (L'.TRecord [], loc), + (L'.EFfiApp ("Basis", "send", + [(L'.ERel 2, loc), + e]), + loc)), loc)), loc)), loc), + fm) + end + | L.EFfiApp ("Basis", "dml", [e]) => let val (e, fm) = monoExp (env, st, fm) e @@ -1781,6 +1812,14 @@ fun monoExp (env, st, fm) (all as (e, loc)) = L'.ERecord xes => xes | _ => raise Fail "Non-record attributes!" + fun findOnload (attrs, acc) = + case attrs of + [] => (NONE, acc) + | ("Onload", e, _) :: rest => (SOME e, List.revAppend (acc, rest)) + | x :: rest => findOnload (rest, x :: acc) + + val (onload, attrs) = findOnload (attrs, []) + fun lowercaseFirst "" = "" | lowercaseFirst s = String.str (Char.toLower (String.sub (s, 0))) ^ String.extract (s, 1, NONE) @@ -1924,9 +1963,21 @@ fun monoExp (env, st, fm) (all as (e, loc)) = end in case tag of - "body" => normal ("body", - SOME (L'.EFfiApp ("Basis", "get_listener", [(L'.ERecord [], loc)]), loc), - SOME (L'.EFfiApp ("Basis", "get_script", [(L'.ERecord [], loc)]), loc)) + "body" => + let + val onload = case onload of + NONE => (L'.EPrim (Prim.String ""), loc) + | SOME e => + let + val e = (L'.EApp (e, (L'.ERecord [], loc)), loc) + in + (L'.EJavaScript (L'.Attribute, e, NONE), loc) + end + in + normal ("body", + SOME (L'.EFfiApp ("Basis", "get_listener", [onload]), loc), + SOME (L'.EFfiApp ("Basis", "get_script", [(L'.ERecord [], loc)]), loc)) + end | "dyn" => (case attrs of diff --git a/src/rpcify.sml b/src/rpcify.sml index f4db3444..1212b81e 100644 --- a/src/rpcify.sml +++ b/src/rpcify.sml @@ -50,7 +50,10 @@ val ssBasis = SS.addList (SS.empty, ["requestHeader", "query", "dml", - "nextval"]) + "nextval", + "new_channel", + "subscribe", + "send"]) val csBasis = SS.addList (SS.empty, ["source", diff --git a/src/scriptcheck.sml b/src/scriptcheck.sml index fd4f4cd9..2bc185f9 100644 --- a/src/scriptcheck.sml +++ b/src/scriptcheck.sml @@ -38,7 +38,10 @@ structure IS = IntBinarySet val csBasis = SS.addList (SS.empty, ["new_client_source", "get_client_source", - "set_client_source"]) + "set_client_source", + "new_channel", + "subscribe", + "recv"]) val scriptWords = ["