summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGravatar Adam Chlipala <adamc@hcoop.net>2009-03-22 15:05:07 -0400
committerGravatar Adam Chlipala <adamc@hcoop.net>2009-03-22 15:05:07 -0400
commit728965a2d50c6a1e3461309016ece1921492ff37 (patch)
tree5a3c064007e7481a452bc60d9dfaeabebb515a53 /src
parentaa8373e15c46685b6d6a951342205fd28dc52f1a (diff)
First message send delivered, but not interpreted
Diffstat (limited to 'src')
-rw-r--r--src/c/driver.c18
-rw-r--r--src/c/urweb.c245
-rw-r--r--src/jscomp.sml4
-rw-r--r--src/mono_reduce.sml16
-rw-r--r--src/monoize.sml57
-rw-r--r--src/rpcify.sml5
-rw-r--r--src/scriptcheck.sml5
7 files changed, 302 insertions, 48 deletions
diff --git a/src/c/driver.c b/src/c/driver.c
index 12905ede..fe8d3aa5 100644
--- a/src/c/driver.c
+++ b/src/c/driver.c
@@ -138,7 +138,6 @@ static void *worker(void *data) {
if (s = strstr(buf, "\r\n\r\n")) {
failure_kind fk;
char *cmd, *path, *headers, path_copy[uw_bufsize+1], *inputs;
- int id, pass;
s[2] = 0;
@@ -169,9 +168,18 @@ static void *worker(void *data) {
break;
}
- if (sscanf(path, "/.msgs/%d/%d", &id, &pass) == 2) {
- uw_client_connect(id, pass, sock);
- dont_close = 1;
+ uw_set_headers(ctx, headers);
+
+ if (!strcmp(path, "/.msgs")) {
+ char *id = uw_Basis_requestHeader(ctx, "UrWeb-Client");
+ char *pass = uw_Basis_requestHeader(ctx, "UrWeb-Pass");
+
+ if (id && pass) {
+ size_t idn = atoi(id);
+ uw_client_connect(idn, atoi(pass), sock);
+ dont_close = 1;
+ fprintf(stderr, "Processed request for messages by client %d\n\n", (int)idn);
+ }
break;
}
@@ -197,8 +205,6 @@ static void *worker(void *data) {
printf("Serving URI %s....\n", path);
- uw_set_headers(ctx, headers);
-
while (1) {
if (uw_db_begin(ctx)) {
printf("Error running SQL BEGIN\n");
diff --git a/src/c/urweb.c b/src/c/urweb.c
index 1f4c6fdd..abe08221 100644
--- a/src/c/urweb.c
+++ b/src/c/urweb.c
@@ -76,8 +76,14 @@ static size_t buf_avail(buf *b) {
return b->back - b->start;
}
+static void buf_append(buf *b, const char *s, size_t len) {
+ buf_check(b, len);
+ memcpy(b->front, s, len);
+ b->front += len;
+}
+
-// Cross-request state
+// Persistent client state
typedef enum { UNUSED, USED } usage;
@@ -101,21 +107,6 @@ static size_t n_clients;
static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
-void uw_global_init() {
- srand(time(NULL) ^ getpid());
-
- clients = malloc(0);
-}
-
-void uw_global_free() {
- size_t i;
-
- for (i = 0; i < n_clients; ++i)
- free(clients[i]);
-
- free(clients);
-}
-
static client *uw_new_client() {
client *c;
@@ -147,24 +138,33 @@ static client *uw_new_client() {
static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n";
-void uw_client_connect(size_t id, int pass, int sock) {
+static client *uw_find_client(size_t id) {
client *c;
pthread_mutex_lock(&clients_mutex);
if (id >= n_clients) {
pthread_mutex_unlock(&clients_mutex);
- close(sock);
- fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
- return;
+ return NULL;
}
c = clients[id];
if (c->mode != USED) {
pthread_mutex_unlock(&clients_mutex);
+ return NULL;
+ }
+
+ pthread_mutex_unlock(&clients_mutex);
+ return c;
+}
+
+void uw_client_connect(size_t id, int pass, int sock) {
+ client *c = uw_find_client(id);
+
+ if (c == NULL) {
close(sock);
- fprintf(stderr, "Client request for unused ID (%d)\n", (int)id);
+ fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
return;
}
@@ -172,7 +172,6 @@ void uw_client_connect(size_t id, int pass, int sock) {
if (pass != c->data.used.pass) {
pthread_mutex_unlock(&c->data.used.lock);
- pthread_mutex_unlock(&clients_mutex);
close(sock);
fprintf(stderr, "Wrong client password (%d)\n", (int)id);
return;
@@ -180,7 +179,6 @@ void uw_client_connect(size_t id, int pass, int sock) {
if (c->data.used.sock != -1) {
pthread_mutex_unlock(&c->data.used.lock);
- pthread_mutex_unlock(&clients_mutex);
close(sock);
fprintf(stderr, "Duplicate client connection (%d)\n", (int)id);
return;
@@ -193,15 +191,10 @@ void uw_client_connect(size_t id, int pass, int sock) {
uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs));
close(sock);
}
- else {
- uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
- uw_really_send(sock, "Hi!", 3);
- close(sock);
- //c->data.used.sock = sock;
- }
+ else
+ c->data.used.sock = sock;
pthread_mutex_unlock(&c->data.used.lock);
- pthread_mutex_unlock(&clients_mutex);
}
static void uw_free_client(client *c) {
@@ -234,6 +227,150 @@ void uw_prune_clients(time_t timeout) {
}
+// Persistent channel state
+
+typedef struct client_list {
+ client *data;
+ struct client_list *next;
+} client_list;
+
+typedef struct channel {
+ size_t id;
+ usage mode;
+ union {
+ struct channel *next;
+ struct {
+ pthread_mutex_t lock;
+ client_list *clients;
+ } used;
+ } data;
+} channel;
+
+static channel **channels, *channels_free;
+static size_t n_channels;
+
+static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static channel *uw_new_channel() {
+ channel *ch;
+
+ pthread_mutex_lock(&channels_mutex);
+
+ if (channels_free) {
+ ch = channels_free;
+ channels_free = channels_free->data.next;
+ }
+ else {
+ ++n_channels;
+ channels = realloc(channels, sizeof(channels) * n_channels);
+ ch = malloc(sizeof(channel));
+ ch->id = n_channels-1;
+ channels[n_channels-1] = ch;
+ }
+
+ ch->mode = USED;
+ pthread_mutex_init(&ch->data.used.lock, NULL);
+ ch->data.used.clients = NULL;
+
+ pthread_mutex_unlock(&channels_mutex);
+
+ return ch;
+}
+
+static channel *uw_find_channel(size_t id) {
+ channel *ch = NULL;
+
+ pthread_mutex_lock(&channels_mutex);
+
+ if (id < n_channels && channels[id]->mode == USED)
+ ch = channels[id];
+
+ pthread_mutex_unlock(&channels_mutex);
+
+ return ch;
+}
+
+static void uw_subscribe(channel *ch, client *c) {
+ client_list *cs = malloc(sizeof(client_list));
+
+ pthread_mutex_lock(&ch->data.used.lock);
+
+ cs->data = c;
+ cs->next = ch->data.used.clients;
+ ch->data.used.clients = cs;
+
+ pthread_mutex_unlock(&ch->data.used.lock);
+}
+
+static void uw_unsubscribe(channel *ch, client *c) {
+ client_list *prev, *cur, *tmp;
+
+ pthread_mutex_lock(&ch->data.used.lock);
+
+ for (prev = NULL, cur = ch->data.used.clients; cur; ) {
+ if (cur->data == c) {
+ if (prev)
+ prev->next = cur->next;
+ else
+ ch->data.used.clients = cur->next;
+ tmp = cur;
+ cur = cur->next;
+ free(tmp);
+ }
+ else {
+ prev = cur;
+ cur = cur->next;
+ }
+ }
+
+ pthread_mutex_unlock(&ch->data.used.lock);
+}
+
+static void uw_channel_send(channel *ch, const char *msg) {
+ size_t len = strlen(msg), preLen;
+ char pre[INTS_MAX + 2];
+ client_list *cs;
+
+ sprintf(pre, "%d\n", (int)ch->id);
+ preLen = strlen(pre);
+
+ pthread_mutex_lock(&ch->data.used.lock);
+
+ for (cs = ch->data.used.clients; cs; cs = cs->next) {
+ client *c = cs->data;
+
+ pthread_mutex_lock(&c->data.used.lock);
+
+ if (c->data.used.sock != -1) {
+ uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1);
+ uw_really_send(c->data.used.sock, pre, preLen);
+ uw_really_send(c->data.used.sock, msg, len);
+ uw_really_send(c->data.used.sock, "\n", 1);
+ close(c->data.used.sock);
+ c->data.used.sock = -1;
+ } else {
+ buf_append(&c->data.used.msgs, pre, preLen);
+ buf_append(&c->data.used.msgs, msg, len);
+ buf_append(&c->data.used.msgs, "\n", 1);
+ }
+
+ pthread_mutex_unlock(&c->data.used.lock);
+ }
+
+ pthread_mutex_unlock(&ch->data.used.lock);
+}
+
+
+// Global entry points
+
+void uw_global_init() {
+ srand(time(NULL) ^ getpid());
+
+ clients = malloc(0);
+ channels = malloc(0);
+}
+
+
// Single-request state
#define ERROR_BUF_LEN 1024
@@ -582,11 +719,17 @@ const char *uw_Basis_get_script(uw_context ctx, uw_unit u) {
}
}
-const char *uw_Basis_get_listener(uw_context ctx, uw_unit u) {
+const char *uw_Basis_get_listener(uw_context ctx, uw_Basis_string onload) {
if (ctx->script_header[0] == 0)
return "";
- else
+ else if (onload[0] == 0)
return " onload='listener()'";
+ else {
+ uw_Basis_string s = uw_malloc(ctx, strlen(onload) + 22);
+
+ sprintf(s, " onload='listener();%s'", onload);
+ return s;
+ }
}
uw_Basis_string uw_Basis_jsifyString(uw_context ctx, uw_Basis_string s) {
@@ -941,6 +1084,10 @@ uw_Basis_int uw_Basis_unurlifyInt(uw_context ctx, char **s) {
return r;
}
+uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) {
+ return uw_Basis_unurlifyInt(ctx, s);
+}
+
uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) {
char *new_s = uw_unurlify_advance(*s);
uw_Basis_float r;
@@ -1032,6 +1179,10 @@ uw_unit uw_Basis_htmlifyInt_w(uw_context ctx, uw_Basis_int n) {
return uw_unit_v;
}
+char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) {
+ return uw_Basis_htmlifyInt(ctx, ch);
+}
+
char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) {
int len;
char *r;
@@ -1575,4 +1726,34 @@ uw_unit uw_Basis_set_cookie(uw_context ctx, uw_Basis_string prefix, uw_Basis_str
return uw_unit_v;
}
+uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) {
+ return uw_new_channel()->id;
+}
+
+uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
+ channel *ch = uw_find_channel(chn);
+ if (ch == NULL)
+ uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
+ else {
+ size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client"));
+ int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass"));
+ client *c = uw_find_client(id);
+
+ if (c == NULL)
+ uw_error(ctx, FATAL, "Unknown client ID in subscription request");
+ else if (c->data.used.pass != pass)
+ uw_error(ctx, FATAL, "Wrong client password in subscription request");
+ else
+ uw_subscribe(ch, c);
+ }
+}
+
+uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
+ channel *ch = uw_find_channel(chn);
+
+ if (ch == NULL)
+ uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
+ else
+ uw_channel_send(ch, msg);
+}
diff --git a/src/jscomp.sml b/src/jscomp.sml
index 00048458..be227035 100644
--- a/src/jscomp.sml
+++ b/src/jscomp.sml
@@ -48,7 +48,8 @@ val funcs = [(("Basis", "alert"), "alert"),
(("Basis", "stringToInt_error"), "pi"),
(("Basis", "urlifyInt"), "ts"),
(("Basis", "urlifyFloat"), "ts"),
- (("Basis", "urlifyString"), "escape")]
+ (("Basis", "urlifyString"), "escape"),
+ (("Basis", "urlifyChannel"), "ts")]
structure FM = BinaryMapFn(struct
type ord_key = string * string
@@ -216,6 +217,7 @@ fun process file =
| TFfi ("Basis", "string") => ((EFfiApp ("Basis", "jsifyString", [e]), loc), st)
| TFfi ("Basis", "int") => ((EFfiApp ("Basis", "htmlifyInt", [e]), loc), st)
| TFfi ("Basis", "float") => ((EFfiApp ("Basis", "htmlifyFloat", [e]), loc), st)
+ | TFfi ("Basis", "channel") => ((EFfiApp ("Basis", "htmlifyChannel", [e]), loc), st)
| TFfi ("Basis", "bool") => ((ECase (e,
[((PCon (Enum, PConFfi {mod = "Basis",
diff --git a/src/mono_reduce.sml b/src/mono_reduce.sml
index 1f640004..b789e05f 100644
--- a/src/mono_reduce.sml
+++ b/src/mono_reduce.sml
@@ -57,6 +57,9 @@ fun impure (e, _) =
| EFfiApp ("Basis", "new_client_source", _) => true
| EFfiApp ("Basis", "set_client_source", _) => true
| EFfiApp ("Basis", "alert", _) => true
+ | EFfiApp ("Basis", "new_channel", _) => true
+ | EFfiApp ("Basis", "subscribe", _) => true
+ | EFfiApp ("Basis", "send", _) => true
| EFfiApp _ => false
| EApp ((EFfi _, _), _) => false
| EApp _ => true
@@ -256,6 +259,8 @@ fun reduce file =
fun summarize d (e, _) =
let
+ fun ffi es = List.concat (map (summarize d) es) @ [Unsure]
+
val s =
case e of
EPrim _ => []
@@ -266,10 +271,13 @@ fun reduce file =
| ENone _ => []
| ESome (_, e) => summarize d e
| EFfi _ => []
- | EFfiApp ("Basis", "set_cookie", es) => List.concat (map (summarize d) es) @ [Unsure]
- | EFfiApp ("Basis", "new_client_source", es) => List.concat (map (summarize d) es) @ [Unsure]
- | EFfiApp ("Basis", "set_client_source", es) => List.concat (map (summarize d) es) @ [Unsure]
- | EFfiApp ("Basis", "alert", es) => List.concat (map (summarize d) es) @ [Unsure]
+ | EFfiApp ("Basis", "set_cookie", es) => ffi es
+ | EFfiApp ("Basis", "new_client_source", es) => ffi es
+ | EFfiApp ("Basis", "set_client_source", es) => ffi es
+ | EFfiApp ("Basis", "alert", es) => ffi es
+ | EFfiApp ("Basis", "new_channel", es) => ffi es
+ | EFfiApp ("Basis", "subscribe", es) => ffi es
+ | EFfiApp ("Basis", "send", es) => ffi es
| EFfiApp (_, _, es) => List.concat (map (summarize d) es)
| EApp ((EFfi _, _), e) => summarize d e
| EApp _ =>
diff --git a/src/monoize.sml b/src/monoize.sml
index 88abf0c2..d6b5ae15 100644
--- a/src/monoize.sml
+++ b/src/monoize.sml
@@ -180,6 +180,9 @@ fun monoType env =
| L.CApp ((L.CFfi ("Basis", "sql_nfunc"), _), _) =>
(L'.TFfi ("Basis", "string"), loc)
+ | L.CApp ((L.CFfi ("Basis", "channel"), _), _) =>
+ (L'.TFfi ("Basis", "channel"), loc)
+
| L.CRel _ => poly ()
| L.CNamed n =>
(case IM.find (dtmap, n) of
@@ -1081,6 +1084,34 @@ fun monoExp (env, st, fm) (all as (e, loc)) =
fm)
end
+ | L.ECApp ((L.EFfi ("Basis", "channel"), _), t) =>
+ ((L'.EAbs ("_", (L'.TRecord [], loc), (L'.TFfi ("Basis", "channel"), loc),
+ (L'.EFfiApp ("Basis", "new_channel", [(L'.ERecord [], loc)]), loc)), loc),
+ fm)
+ | L.ECApp ((L.EFfi ("Basis", "subscribe"), _), t) =>
+ ((L'.EAbs ("ch", (L'.TFfi ("Basis", "channel"), loc),
+ (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc),
+ (L'.EAbs ("_", (L'.TRecord [], loc), (L'.TRecord [], loc),
+ (L'.EFfiApp ("Basis", "subscribe",
+ [(L'.ERel 1, loc)]),
+ loc)), loc)), loc),
+ fm)
+ | L.ECApp ((L.EFfi ("Basis", "send"), _), t) =>
+ let
+ val t = monoType env t
+ val (e, fm) = urlifyExp env fm ((L'.ERel 1, loc), t)
+ in
+ ((L'.EAbs ("ch", (L'.TFfi ("Basis", "channel"), loc),
+ (L'.TFun (t, (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc)), loc),
+ (L'.EAbs ("v", t, (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc),
+ (L'.EAbs ("_", (L'.TRecord [], loc), (L'.TRecord [], loc),
+ (L'.EFfiApp ("Basis", "send",
+ [(L'.ERel 2, loc),
+ e]),
+ loc)), loc)), loc)), loc),
+ fm)
+ end
+
| L.EFfiApp ("Basis", "dml", [e]) =>
let
val (e, fm) = monoExp (env, st, fm) e
@@ -1781,6 +1812,14 @@ fun monoExp (env, st, fm) (all as (e, loc)) =
L'.ERecord xes => xes
| _ => raise Fail "Non-record attributes!"
+ fun findOnload (attrs, acc) =
+ case attrs of
+ [] => (NONE, acc)
+ | ("Onload", e, _) :: rest => (SOME e, List.revAppend (acc, rest))
+ | x :: rest => findOnload (rest, x :: acc)
+
+ val (onload, attrs) = findOnload (attrs, [])
+
fun lowercaseFirst "" = ""
| lowercaseFirst s = String.str (Char.toLower (String.sub (s, 0)))
^ String.extract (s, 1, NONE)
@@ -1924,9 +1963,21 @@ fun monoExp (env, st, fm) (all as (e, loc)) =
end
in
case tag of
- "body" => normal ("body",
- SOME (L'.EFfiApp ("Basis", "get_listener", [(L'.ERecord [], loc)]), loc),
- SOME (L'.EFfiApp ("Basis", "get_script", [(L'.ERecord [], loc)]), loc))
+ "body" =>
+ let
+ val onload = case onload of
+ NONE => (L'.EPrim (Prim.String ""), loc)
+ | SOME e =>
+ let
+ val e = (L'.EApp (e, (L'.ERecord [], loc)), loc)
+ in
+ (L'.EJavaScript (L'.Attribute, e, NONE), loc)
+ end
+ in
+ normal ("body",
+ SOME (L'.EFfiApp ("Basis", "get_listener", [onload]), loc),
+ SOME (L'.EFfiApp ("Basis", "get_script", [(L'.ERecord [], loc)]), loc))
+ end
| "dyn" =>
(case attrs of
diff --git a/src/rpcify.sml b/src/rpcify.sml
index f4db3444..1212b81e 100644
--- a/src/rpcify.sml
+++ b/src/rpcify.sml
@@ -50,7 +50,10 @@ val ssBasis = SS.addList (SS.empty,
["requestHeader",
"query",
"dml",
- "nextval"])
+ "nextval",
+ "new_channel",
+ "subscribe",
+ "send"])
val csBasis = SS.addList (SS.empty,
["source",
diff --git a/src/scriptcheck.sml b/src/scriptcheck.sml
index fd4f4cd9..2bc185f9 100644
--- a/src/scriptcheck.sml
+++ b/src/scriptcheck.sml
@@ -38,7 +38,10 @@ structure IS = IntBinarySet
val csBasis = SS.addList (SS.empty,
["new_client_source",
"get_client_source",
- "set_client_source"])
+ "set_client_source",
+ "new_channel",
+ "subscribe",
+ "recv"])
val scriptWords = ["<script",
" onclick="]