summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/urweb.h6
-rw-r--r--lib/js/urweb.js37
-rw-r--r--src/c/driver.c31
-rw-r--r--src/c/urweb.c308
-rw-r--r--src/cjr_print.sml3
-rw-r--r--src/monoize.sml3
6 files changed, 334 insertions, 54 deletions
diff --git a/include/urweb.h b/include/urweb.h
index 676cc84b..ca0aada2 100644
--- a/include/urweb.h
+++ b/include/urweb.h
@@ -6,6 +6,11 @@ int uw_really_send(int sock, void *buf, ssize_t len);
extern uw_unit uw_unit_v;
+void uw_global_init(void);
+
+void uw_client_connect(size_t id, int pass, int sock);
+void uw_prune_clients(time_t timeout);
+
uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len);
void uw_set_db(uw_context, void*);
void *uw_get_db(uw_context);
@@ -41,6 +46,7 @@ uw_unit uw_Basis_set_client_source(uw_context, uw_Basis_int, uw_Basis_string);
void uw_set_script_header(uw_context, const char*);
const char *uw_Basis_get_script(uw_context, uw_unit);
+const char *uw_Basis_get_listener(uw_context, uw_unit);
char *uw_Basis_htmlifyInt(uw_context, uw_Basis_int);
char *uw_Basis_htmlifyFloat(uw_context, uw_Basis_float);
diff --git a/lib/js/urweb.js b/lib/js/urweb.js
index 08d96040..9f93120f 100644
--- a/lib/js/urweb.js
+++ b/lib/js/urweb.js
@@ -174,3 +174,40 @@ function rc(uri, parse, k) {
xhr.open("GET", uri, true);
xhr.send(null);
}
+
+
+var client_id = 0;
+var client_pass = 0;
+var url_prefix = "/";
+
+function path_join(s1, s2) {
+ if (s1.length > 0 && s1[s1.length-1] == '/')
+ return s1 + s2;
+ else
+ return s1 + "/" + s2;
+}
+
+function listener() {
+ var xhr = getXHR();
+
+ xhr.onreadystatechange = function() {
+ if (xhr.readyState == 4) {
+ var isok = false;
+
+ try {
+ if (xhr.status == 200)
+ isok = true;
+ } catch (e) { }
+
+ if (isok)
+ alert("Messages: " + xhr.responseText);
+ else {
+ alert("Error querying remote server for messages!");
+ throw "Error querying remote server for messages!";
+ }
+ }
+ };
+
+ xhr.open("GET", path_join(url_prefix, ".msgs/" + client_id + "/" + client_pass), true);
+ xhr.send(null);
+}
diff --git a/src/c/driver.c b/src/c/driver.c
index 34e57a6d..12905ede 100644
--- a/src/c/driver.c
+++ b/src/c/driver.c
@@ -106,7 +106,7 @@ static void *worker(void *data) {
while (1) {
char buf[uw_bufsize+1], *back = buf, *s;
- int sock;
+ int sock, dont_close = 0;
pthread_mutex_lock(&queue_mutex);
while (empty())
@@ -138,6 +138,7 @@ static void *worker(void *data) {
if (s = strstr(buf, "\r\n\r\n")) {
failure_kind fk;
char *cmd, *path, *headers, path_copy[uw_bufsize+1], *inputs;
+ int id, pass;
s[2] = 0;
@@ -168,6 +169,12 @@ static void *worker(void *data) {
break;
}
+ if (sscanf(path, "/.msgs/%d/%d", &id, &pass) == 2) {
+ uw_client_connect(id, pass, sock);
+ dont_close = 1;
+ break;
+ }
+
if (inputs = strchr(path, '?')) {
char *name, *value;
*inputs++ = 0;
@@ -286,11 +293,19 @@ static void *worker(void *data) {
}
}
- close(sock);
+ if (!dont_close)
+ close(sock);
uw_reset(ctx);
}
}
+static void *client_pruner(void *data) {
+ while (1) {
+ uw_prune_clients(5);
+ sleep(5);
+ }
+}
+
static void help(char *cmd) {
printf("Usage: %s [-p <port>] [-t <thread-count>]\n", cmd);
}
@@ -377,8 +392,20 @@ int main(int argc, char *argv[]) {
sin_size = sizeof their_addr;
+ uw_global_init();
+
printf("Listening on port %d....\n", uw_port);
+ {
+ pthread_t thread;
+ int name;
+
+ if (pthread_create(&thread, NULL, client_pruner, &name)) {
+ fprintf(stderr, "Error creating pruner thread\n");
+ return 1;
+ }
+ }
+
for (i = 0; i < nthreads; ++i) {
pthread_t thread;
names[i] = i;
diff --git a/src/c/urweb.c b/src/c/urweb.c
index dd11a2a9..1f4c6fdd 100644
--- a/src/c/urweb.c
+++ b/src/c/urweb.c
@@ -8,10 +8,234 @@
#include <setjmp.h>
#include <stdarg.h>
+#include <pthread.h>
+
#include "types.h"
uw_unit uw_unit_v = {};
+
+// Socket extras
+
+int uw_really_send(int sock, const void *buf, size_t len) {
+ while (len > 0) {
+ size_t n = send(sock, buf, len, 0);
+
+ if (n < 0)
+ return n;
+
+ buf += n;
+ len -= n;
+ }
+
+ return 0;
+}
+
+
+// Buffers
+
+typedef struct {
+ char *start, *front, *back;
+} buf;
+
+static void buf_init(buf *b, size_t s) {
+ b->front = b->start = malloc(s);
+ b->back = b->front + s;
+}
+
+static void buf_free(buf *b) {
+ free(b->start);
+}
+
+static void buf_reset(buf *b) {
+ b->front = b->start;
+}
+
+static void buf_check(buf *b, size_t extra) {
+ if (b->back - b->front < extra) {
+ size_t desired = b->front - b->start + extra, next;
+ char *new_heap;
+
+ next = b->back - b->start;
+ if (next == 0)
+ next = 1;
+ for (; next < desired; next *= 2);
+
+ new_heap = realloc(b->start, next);
+ b->front = new_heap + (b->front - b->start);
+ b->back = new_heap + next;
+ b->start = new_heap;
+ }
+}
+
+static size_t buf_used(buf *b) {
+ return b->front - b->start;
+}
+
+static size_t buf_avail(buf *b) {
+ return b->back - b->start;
+}
+
+
+// Cross-request state
+
+typedef enum { UNUSED, USED } usage;
+
+typedef struct client {
+ size_t id;
+ usage mode;
+ union {
+ struct client *next;
+ struct {
+ pthread_mutex_t lock;
+ int pass;
+ buf msgs;
+ int sock;
+ time_t last_contact;
+ } used;
+ } data;
+} client;
+
+static client **clients, *clients_free;
+static size_t n_clients;
+
+static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+void uw_global_init() {
+ srand(time(NULL) ^ getpid());
+
+ clients = malloc(0);
+}
+
+void uw_global_free() {
+ size_t i;
+
+ for (i = 0; i < n_clients; ++i)
+ free(clients[i]);
+
+ free(clients);
+}
+
+static client *uw_new_client() {
+ client *c;
+
+ pthread_mutex_lock(&clients_mutex);
+
+ if (clients_free) {
+ c = clients_free;
+ clients_free = clients_free->data.next;
+ }
+ else {
+ ++n_clients;
+ clients = realloc(clients, sizeof(client) * n_clients);
+ c = malloc(sizeof(client));
+ c->id = n_clients-1;
+ clients[n_clients-1] = c;
+ }
+
+ c->mode = USED;
+ pthread_mutex_init(&c->data.used.lock, NULL);
+ c->data.used.pass = rand();
+ c->data.used.sock = -1;
+ c->data.used.last_contact = time(NULL);
+ buf_init(&c->data.used.msgs, 0);
+
+ pthread_mutex_unlock(&clients_mutex);
+
+ return c;
+}
+
+static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n";
+
+void uw_client_connect(size_t id, int pass, int sock) {
+ client *c;
+
+ pthread_mutex_lock(&clients_mutex);
+
+ if (id >= n_clients) {
+ pthread_mutex_unlock(&clients_mutex);
+ close(sock);
+ fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
+ return;
+ }
+
+ c = clients[id];
+
+ if (c->mode != USED) {
+ pthread_mutex_unlock(&clients_mutex);
+ close(sock);
+ fprintf(stderr, "Client request for unused ID (%d)\n", (int)id);
+ return;
+ }
+
+ pthread_mutex_lock(&c->data.used.lock);
+
+ if (pass != c->data.used.pass) {
+ pthread_mutex_unlock(&c->data.used.lock);
+ pthread_mutex_unlock(&clients_mutex);
+ close(sock);
+ fprintf(stderr, "Wrong client password (%d)\n", (int)id);
+ return;
+ }
+
+ if (c->data.used.sock != -1) {
+ pthread_mutex_unlock(&c->data.used.lock);
+ pthread_mutex_unlock(&clients_mutex);
+ close(sock);
+ fprintf(stderr, "Duplicate client connection (%d)\n", (int)id);
+ return;
+ }
+
+ c->data.used.last_contact = time(NULL);
+
+ if (buf_used(&c->data.used.msgs) > 0) {
+ uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
+ uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs));
+ close(sock);
+ }
+ else {
+ uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
+ uw_really_send(sock, "Hi!", 3);
+ close(sock);
+ //c->data.used.sock = sock;
+ }
+
+ pthread_mutex_unlock(&c->data.used.lock);
+ pthread_mutex_unlock(&clients_mutex);
+}
+
+static void uw_free_client(client *c) {
+ printf("Freeing client %d\n", c->id);
+
+ if (c->mode == USED && c->data.used.sock != -1)
+ close(c->data.used.sock);
+
+ pthread_mutex_destroy(&c->data.used.lock);
+ buf_free(&c->data.used.msgs);
+ c->mode = UNUSED;
+ c->data.next = clients_free;
+ clients_free = c;
+}
+
+void uw_prune_clients(time_t timeout) {
+ size_t i;
+ time_t cutoff;
+
+ cutoff = time(NULL) - timeout;
+
+ pthread_mutex_lock(&clients_mutex);
+
+ for (i = 0; i < n_clients; ++i) {
+ if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff)
+ uw_free_client(clients[i]);
+ }
+
+ pthread_mutex_unlock(&clients_mutex);
+}
+
+
+// Single-request state
+
#define ERROR_BUF_LEN 1024
typedef struct regions {
@@ -23,10 +247,6 @@ typedef struct {
void *arg;
} cleanup;
-typedef struct {
- char *start, *front, *back;
-} buf;
-
struct uw_context {
char *headers, *headers_end;
@@ -43,18 +263,13 @@ struct uw_context {
cleanup *cleanup, *cleanup_front, *cleanup_back;
- const char *script_header;
+ const char *script_header, *url_prefix;
char error_message[ERROR_BUF_LEN];
};
extern int uw_inputs_len;
-static void buf_init(buf *b, size_t s) {
- b->front = b->start = malloc(s);
- b->back = b->front + s;
-}
-
uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len) {
uw_context ctx = malloc(sizeof(struct uw_context));
@@ -64,6 +279,7 @@ uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, si
buf_init(&ctx->page, page_len);
buf_init(&ctx->heap, heap_len);
buf_init(&ctx->script, script_len);
+ ctx->script.start[0] = 0;
ctx->inputs = calloc(uw_inputs_len, sizeof(char *));
@@ -74,6 +290,7 @@ uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, si
ctx->cleanup_front = ctx->cleanup_back = ctx->cleanup = malloc(0);
ctx->script_header = "";
+ ctx->url_prefix = "/";
ctx->error_message[0] = 0;
@@ -90,10 +307,6 @@ void *uw_get_db(uw_context ctx) {
return ctx->db;
}
-static void buf_free(buf *b) {
- free(b->front);
-}
-
void uw_free(uw_context ctx) {
buf_free(&ctx->outHeaders);
buf_free(&ctx->script);
@@ -104,13 +317,10 @@ void uw_free(uw_context ctx) {
free(ctx);
}
-static void buf_reset(buf *b) {
- b->front = b->start;
-}
-
void uw_reset_keep_error_message(uw_context ctx) {
buf_reset(&ctx->outHeaders);
buf_reset(&ctx->script);
+ ctx->script.start[0] = 0;
buf_reset(&ctx->page);
buf_reset(&ctx->heap);
ctx->regions = NULL;
@@ -249,7 +459,12 @@ void uw_set_script_header(uw_context ctx, const char *s) {
ctx->script_header = s;
}
-static void buf_check(uw_context ctx, buf *b, size_t extra, const char *desc) {
+void uw_set_url_prefix(uw_context ctx, const char *s) {
+ ctx->url_prefix = s;
+}
+
+
+static void buf_check_ctx(uw_context ctx, buf *b, size_t extra, const char *desc) {
if (b->back - b->front < extra) {
size_t desired = b->front - b->start + extra, next;
char *new_heap;
@@ -263,7 +478,7 @@ static void buf_check(uw_context ctx, buf *b, size_t extra, const char *desc) {
b->front = new_heap + (b->front - b->start);
b->back = new_heap + next;
- if (desc && new_heap != b->start) {
+ if (new_heap != b->start) {
b->start = new_heap;
uw_error(ctx, UNLIMITED_RETRY, "Couldn't allocate new %s contiguously", desc);
}
@@ -273,7 +488,7 @@ static void buf_check(uw_context ctx, buf *b, size_t extra, const char *desc) {
}
static void uw_check_heap(uw_context ctx, size_t extra) {
- buf_check(ctx, &ctx->heap, extra, "heap chunk");
+ buf_check_ctx(ctx, &ctx->heap, extra, "heap chunk");
}
void *uw_malloc(uw_context ctx, size_t len) {
@@ -307,14 +522,6 @@ void uw_end_region(uw_context ctx) {
ctx->regions = r->next;
}
-static size_t buf_used(buf *b) {
- return b->front - b->start;
-}
-
-static size_t buf_avail(buf *b) {
- return b->back - b->start;
-}
-
void uw_memstats(uw_context ctx) {
printf("Headers: %d/%d\n", buf_used(&ctx->outHeaders), buf_avail(&ctx->outHeaders));
printf("Script: %d/%d\n", buf_used(&ctx->script), buf_avail(&ctx->script));
@@ -322,20 +529,6 @@ void uw_memstats(uw_context ctx) {
printf("Heap: %d/%d\n", buf_used(&ctx->heap), buf_avail(&ctx->heap));
}
-int uw_really_send(int sock, const void *buf, size_t len) {
- while (len > 0) {
- size_t n = send(sock, buf, len, 0);
-
- if (n < 0)
- return n;
-
- buf += n;
- len -= n;
- }
-
- return 0;
-}
-
int uw_send(uw_context ctx, int sock) {
int n = uw_really_send(sock, ctx->outHeaders.start, ctx->outHeaders.front - ctx->outHeaders.start);
@@ -351,7 +544,7 @@ int uw_send(uw_context ctx, int sock) {
}
static void uw_check_headers(uw_context ctx, size_t extra) {
- buf_check(ctx, &ctx->outHeaders, extra, NULL);
+ buf_check(&ctx->outHeaders, extra);
}
void uw_write_header(uw_context ctx, uw_Basis_string s) {
@@ -363,7 +556,7 @@ void uw_write_header(uw_context ctx, uw_Basis_string s) {
}
static void uw_check_script(uw_context ctx, size_t extra) {
- buf_check(ctx, &ctx->script, extra, NULL);
+ buf_check(&ctx->script, extra);
}
void uw_write_script(uw_context ctx, uw_Basis_string s) {
@@ -375,16 +568,27 @@ void uw_write_script(uw_context ctx, uw_Basis_string s) {
}
const char *uw_Basis_get_script(uw_context ctx, uw_unit u) {
- if (ctx->script.front == ctx->script.start) {
- return ctx->script_header;
- } else {
- char *r = uw_malloc(ctx, 41 + (ctx->script.front - ctx->script.start) + strlen(ctx->script_header));
+ if (ctx->script_header[0] == 0)
+ return "";
+ else {
+ int pass;
+ client *c = uw_new_client(&pass);
- sprintf(r, "%s<script>%s</script>", ctx->script_header, ctx->script.start);
+ char *r = uw_malloc(ctx, strlen(ctx->script_header) + 56 + 2 * INTS_MAX + buf_used(&ctx->script)
+ + strlen(ctx->url_prefix));
+ sprintf(r, "%s<script>client_id=%d;client_pass=%d;url_prefix=\"%s\";%s</script>",
+ ctx->script_header, (int)c->id, c->data.used.pass, ctx->url_prefix, ctx->script.start);
return r;
}
}
+const char *uw_Basis_get_listener(uw_context ctx, uw_unit u) {
+ if (ctx->script_header[0] == 0)
+ return "";
+ else
+ return " onload='listener()'";
+}
+
uw_Basis_string uw_Basis_jsifyString(uw_context ctx, uw_Basis_string s) {
char *r, *s2;
@@ -486,7 +690,7 @@ uw_unit uw_Basis_set_client_source(uw_context ctx, uw_Basis_int n, uw_Basis_stri
}
static void uw_check(uw_context ctx, size_t extra) {
- buf_check(ctx, &ctx->page, extra, NULL);
+ buf_check(&ctx->page, extra);
}
static void uw_writec_unsafe(uw_context ctx, char c) {
@@ -1370,3 +1574,5 @@ uw_unit uw_Basis_set_cookie(uw_context ctx, uw_Basis_string prefix, uw_Basis_str
return uw_unit_v;
}
+
+
diff --git a/src/cjr_print.sml b/src/cjr_print.sml
index 4b6a56db..c5931616 100644
--- a/src/cjr_print.sml
+++ b/src/cjr_print.sml
@@ -2355,6 +2355,9 @@ fun p_file env (ds, ps) =
^ "\\\"></script>\\n"
| ServerOnly => ""),
string "\");",
+ string "uw_set_url_prefix(ctx, \"",
+ string (!Monoize.urlPrefix),
+ string "\");",
newline]),
box [string "{",
newline,
diff --git a/src/monoize.sml b/src/monoize.sml
index 01f18baf..88abf0c2 100644
--- a/src/monoize.sml
+++ b/src/monoize.sml
@@ -1924,7 +1924,8 @@ fun monoExp (env, st, fm) (all as (e, loc)) =
end
in
case tag of
- "body" => normal ("body", NONE,
+ "body" => normal ("body",
+ SOME (L'.EFfiApp ("Basis", "get_listener", [(L'.ERecord [], loc)]), loc),
SOME (L'.EFfiApp ("Basis", "get_script", [(L'.ERecord [], loc)]), loc))
| "dyn" =>