summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Adam Chlipala <adamc@hcoop.net>2009-03-29 11:37:29 -0400
committerGravatar Adam Chlipala <adamc@hcoop.net>2009-03-29 11:37:29 -0400
commit843fcc973f4cf7b149d4f57732fb66f812115320 (patch)
treebae33dd5ebd8393e6dd1b30f7d1a2b75241c9956
parent9f3c3a0215d3f23c8e51fa4824d21dfeaa08ede0 (diff)
Redo channels, making them single-client
-rw-r--r--include/types.h6
-rw-r--r--include/urweb.h14
-rw-r--r--lib/js/urweb.js3
-rw-r--r--lib/ur/basis.urs4
-rw-r--r--lib/ur/top.ur44
-rw-r--r--lib/ur/top.urs18
-rw-r--r--src/c/driver.c20
-rw-r--r--src/c/urweb.c789
-rw-r--r--src/cjr_print.sml14
-rw-r--r--src/jscomp.sml5
-rw-r--r--src/monoize.sml12
-rw-r--r--src/prepare.sml2
-rw-r--r--tests/chat.ur36
13 files changed, 428 insertions, 539 deletions
diff --git a/include/types.h b/include/types.h
index d1ed2fd2..ddbff76b 100644
--- a/include/types.h
+++ b/include/types.h
@@ -17,7 +17,11 @@ typedef struct uw_context *uw_context;
typedef uw_Basis_string uw_Basis_xhtml;
typedef uw_Basis_string uw_Basis_page;
-typedef size_t uw_Basis_channel;
+
+typedef unsigned uw_Basis_client;
+typedef struct {
+ unsigned cli, chn;
+} uw_Basis_channel;
typedef enum { SUCCESS, FATAL, BOUNDED_RETRY, UNLIMITED_RETRY } failure_kind;
diff --git a/include/urweb.h b/include/urweb.h
index 02a4da5b..a12952a4 100644
--- a/include/urweb.h
+++ b/include/urweb.h
@@ -8,7 +8,7 @@ extern uw_unit uw_unit_v;
void uw_global_init(void);
-void uw_client_connect(size_t id, int pass, int sock);
+void uw_client_connect(unsigned id, int pass, int sock);
void uw_prune_clients(time_t timeout);
uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len);
@@ -22,6 +22,7 @@ void uw_reset_keep_error_message(uw_context);
failure_kind uw_begin_init(uw_context);
void uw_set_headers(uw_context, char *headers);
failure_kind uw_begin(uw_context, char *path);
+void uw_login(uw_context);
void uw_commit(uw_context);
int uw_rollback(uw_context);
@@ -55,7 +56,6 @@ char *uw_Basis_htmlifyFloat(uw_context, uw_Basis_float);
char *uw_Basis_htmlifyString(uw_context, uw_Basis_string);
char *uw_Basis_htmlifyBool(uw_context, uw_Basis_bool);
char *uw_Basis_htmlifyTime(uw_context, uw_Basis_time);
-char *uw_Basis_htmlifyChannel(uw_context, uw_Basis_channel);
uw_unit uw_Basis_htmlifyInt_w(uw_context, uw_Basis_int);
uw_unit uw_Basis_htmlifyFloat_w(uw_context, uw_Basis_float);
@@ -66,7 +66,9 @@ uw_unit uw_Basis_htmlifyTime_w(uw_context, uw_Basis_time);
char *uw_Basis_attrifyInt(uw_context, uw_Basis_int);
char *uw_Basis_attrifyFloat(uw_context, uw_Basis_float);
char *uw_Basis_attrifyString(uw_context, uw_Basis_string);
+char *uw_Basis_attrifyTime(uw_context, uw_Basis_time);
char *uw_Basis_attrifyChannel(uw_context, uw_Basis_channel);
+char *uw_Basis_attrifyClient(uw_context, uw_Basis_client);
uw_unit uw_Basis_attrifyInt_w(uw_context, uw_Basis_int);
uw_unit uw_Basis_attrifyFloat_w(uw_context, uw_Basis_float);
@@ -90,7 +92,6 @@ uw_Basis_float uw_Basis_unurlifyFloat(uw_context, char **);
uw_Basis_string uw_Basis_unurlifyString(uw_context, char **);
uw_Basis_bool uw_Basis_unurlifyBool(uw_context, char **);
uw_Basis_time uw_Basis_unurlifyTime(uw_context, char **);
-uw_Basis_channel uw_Basis_unurlifyChannel(uw_context, char **);
uw_Basis_string uw_Basis_strcat(uw_context, const char *, const char *);
uw_Basis_string uw_Basis_strdup(uw_context, const char *);
@@ -102,6 +103,7 @@ uw_Basis_string uw_Basis_sqlifyString(uw_context, uw_Basis_string);
uw_Basis_string uw_Basis_sqlifyBool(uw_context, uw_Basis_bool);
uw_Basis_string uw_Basis_sqlifyTime(uw_context, uw_Basis_time);
uw_Basis_string uw_Basis_sqlifyChannel(uw_context, uw_Basis_channel);
+uw_Basis_string uw_Basis_sqlifyClient(uw_context, uw_Basis_client);
uw_Basis_string uw_Basis_sqlifyIntN(uw_context, uw_Basis_int*);
uw_Basis_string uw_Basis_sqlifyFloatN(uw_context, uw_Basis_float*);
@@ -112,6 +114,7 @@ uw_Basis_string uw_Basis_sqlifyTimeN(uw_context, uw_Basis_time*);
char *uw_Basis_ensqlBool(uw_Basis_bool);
char *uw_Basis_jsifyString(uw_context, uw_Basis_string);
+char *uw_Basis_jsifyChannel(uw_context, uw_Basis_channel);
uw_Basis_string uw_Basis_intToString(uw_context, uw_Basis_int);
uw_Basis_string uw_Basis_floatToString(uw_context, uw_Basis_float);
@@ -122,13 +125,13 @@ uw_Basis_int *uw_Basis_stringToInt(uw_context, uw_Basis_string);
uw_Basis_float *uw_Basis_stringToFloat(uw_context, uw_Basis_string);
uw_Basis_bool *uw_Basis_stringToBool(uw_context, uw_Basis_string);
uw_Basis_time *uw_Basis_stringToTime(uw_context, uw_Basis_string);
-uw_Basis_channel *uw_Basis_stringToChannel(uw_context, uw_Basis_string);
uw_Basis_int uw_Basis_stringToInt_error(uw_context, uw_Basis_string);
uw_Basis_float uw_Basis_stringToFloat_error(uw_context, uw_Basis_string);
uw_Basis_bool uw_Basis_stringToBool_error(uw_context, uw_Basis_string);
uw_Basis_time uw_Basis_stringToTime_error(uw_context, uw_Basis_string);
uw_Basis_channel uw_Basis_stringToChannel_error(uw_context, uw_Basis_string);
+uw_Basis_client uw_Basis_stringToClient_error(uw_context, uw_Basis_string);
uw_Basis_string uw_Basis_requestHeader(uw_context, uw_Basis_string);
@@ -138,5 +141,6 @@ uw_Basis_string uw_Basis_get_cookie(uw_context, uw_Basis_string c);
uw_unit uw_Basis_set_cookie(uw_context, uw_Basis_string prefix, uw_Basis_string c, uw_Basis_string v);
uw_Basis_channel uw_Basis_new_channel(uw_context, uw_unit);
-uw_unit uw_Basis_subscribe(uw_context, uw_Basis_channel);
uw_unit uw_Basis_send(uw_context, uw_Basis_channel, uw_Basis_string);
+
+uw_Basis_client uw_Basis_self(uw_context, uw_unit);
diff --git a/lib/js/urweb.js b/lib/js/urweb.js
index c7cecbbf..4daee66d 100644
--- a/lib/js/urweb.js
+++ b/lib/js/urweb.js
@@ -301,6 +301,9 @@ function listener() {
}
function rv(chn, parse, k) {
+ if (chn == null)
+ return;
+
if (chn < 0)
whine("Out-of-bounds channel receive");
diff --git a/lib/ur/basis.urs b/lib/ur/basis.urs
index 23c3fe57..d6b27852 100644
--- a/lib/ur/basis.urs
+++ b/lib/ur/basis.urs
@@ -115,6 +115,9 @@ val subscribe : t ::: Type -> channel t -> transaction unit
val send : t ::: Type -> channel t -> t -> transaction unit
val recv : t ::: Type -> channel t -> transaction t
+type client
+val self : transaction client
+
(** SQL *)
@@ -207,6 +210,7 @@ val sql_time : sql_injectable_prim time
class sql_injectable_nullable
val sql_channel : t ::: Type -> sql_injectable_nullable (channel t)
+val sql_client : sql_injectable_nullable client
class sql_injectable
val sql_prim : t ::: Type -> sql_injectable_prim t -> sql_injectable t
diff --git a/lib/ur/top.ur b/lib/ur/top.ur
index 713f4311..5465da3d 100644
--- a/lib/ur/top.ur
+++ b/lib/ur/top.ur
@@ -143,6 +143,14 @@ fun foldRX2 K (tf1 :: K -> Type) (tf2 :: K -> Type) (ctx :: {Unit})
<xml>{f [nm] [t] [rest] ! r1 r2}{acc}</xml>)
<xml/>
+fun queryI (tables ::: {{Type}}) (exps ::: {Type})
+ [tables ~ exps] (q : sql_query tables exps)
+ (f : $(exps ++ map (fn fields :: {Type} => $fields) tables)
+ -> transaction unit) =
+ query q
+ (fn fs _ => f fs)
+ ()
+
fun queryX (tables ::: {{Type}}) (exps ::: {Type}) (ctx ::: {Unit})
[tables ~ exps] (q : sql_query tables exps)
(f : $(exps ++ map (fn fields :: {Type} => $fields) tables)
@@ -188,3 +196,39 @@ fun eqNullable' (tables ::: {{Type}}) (agg ::: {{Type}}) (exps ::: {Type})
case e2 of
None => (SQL {e1} IS NULL)
| Some _ => sql_binary sql_eq e1 (sql_inject e2)
+
+
+functor Broadcast(M : sig type t end) = struct
+ sequence s
+ table t : {Id : int, Client : option client, Channel : option (channel M.t)}
+
+ type topic = int
+
+ val inj : sql_injectable topic = _
+
+ val create = nextval s
+
+ val cleanup =
+ dml (DELETE FROM t WHERE Client IS NULL)
+
+ fun subscribe id =
+ cli <- self;
+ cleanup;
+ ro <- oneOrNoRows (SELECT t.Channel FROM t WHERE t.Id = {[id]} AND t.Client = {[Some cli]});
+ case ro of
+ None =>
+ ch <- channel;
+ dml (INSERT INTO t (Id, Client, Channel) VALUES ({[id]}, {[Some cli]}, {[Some ch]}));
+ return ch
+ | Some r =>
+ case r.T.Channel of
+ None => error <xml>Broadcast.subscribe: Got null result</xml>
+ | Some ch => return ch
+
+ fun send id msg =
+ cleanup;
+ queryI (SELECT t.Channel FROM t WHERE t.Id = {[id]})
+ (fn r => case r.T.Channel of
+ None => error <xml>Broadcast.send: Got null result</xml>
+ | Some ch => Basis.send ch msg)
+end
diff --git a/lib/ur/top.urs b/lib/ur/top.urs
index bc0b768e..821aa42a 100644
--- a/lib/ur/top.urs
+++ b/lib/ur/top.urs
@@ -87,6 +87,13 @@ val foldRX2 : K --> tf1 :: (K -> Type) -> tf2 :: (K -> Type) -> ctx :: {Unit}
-> r :: {K} -> folder r
-> $(map tf1 r) -> $(map tf2 r) -> xml ctx [] []
+val queryI : tables ::: {{Type}} -> exps ::: {Type}
+ -> [tables ~ exps] =>
+ sql_query tables exps
+ -> ($(exps ++ map (fn fields :: {Type} => $fields) tables)
+ -> transaction unit)
+ -> transaction unit
+
val queryX : tables ::: {{Type}} -> exps ::: {Type} -> ctx ::: {Unit}
-> [tables ~ exps] =>
sql_query tables exps
@@ -127,3 +134,14 @@ val eqNullable' : tables ::: {{Type}} -> agg ::: {{Type}} -> exps ::: {Type}
-> sql_exp tables agg exps (option t)
-> option t
-> sql_exp tables agg exps bool
+
+
+functor Broadcast(M : sig type t end) : sig
+ type topic
+
+ val inj : sql_injectable topic
+
+ val create : transaction topic
+ val subscribe : topic -> transaction (channel M.t)
+ val send : topic -> M.t -> transaction unit
+end
diff --git a/src/c/driver.c b/src/c/driver.c
index 4fa5db2f..1b616636 100644
--- a/src/c/driver.c
+++ b/src/c/driver.c
@@ -171,10 +171,10 @@ static void *worker(void *data) {
char *pass = uw_Basis_requestHeader(ctx, "UrWeb-Pass");
if (id && pass) {
- size_t idn = atoi(id);
+ unsigned idn = atoi(id);
uw_client_connect(idn, atoi(pass), sock);
dont_close = 1;
- fprintf(stderr, "Processed request for messages by client %d\n\n", (int)idn);
+ fprintf(stderr, "Processed request for messages by client %u\n\n", idn);
}
break;
}
@@ -217,6 +217,8 @@ static void *worker(void *data) {
else {
printf("Fatal error (out of retries): %s\n", uw_error_message(ctx));
+ try_rollback(ctx);
+
uw_reset_keep_error_message(ctx);
uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
uw_write_header(ctx, "Content-type: text/plain\r\n");
@@ -224,8 +226,6 @@ static void *worker(void *data) {
uw_write(ctx, uw_error_message(ctx));
uw_write(ctx, "\n");
- try_rollback(ctx);
-
break;
}
} else if (fk == UNLIMITED_RETRY)
@@ -233,6 +233,8 @@ static void *worker(void *data) {
else if (fk == FATAL) {
printf("Fatal error: %s\n", uw_error_message(ctx));
+ try_rollback(ctx);
+
uw_reset_keep_error_message(ctx);
uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\r\n");
uw_write_header(ctx, "Content-type: text/html\r\n");
@@ -241,26 +243,24 @@ static void *worker(void *data) {
uw_write(ctx, uw_error_message(ctx));
uw_write(ctx, "\n</body></html>");
- try_rollback(ctx);
-
break;
} else {
printf("Unknown uw_handle return code!\n");
+ try_rollback(ctx);
+
uw_reset_keep_request(ctx);
uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
uw_write_header(ctx, "Content-type: text/plain\r\n");
uw_write(ctx, "Unknown uw_handle return code!\n");
- try_rollback(ctx);
-
break;
}
- uw_reset_keep_request(ctx);
-
if (try_rollback(ctx))
break;
+
+ uw_reset_keep_request(ctx);
}
uw_send(ctx, sock);
diff --git a/src/c/urweb.c b/src/c/urweb.c
index 7c0cdb25..0194c226 100644
--- a/src/c/urweb.c
+++ b/src/c/urweb.c
@@ -7,6 +7,7 @@
#include <ctype.h>
#include <setjmp.h>
#include <stdarg.h>
+#include <assert.h>
#include <pthread.h>
@@ -88,79 +89,56 @@ static void buf_append(buf *b, const char *s, size_t len) {
typedef enum { UNUSED, USED } usage;
-typedef struct channel_list {
- struct channel *data;
- struct channel_list *next;
-} channel_list;
-
typedef struct client {
- size_t id;
+ unsigned id;
usage mode;
- union {
- struct client *next;
- struct {
- pthread_mutex_t lock;
- int pass;
- buf msgs;
- int sock;
- time_t last_contact;
- unsigned refcount;
- channel_list *channels;
- } used;
- } data;
+ int pass;
+ struct client *next;
+ pthread_mutex_t lock;
+ buf msgs;
+ int sock;
+ time_t last_contact;
+ unsigned n_channels;
} client;
-typedef struct client_list {
- client *data;
- struct client_list *next;
-} client_list;
-
-typedef struct channel {
- size_t id;
- usage mode;
- union {
- struct channel *next;
- struct {
- pthread_mutex_t lock;
- client_list *clients;
- unsigned refcount;
- } used;
- } data;
-} channel;
-
// Persistent client state
-static client **clients, *clients_free;
-static size_t n_clients;
+static client **clients, *clients_free, *clients_used;
+static unsigned n_clients;
static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
-static client *uw_new_client() {
+static client *new_client() {
client *c;
pthread_mutex_lock(&clients_mutex);
if (clients_free) {
c = clients_free;
- clients_free = clients_free->data.next;
+ clients_free = clients_free->next;
}
else {
++n_clients;
clients = realloc(clients, sizeof(client) * n_clients);
c = malloc(sizeof(client));
c->id = n_clients-1;
+ pthread_mutex_init(&c->lock, NULL);
+ buf_init(&c->msgs, 0);
clients[n_clients-1] = c;
}
+ pthread_mutex_lock(&c->lock);
c->mode = USED;
- pthread_mutex_init(&c->data.used.lock, NULL);
- c->data.used.pass = rand();
- c->data.used.sock = -1;
- c->data.used.last_contact = time(NULL);
- buf_init(&c->data.used.msgs, 0);
- c->data.used.refcount = 0;
- c->data.used.channels = NULL;
+ c->pass = rand();
+ c->sock = -1;
+ c->last_contact = time(NULL);
+ buf_reset(&c->msgs);
+ c->n_channels = 0;
+ pthread_mutex_unlock(&c->lock);
+
+ c->next = clients_used;
+ clients_used = c;
pthread_mutex_unlock(&clients_mutex);
@@ -169,7 +147,7 @@ static client *uw_new_client() {
static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n";
-static client *uw_find_client(size_t id) {
+static client *find_client(unsigned id) {
client *c;
pthread_mutex_lock(&clients_mutex);
@@ -181,282 +159,111 @@ static client *uw_find_client(size_t id) {
c = clients[id];
- if (c->mode != USED) {
- pthread_mutex_unlock(&clients_mutex);
- return NULL;
- }
-
- pthread_mutex_lock(&c->data.used.lock);
- ++c->data.used.refcount;
- pthread_mutex_unlock(&c->data.used.lock);
pthread_mutex_unlock(&clients_mutex);
return c;
}
-static void uw_release_client(client *c) {
- pthread_mutex_lock(&c->data.used.lock);
- --c->data.used.refcount;
- pthread_mutex_unlock(&c->data.used.lock);
-}
-
-void uw_client_connect(size_t id, int pass, int sock) {
- client *c = uw_find_client(id);
+void uw_client_connect(unsigned id, int pass, int sock) {
+ client *c = find_client(id);
if (c == NULL) {
close(sock);
- fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
+ fprintf(stderr, "Out-of-bounds client request (%u)\n", id);
return;
}
- uw_release_client(c);
+ pthread_mutex_lock(&c->lock);
- pthread_mutex_lock(&c->data.used.lock);
+ if (c->mode != USED) {
+ pthread_mutex_unlock(&c->lock);
+ close(sock);
+ fprintf(stderr, "Client request for unused slot (%u)\n", id);
+ return;
+ }
- if (pass != c->data.used.pass) {
- pthread_mutex_unlock(&c->data.used.lock);
+ if (pass != c->pass) {
+ pthread_mutex_unlock(&c->lock);
close(sock);
- fprintf(stderr, "Wrong client password (%d)\n", (int)id);
+ fprintf(stderr, "Wrong client password (%u, %d)\n", id, pass);
return;
}
- if (c->data.used.sock != -1) {
- close(c->data.used.sock);
- c->data.used.sock = -1;
+ if (c->sock != -1) {
+ close(c->sock);
+ c->sock = -1;
}
- c->data.used.last_contact = time(NULL);
+ c->last_contact = time(NULL);
- if (buf_used(&c->data.used.msgs) > 0) {
+ if (buf_used(&c->msgs) > 0) {
uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
- uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs));
- buf_reset(&c->data.used.msgs);
+ uw_really_send(sock, c->msgs.start, buf_used(&c->msgs));
+ buf_reset(&c->msgs);
close(sock);
}
else
- c->data.used.sock = sock;
+ c->sock = sock;
- pthread_mutex_unlock(&c->data.used.lock);
+ pthread_mutex_unlock(&c->lock);
}
-
-static void uw_free_client(client *c) {
- channel_list *chs;
-
+static void free_client(client *c) {
printf("Freeing client %d\n", c->id);
- if (c->mode == USED) {
- pthread_mutex_lock(&c->data.used.lock);
-
- for (chs = c->data.used.channels; chs; ) {
- client_list *prev, *cs;
-
- channel *ch = chs->data;
- channel_list *tmp = chs->next;
- free(chs);
- chs = tmp;
-
- pthread_mutex_lock(&ch->data.used.lock);
- for (prev = NULL, cs = ch->data.used.clients; cs; ) {
- if (cs->data == c) {
- client_list *tmp = cs->next;
- free(cs);
- cs = tmp;
- if (prev)
- prev->next = cs;
- else
- ch->data.used.clients = cs;
- }
- else {
- prev = cs;
- cs = cs->next;
- }
- }
- pthread_mutex_unlock(&ch->data.used.lock);
- }
+ c->mode = UNUSED;
+ c->pass = -1;
- if (c->data.used.sock != -1)
- close(c->data.used.sock);
-
- pthread_mutex_unlock(&c->data.used.lock);
- pthread_mutex_destroy(&c->data.used.lock);
- buf_free(&c->data.used.msgs);
- c->mode = UNUSED;
-
- c->data.next = clients_free;
- clients_free = c;
- }
+ c->next = clients_free;
+ clients_free = c;
}
extern int uw_timeout;
void uw_prune_clients() {
- size_t i;
+ client *c, *next, *prev = NULL;
time_t cutoff;
cutoff = time(NULL) - uw_timeout;
pthread_mutex_lock(&clients_mutex);
- for (i = 0; i < n_clients; ++i) {
- if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff
- && clients[i]->data.used.refcount == 0)
- uw_free_client(clients[i]);
- }
-
- pthread_mutex_unlock(&clients_mutex);
-}
-
-
-// Persistent channel state
-
-
-static channel **channels, *channels_free;
-static size_t n_channels;
-
-static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-static channel *uw_new_channel() {
- channel *ch;
-
- pthread_mutex_lock(&channels_mutex);
-
- if (channels_free) {
- ch = channels_free;
- channels_free = channels_free->data.next;
- }
- else {
- ++n_channels;
- channels = realloc(channels, sizeof(channels) * n_channels);
- ch = malloc(sizeof(channel));
- ch->id = n_channels-1;
- channels[n_channels-1] = ch;
- }
-
- ch->mode = USED;
- pthread_mutex_init(&ch->data.used.lock, NULL);
- ch->data.used.clients = NULL;
- ch->data.used.refcount = 0;
-
- pthread_mutex_unlock(&channels_mutex);
-
- return ch;
-}
-
-static void uw_free_channel(channel *ch) {
- if (ch->mode == USED) {
- client_list *cs;
-
- for (cs = ch->data.used.clients; cs; ) {
- client_list *tmp = cs->next;
- free(cs);
- cs = tmp;
- }
- pthread_mutex_destroy(&ch->data.used.lock);
- ch->mode = UNUSED;
- ch->data.next = channels_free;
- channels_free = ch;
- }
-}
-
-static channel *uw_find_channel(size_t id) {
- channel *ch = NULL;
-
- pthread_mutex_lock(&channels_mutex);
-
- if (id < n_channels && channels[id]->mode == USED) {
- ch = channels[id];
-
- pthread_mutex_lock(&ch->data.used.lock);
- ++ch->data.used.refcount;
- pthread_mutex_unlock(&ch->data.used.lock);
- }
-
- pthread_mutex_unlock(&channels_mutex);
-
- return ch;
-}
-
-static void uw_release_channel(channel *ch) {
- pthread_mutex_lock(&ch->data.used.lock);
- ++ch->data.used.refcount;
- pthread_mutex_unlock(&ch->data.used.lock);
-}
-
-static void uw_subscribe(channel *ch, client *c) {
- client_list *cs;
-
- pthread_mutex_lock(&ch->data.used.lock);
-
- for (cs = ch->data.used.clients; cs; cs = cs->next)
- if (cs->data == c) {
- pthread_mutex_unlock(&ch->data.used.lock);
- return;
- }
-
- cs = malloc(sizeof(client_list));
- cs->data = c;
- cs->next = ch->data.used.clients;
- ch->data.used.clients = cs;
-
- pthread_mutex_unlock(&ch->data.used.lock);
-}
-
-static void uw_unsubscribe(channel *ch, client *c) {
- client_list *prev, *cur, *tmp;
-
- pthread_mutex_lock(&ch->data.used.lock);
-
- for (prev = NULL, cur = ch->data.used.clients; cur; ) {
- if (cur->data == c) {
+ for (c = clients_used; c; c = next) {
+ next = c->next;
+ pthread_mutex_lock(&c->lock);
+ if (c->last_contact < cutoff) {
if (prev)
- prev->next = cur->next;
+ prev->next = next;
else
- ch->data.used.clients = cur->next;
- tmp = cur;
- cur = cur->next;
- free(tmp);
- }
- else {
- prev = cur;
- cur = cur->next;
+ clients_used = next;
+ free_client(c);
}
+ else
+ prev = c;
+ pthread_mutex_unlock(&c->lock);
}
- pthread_mutex_unlock(&ch->data.used.lock);
+ pthread_mutex_unlock(&clients_mutex);
}
-static void uw_channel_send(channel *ch, const char *msg) {
- size_t len = strlen(msg), preLen;
- char pre[INTS_MAX + 2];
- client_list *cs;
-
- sprintf(pre, "%d\n", (int)ch->id);
- preLen = strlen(pre);
-
- pthread_mutex_lock(&ch->data.used.lock);
-
- for (cs = ch->data.used.clients; cs; cs = cs->next) {
- client *c = cs->data;
-
- pthread_mutex_lock(&c->data.used.lock);
+static uw_Basis_channel new_channel(client *c) {
+ uw_Basis_channel ch = {c->id, c->n_channels++};
+ return ch;
+}
- if (c->data.used.sock != -1) {
- uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1);
- uw_really_send(c->data.used.sock, pre, preLen);
- uw_really_send(c->data.used.sock, msg, len);
- uw_really_send(c->data.used.sock, "\n", 1);
- close(c->data.used.sock);
- c->data.used.sock = -1;
- } else {
- buf_append(&c->data.used.msgs, pre, preLen);
- buf_append(&c->data.used.msgs, msg, len);
- buf_append(&c->data.used.msgs, "\n", 1);
- }
+static void client_send(int already_locked, client *c, buf *msg) {
+ if (!already_locked)
+ pthread_mutex_lock(&c->lock);
- pthread_mutex_unlock(&c->data.used.lock);
- }
+ if (c->sock != -1) {
+ uw_really_send(c->sock, begin_msgs, sizeof(begin_msgs) - 1);
+ uw_really_send(c->sock, msg->start, buf_used(msg));
+ close(c->sock);
+ c->sock = -1;
+ } else
+ buf_append(&c->msgs, msg->start, buf_used(msg));
- pthread_mutex_unlock(&ch->data.used.lock);
+ if (!already_locked)
+ pthread_mutex_unlock(&c->lock);
}
@@ -466,7 +273,6 @@ void uw_global_init() {
srand(time(NULL) ^ getpid());
clients = malloc(0);
- channels = malloc(0);
}
@@ -484,15 +290,9 @@ typedef struct {
} cleanup;
typedef struct {
- usage mode;
- channel *ch;
- enum { OLD, NEW } newness;
-
- size_t n_subscribed;
- client **subscribed;
-
+ unsigned client;
buf msgs;
-} channel_delta;
+} delta;
struct uw_context {
char *headers, *headers_end;
@@ -512,11 +312,13 @@ struct uw_context {
const char *script_header, *url_prefix;
- size_t n_deltas;
- channel_delta *deltas;
+ size_t n_deltas, used_deltas;
+ delta *deltas;
int timeout;
+ client *client;
+
char error_message[ERROR_BUF_LEN];
};
@@ -548,7 +350,7 @@ uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, si
ctx->source_count = 0;
- ctx->n_deltas = 0;
+ ctx->n_deltas = ctx->used_deltas = 0;
ctx->deltas = malloc(0);
ctx->timeout = uw_timeout;
@@ -574,10 +376,8 @@ void uw_free(uw_context ctx) {
free(ctx->inputs);
free(ctx->cleanup);
- for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
- free(ctx->deltas[i].subscribed);
+ for (i = 0; i < ctx->n_deltas; ++i)
buf_free(&ctx->deltas[i].msgs);
- }
free(ctx);
}
@@ -591,8 +391,8 @@ void uw_reset_keep_error_message(uw_context ctx) {
ctx->regions = NULL;
ctx->cleanup_front = ctx->cleanup;
ctx->source_count = 0;
- if (ctx->n_deltas > 0)
- ctx->deltas[0].mode = UNUSED;
+ ctx->used_deltas = 0;
+ ctx->client = NULL;
}
void uw_reset_keep_request(uw_context ctx) {
@@ -669,18 +469,73 @@ void uw_push_cleanup(uw_context ctx, void (*func)(void *), void *arg) {
++ctx->cleanup_front;
}
+uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) {
+ int len = strlen(h);
+ char *s = ctx->headers, *p;
+
+ while (p = strchr(s, ':')) {
+ if (p - s == len && !strncasecmp(s, h, len)) {
+ return p + 2;
+ } else {
+ if ((s = strchr(p, 0)) && s < ctx->headers_end)
+ s += 2;
+ else
+ return NULL;
+ }
+ }
+
+ return NULL;
+}
+
+void uw_login(uw_context ctx) {
+ if (ctx->script_header[0]) {
+ char *id_s, *pass_s;
+
+ if ((id_s = uw_Basis_requestHeader(ctx, "UrWeb-Client"))
+ && (pass_s = uw_Basis_requestHeader(ctx, "UrWeb-Pass"))) {
+ unsigned id = atoi(id_s);
+ int pass = atoi(pass_s);
+ client *c = find_client(id);
+
+ if (c == NULL)
+ uw_error(ctx, FATAL, "Unknown client ID in HTTP headers (%s, %s)", id_s, pass_s);
+ else {
+ pthread_mutex_lock(&c->lock);
+ ctx->client = c;
+
+ if (c->mode != USED)
+ uw_error(ctx, FATAL, "Stale client ID (%u) in subscription request", id);
+ if (c->pass != pass)
+ uw_error(ctx, FATAL, "Wrong client password (%u, %d) in subscription request", id, pass);
+ }
+ } else {
+ client *c = new_client();
+ pthread_mutex_lock(&c->lock);
+ ctx->client = c;
+ }
+ }
+}
+
failure_kind uw_begin(uw_context ctx, char *path) {
int r = setjmp(ctx->jmp_buf);
if (r == 0) {
if (uw_db_begin(ctx))
uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN");
+
uw_handle(ctx, path);
}
return r;
}
+uw_Basis_client uw_Basis_self(uw_context ctx, uw_unit u) {
+ if (ctx->client == NULL)
+ uw_error(ctx, FATAL, "Call to Basis.self() from page that has only server-side code");
+
+ return ctx->client->id;
+}
+
void uw_pop_cleanup(uw_context ctx) {
if (ctx->cleanup_front == ctx->cleanup)
uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack");
@@ -843,9 +698,6 @@ const char *uw_Basis_get_script(uw_context ctx, uw_unit u) {
if (ctx->script_header[0] == 0)
return "";
else {
- int pass;
- client *c = uw_new_client(&pass);
-
char *r = uw_malloc(ctx, strlen(ctx->script_header) + 18 + buf_used(&ctx->script));
sprintf(r, "%s<script>%s</script>",
ctx->script_header,
@@ -855,16 +707,13 @@ const char *uw_Basis_get_script(uw_context ctx, uw_unit u) {
}
const char *uw_Basis_get_settings(uw_context ctx, uw_Basis_string onload) {
- if (ctx->script_header[0] == 0)
+ if (ctx->client == NULL)
return "";
else {
- int pass;
- client *c = uw_new_client(&pass);
-
char *r = uw_malloc(ctx, 52 + 3 * INTS_MAX + strlen(ctx->url_prefix) + strlen(onload));
- sprintf(r, " onload='client_id=%d;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'",
- (int)c->id,
- c->data.used.pass,
+ sprintf(r, " onload='client_id=%u;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'",
+ ctx->client->id,
+ ctx->client->pass,
ctx->url_prefix,
ctx->timeout,
onload);
@@ -942,6 +791,21 @@ uw_Basis_string uw_Basis_jsifyString_ws(uw_context ctx, uw_Basis_string s) {
return r;
}
+char *uw_Basis_jsifyChannel(uw_context ctx, uw_Basis_channel chn) {
+ if (ctx->client == NULL || chn.cli != ctx->client->id)
+ return "null";
+ else {
+ int len;
+ char *r;
+
+ uw_check_heap(ctx, INTS_MAX + 1);
+ r = ctx->heap.front;
+ sprintf(r, "%u%n", chn.chn, &len);
+ ctx->heap.front += len+1;
+ return r;
+ }
+}
+
uw_Basis_int uw_Basis_new_client_source(uw_context ctx, uw_Basis_string s) {
int len;
size_t s_len = strlen(s);
@@ -1007,16 +871,6 @@ char *uw_Basis_attrifyInt(uw_context ctx, uw_Basis_int n) {
return result;
}
-char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel n) {
- char *result;
- int len;
- uw_check_heap(ctx, INTS_MAX);
- result = ctx->heap.front;
- sprintf(result, "%lld%n", (long long)n, &len);
- ctx->heap.front += len+1;
- return result;
-}
-
char *uw_Basis_attrifyFloat(uw_context ctx, uw_Basis_float n) {
char *result;
int len;
@@ -1116,15 +970,19 @@ char *uw_Basis_urlifyInt(uw_context ctx, uw_Basis_int n) {
return r;
}
-char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel n) {
- int len;
- char *r;
+char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel chn) {
+ if (ctx->client == NULL || chn.cli != ctx->client->id)
+ return "";
+ else {
+ int len;
+ char *r;
- uw_check_heap(ctx, INTS_MAX);
- r = ctx->heap.front;
- sprintf(r, "%lld%n", (long long)n, &len);
- ctx->heap.front += len+1;
- return r;
+ uw_check_heap(ctx, INTS_MAX + 1);
+ r = ctx->heap.front;
+ sprintf(r, "%u%n", chn.chn, &len);
+ ctx->heap.front += len+1;
+ return r;
+ }
}
char *uw_Basis_urlifyFloat(uw_context ctx, uw_Basis_float n) {
@@ -1182,13 +1040,15 @@ uw_unit uw_Basis_urlifyInt_w(uw_context ctx, uw_Basis_int n) {
return uw_unit_v;
}
-uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel n) {
- int len;
-
- uw_check(ctx, INTS_MAX);
- sprintf(ctx->page.front, "%lld%n", (long long)n, &len);
- ctx->page.front += len;
-
+uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel chn) {
+ if (ctx->client != NULL && chn.cli == ctx->client->id) {
+ int len;
+
+ uw_check(ctx, INTS_MAX + 1);
+ sprintf(ctx->page.front, "%u%n", chn.chn, &len);
+ ctx->page.front += len;
+ }
+
return uw_unit_v;
}
@@ -1255,10 +1115,6 @@ uw_Basis_int uw_Basis_unurlifyInt(uw_context ctx, char **s) {
return r;
}
-uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) {
- return uw_Basis_unurlifyInt(ctx, s);
-}
-
uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) {
char *new_s = uw_unurlify_advance(*s);
uw_Basis_float r;
@@ -1350,10 +1206,6 @@ uw_unit uw_Basis_htmlifyInt_w(uw_context ctx, uw_Basis_int n) {
return uw_unit_v;
}
-char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) {
- return uw_Basis_htmlifyInt(ctx, ch);
-}
-
char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) {
int len;
char *r;
@@ -1541,17 +1393,6 @@ char *uw_Basis_sqlifyInt(uw_context ctx, uw_Basis_int n) {
return r;
}
-char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel n) {
- int len;
- char *r;
-
- uw_check_heap(ctx, INTS_MAX + 6);
- r = ctx->heap.front;
- sprintf(r, "%lld::int4%n", (long long)n, &len);
- ctx->heap.front += len+1;
- return r;
-}
-
char *uw_Basis_sqlifyIntN(uw_context ctx, uw_Basis_int *n) {
if (n == NULL)
return "NULL";
@@ -1614,6 +1455,52 @@ uw_Basis_string uw_Basis_sqlifyString(uw_context ctx, uw_Basis_string s) {
return r;
}
+char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel chn) {
+ int len;
+ char *r;
+ unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn;
+
+ uw_check_heap(ctx, INTS_MAX + 7);
+ r = ctx->heap.front;
+ sprintf(r, "%lld::int8%n", combo, &len);
+ ctx->heap.front += len+1;
+ return r;
+}
+
+char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel chn) {
+ int len;
+ char *r;
+ unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn;
+
+ uw_check_heap(ctx, INTS_MAX + 1);
+ r = ctx->heap.front;
+ sprintf(r, "%lld%n", combo, &len);
+ ctx->heap.front += len+1;
+ return r;
+}
+
+char *uw_Basis_sqlifyClient(uw_context ctx, uw_Basis_client cli) {
+ int len;
+ char *r;
+
+ uw_check_heap(ctx, INTS_MAX + 7);
+ r = ctx->heap.front;
+ sprintf(r, "%u::int4%n", cli, &len);
+ ctx->heap.front += len+1;
+ return r;
+}
+
+char *uw_Basis_attrifyClient(uw_context ctx, uw_Basis_client cli) {
+ int len;
+ char *r;
+
+ uw_check_heap(ctx, INTS_MAX + 1);
+ r = ctx->heap.front;
+ sprintf(r, "%u%n", cli, &len);
+ ctx->heap.front += len+1;
+ return r;
+}
+
uw_Basis_string uw_Basis_sqlifyStringN(uw_context ctx, uw_Basis_string s) {
if (s == NULL)
return "NULL";
@@ -1637,6 +1524,21 @@ char *uw_Basis_sqlifyBoolN(uw_context ctx, uw_Basis_bool *b) {
char *uw_Basis_sqlifyTime(uw_context ctx, uw_Basis_time t) {
size_t len;
+ char *r, *s;
+ struct tm stm;
+
+ if (localtime_r(&t, &stm)) {
+ s = uw_malloc(ctx, TIMES_MAX);
+ len = strftime(s, TIMES_MAX, TIME_FMT, &stm);
+ r = uw_malloc(ctx, len + 14);
+ sprintf(r, "'%s'::timestamp", s);
+ return r;
+ } else
+ return "<Invalid time>";
+}
+
+char *uw_Basis_attrifyTime(uw_context ctx, uw_Basis_time t) {
+ size_t len;
char *r;
struct tm stm;
@@ -1723,18 +1625,6 @@ uw_Basis_int *uw_Basis_stringToInt(uw_context ctx, uw_Basis_string s) {
return NULL;
}
-uw_Basis_channel *uw_Basis_stringToChannel(uw_context ctx, uw_Basis_string s) {
- char *endptr;
- uw_Basis_channel n = strtoll(s, &endptr, 10);
-
- if (*s != '\0' && *endptr == '\0') {
- uw_Basis_channel *r = uw_malloc(ctx, sizeof(uw_Basis_channel));
- *r = n;
- return r;
- } else
- return NULL;
-}
-
uw_Basis_float *uw_Basis_stringToFloat(uw_context ctx, uw_Basis_string s) {
char *endptr;
uw_Basis_float n = strtod(s, &endptr);
@@ -1802,14 +1692,27 @@ uw_Basis_int uw_Basis_stringToInt_error(uw_context ctx, uw_Basis_string s) {
uw_error(ctx, FATAL, "Can't parse int: %s", s);
}
+#include <errno.h>
+
uw_Basis_channel uw_Basis_stringToChannel_error(uw_context ctx, uw_Basis_string s) {
+ unsigned long long n;
+
+ if (sscanf(s, "%llu", &n) < 1)
+ uw_error(ctx, FATAL, "Can't parse channel: %s", s);
+ else {
+ uw_Basis_channel ch = {n >> 32, n & ((1ull << 32) - 1)};
+ return ch;
+ }
+}
+
+uw_Basis_client uw_Basis_stringToClient_error(uw_context ctx, uw_Basis_string s) {
char *endptr;
- uw_Basis_channel n = strtoll(s, &endptr, 10);
+ unsigned long n = strtoul(s, &endptr, 10);
if (*s != '\0' && *endptr == '\0')
return n;
else
- uw_error(ctx, FATAL, "Can't parse channel int: %s", s);
+ uw_error(ctx, FATAL, "Can't parse client: %s", s);
}
uw_Basis_float uw_Basis_stringToFloat_error(uw_context ctx, uw_Basis_string s) {
@@ -1856,22 +1759,6 @@ uw_Basis_time uw_Basis_stringToTime_error(uw_context ctx, uw_Basis_string s) {
}
}
-uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) {
- int len = strlen(h);
- char *s = ctx->headers, *p;
-
- while (p = strchr(s, ':')) {
- if (p - s == len && !strncasecmp(s, h, len)) {
- return p + 2;
- } else {
- if ((s = strchr(p, 0)) && s < ctx->headers_end)
- s += 2;
- else
- return NULL;
- }
- }
-}
-
uw_Basis_string uw_Basis_get_cookie(uw_context ctx, uw_Basis_string c) {
int len = strlen(c);
char *s = ctx->headers, *p = ctx->outHeaders.start;
@@ -1930,100 +1817,45 @@ uw_unit uw_Basis_set_cookie(uw_context ctx, uw_Basis_string prefix, uw_Basis_str
return uw_unit_v;
}
-static channel_delta *allocate_delta(uw_context ctx, channel *ch) {
- size_t i;
- channel_delta *cd;
+static delta *allocate_delta(uw_context ctx, unsigned client) {
+ unsigned i;
+ delta *d;
- for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i);
+ for (i = 0; i < ctx->used_deltas; ++i)
+ if (ctx->deltas[i].client == client)
+ return &ctx->deltas[i];
- if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch)
- return &ctx->deltas[i];
-
- if (i < ctx->n_deltas)
- cd = &ctx->deltas[i];
- else {
- ++ctx->n_deltas;
- ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas);
- cd = &ctx->deltas[ctx->n_deltas-1];
- cd->n_subscribed = 0;
- cd->subscribed = malloc(0);
- buf_init(&cd->msgs, 0);
+ if (ctx->used_deltas >= ctx->n_deltas) {
+ ctx->deltas = realloc(ctx->deltas, sizeof(delta) * ++ctx->n_deltas);
+ buf_init(&ctx->deltas[ctx->n_deltas-1].msgs, 0);
}
-
- cd->mode = USED;
- cd->newness = OLD;
- cd->ch = ch;
- if (cd->n_subscribed > 0)
- cd->subscribed[0] = NULL;
- buf_reset(&cd->msgs);
- return cd;
+
+ d = &ctx->deltas[ctx->used_deltas++];
+ d->client = client;
+ buf_reset(&d->msgs);
+ return d;
}
uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) {
- size_t i;
- channel *ch = uw_new_channel();
- ++ch->data.used.refcount;
- channel_delta *cd = allocate_delta(ctx, ch);
+ if (ctx->client == NULL)
+ uw_error(ctx, FATAL, "Attempt to create channel on request not associated with a persistent connection");
- cd->newness = NEW;
-
- return ch->id;
-}
-
-static int delta_used(channel_delta *cd) {
- return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]);
+ return new_channel(ctx->client);
}
-uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
- channel *ch = uw_find_channel(chn);
-
- if (ch == NULL)
- uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
- else {
- size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client"));
- int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass"));
- client *c = uw_find_client(id);
-
- if (c == NULL) {
- uw_release_channel(ch);
- uw_error(ctx, FATAL, "Unknown client ID in subscription request");
- } else if (c->data.used.pass != pass) {
- uw_release_channel(ch);
- uw_release_client(c);
- uw_error(ctx, FATAL, "Wrong client password (%d) in subscription request", pass);
- } else {
- size_t i;
- channel_delta *cd = allocate_delta(ctx, ch);
+uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
+ delta *d = allocate_delta(ctx, chn.cli);
+ size_t len;
+ int preLen;
+ char pre[INTS_MAX + 2];
- if (delta_used(cd))
- uw_release_channel(ch);
+ len = strlen(msg);
- for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i);
+ sprintf(pre, "%u\n%n", chn.chn, &preLen);
- if (i < cd->n_subscribed)
- cd->subscribed[i] = c;
- else {
- ++cd->n_subscribed;
- cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed);
- cd->subscribed[cd->n_subscribed-1] = c;
- }
- }
- }
-
- return uw_unit_v;
-}
-
-uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
- channel *ch = uw_find_channel(chn);
-
- if (ch == NULL)
- uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
- else {
- channel_delta *cd = allocate_delta(ctx, ch);
- if (delta_used(cd))
- uw_release_channel(ch);
- buf_append(&cd->msgs, msg, strlen(msg));
- }
+ buf_append(&d->msgs, pre, preLen);
+ buf_append(&d->msgs, msg, len);
+ buf_append(&d->msgs, "\n", 1);
return uw_unit_v;
}
@@ -2032,46 +1864,27 @@ int uw_db_commit(uw_context);
int uw_db_rollback(uw_context);
void uw_commit(uw_context ctx) {
- size_t i, j;
+ unsigned i;
- for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
- channel *ch = ctx->deltas[i].ch;
-
- for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
- client *c = ctx->deltas[i].subscribed[j];
+ if (uw_db_commit(ctx))
+ uw_error(ctx, FATAL, "Error running SQL COMMIT");
- uw_subscribe(ch, c);
- uw_release_client(c);
- }
+ for (i = 0; i < ctx->used_deltas; ++i) {
+ delta *d = &ctx->deltas[i];
+ client *c = find_client(d->client);
- if (buf_used(&ctx->deltas[i].msgs) > 0) {
- uw_channel_send(ch, ctx->deltas[i].msgs.start);
- }
+ assert (c != NULL && c->mode == USED);
- uw_release_channel(ch);
+ client_send(c == ctx->client, c, &d->msgs);
}
- if (uw_db_commit(ctx))
- uw_error(ctx, FATAL, "Error running SQL COMMIT");
+ if (ctx->client)
+ pthread_mutex_unlock(&ctx->client->lock);
}
int uw_rollback(uw_context ctx) {
- size_t i, j;
-
- for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
- channel *ch = ctx->deltas[i].ch;
-
- for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
- client *c = ctx->deltas[i].subscribed[j];
-
- uw_release_client(c);
- }
-
- if (ctx->deltas[i].newness == NEW)
- uw_free_channel(ch);
- else
- uw_release_channel(ch);
- }
+ if (ctx->client)
+ pthread_mutex_unlock(&ctx->client->lock);
return uw_db_rollback(ctx);
}
diff --git a/src/cjr_print.sml b/src/cjr_print.sml
index a17a0416..351180b7 100644
--- a/src/cjr_print.sml
+++ b/src/cjr_print.sml
@@ -404,6 +404,7 @@ fun p_unsql wontLeakStrings env (tAll as (t, loc)) e =
| TFfi ("Basis", "bool") => box [string "uw_Basis_stringToBool_error(ctx, ", e, string ")"]
| TFfi ("Basis", "time") => box [string "uw_Basis_stringToTime_error(ctx, ", e, string ")"]
| TFfi ("Basis", "channel") => box [string "uw_Basis_stringToChannel_error(ctx, ", e, string ")"]
+ | TFfi ("Basis", "client") => box [string "uw_Basis_stringToClient_error(ctx, ", e, string ")"]
| _ => (ErrorMsg.errorAt loc "Don't know how to unmarshal type from SQL";
Print.eprefaces' [("Type", p_typ env tAll)];
@@ -447,6 +448,7 @@ datatype sql_type =
| Bool
| Time
| Channel
+ | Client
| Nullable of sql_type
fun p_sql_type' t =
@@ -457,6 +459,7 @@ fun p_sql_type' t =
| Bool => "uw_Basis_bool"
| Time => "uw_Basis_time"
| Channel => "uw_Basis_channel"
+ | Client => "uw_Basis_client"
| Nullable String => "uw_Basis_string"
| Nullable t => p_sql_type' t ^ "*"
@@ -473,6 +476,7 @@ fun getPargs (e, _) =
| EFfiApp ("Basis", "sqlifyBool", [e]) => [(e, Bool)]
| EFfiApp ("Basis", "sqlifyTime", [e]) => [(e, Time)]
| EFfiApp ("Basis", "sqlifyChannel", [e]) => [(e, Channel)]
+ | EFfiApp ("Basis", "sqlifyClient", [e]) => [(e, Client)]
| ECase (e,
[((PNone _, _),
@@ -496,8 +500,9 @@ fun p_ensql t e =
| Float => box [string "uw_Basis_attrifyFloat(ctx, ", e, string ")"]
| String => e
| Bool => box [string "(", e, string " ? \"TRUE\" : \"FALSE\")"]
- | Time => box [string "uw_Basis_sqlifyTime(ctx, ", e, string ")"]
+ | Time => box [string "uw_Basis_attrifyTime(ctx, ", e, string ")"]
| Channel => box [string "uw_Basis_attrifyChannel(ctx, ", e, string ")"]
+ | Client => box [string "uw_Basis_attrifyClient(ctx, ", e, string ")"]
| Nullable String => e
| Nullable t => box [string "(",
e,
@@ -1982,7 +1987,7 @@ fun p_decl env (dAll as (d, _) : decl) =
newline,
string "PGconn *conn = uw_get_db(ctx);",
newline,
- string "PGresult *res = PQexec(conn, \"BEGIN\");",
+ string "PGresult *res = PQexec(conn, \"BEGIN ISOLATION LEVEL SERIALIZABLE\");",
newline,
newline,
string "if (res == NULL) return 1;",
@@ -2108,7 +2113,8 @@ fun p_sqltype'' env (tAll as (t, loc)) =
| TFfi ("Basis", "string") => "text"
| TFfi ("Basis", "bool") => "bool"
| TFfi ("Basis", "time") => "timestamp"
- | TFfi ("Basis", "channel") => "int4"
+ | TFfi ("Basis", "channel") => "int8"
+ | TFfi ("Basis", "client") => "int4"
| _ => (ErrorMsg.errorAt loc "Don't know SQL equivalent of type";
Print.eprefaces' [("Type", p_typ env tAll)];
"ERROR")
@@ -2368,6 +2374,8 @@ fun p_file env (ds, ps) =
string (!Monoize.urlPrefix),
string "\");",
newline]),
+ string "uw_login(ctx);",
+ newline,
box [string "{",
newline,
box (ListUtil.mapi (fn (i, t) => box [p_typ env t,
diff --git a/src/jscomp.sml b/src/jscomp.sml
index f5627e24..a487eeab 100644
--- a/src/jscomp.sml
+++ b/src/jscomp.sml
@@ -49,7 +49,6 @@ val funcs = [(("Basis", "alert"), "alert"),
(("Basis", "urlifyInt"), "ts"),
(("Basis", "urlifyFloat"), "ts"),
(("Basis", "urlifyString"), "uf"),
- (("Basis", "urlifyChannel"), "ts"),
(("Basis", "recv"), "rv")]
structure FM = BinaryMapFn(struct
@@ -220,7 +219,7 @@ fun process file =
| TFfi ("Basis", "string") => ((EFfiApp ("Basis", "jsifyString", [e]), loc), st)
| TFfi ("Basis", "int") => ((EFfiApp ("Basis", "htmlifyInt", [e]), loc), st)
| TFfi ("Basis", "float") => ((EFfiApp ("Basis", "htmlifyFloat", [e]), loc), st)
- | TFfi ("Basis", "channel") => ((EFfiApp ("Basis", "htmlifyChannel", [e]), loc), st)
+ | TFfi ("Basis", "channel") => ((EFfiApp ("Basis", "jsifyChannel", [e]), loc), st)
| TFfi ("Basis", "bool") => ((ECase (e,
[((PCon (Enum, PConFfi {mod = "Basis",
@@ -348,7 +347,7 @@ fun process file =
| TFfi ("Basis", "string") => ("uu(t[i++])", st)
| TFfi ("Basis", "int") => ("parseInt(t[i++])", st)
| TFfi ("Basis", "float") => ("parseFloat(t[i++])", st)
- | TFfi ("Basis", "channel") => ("parseInt(t[i++])", st)
+ | TFfi ("Basis", "channel") => ("(t[i++].length > 0 ? parseInt(t[i]) : null)", st)
| TFfi ("Basis", "bool") => ("t[i++] == \"True\"", st)
diff --git a/src/monoize.sml b/src/monoize.sml
index 03ce6311..5701cc0c 100644
--- a/src/monoize.sml
+++ b/src/monoize.sml
@@ -1110,14 +1110,6 @@ fun monoExp (env, st, fm) (all as (e, loc)) =
((L'.EAbs ("_", (L'.TRecord [], loc), (L'.TFfi ("Basis", "channel"), loc),
(L'.EFfiApp ("Basis", "new_channel", [(L'.ERecord [], loc)]), loc)), loc),
fm)
- | L.ECApp ((L.EFfi ("Basis", "subscribe"), _), t) =>
- ((L'.EAbs ("ch", (L'.TFfi ("Basis", "channel"), loc),
- (L'.TFun ((L'.TRecord [], loc), (L'.TRecord [], loc)), loc),
- (L'.EAbs ("_", (L'.TRecord [], loc), (L'.TRecord [], loc),
- (L'.EFfiApp ("Basis", "subscribe",
- [(L'.ERel 1, loc)]),
- loc)), loc)), loc),
- fm)
| L.ECApp ((L.EFfi ("Basis", "send"), _), t) =>
let
val t = monoType env t
@@ -1431,6 +1423,10 @@ fun monoExp (env, st, fm) (all as (e, loc)) =
((L'.EAbs ("x", (L'.TFfi ("Basis", "channel"), loc), (L'.TFfi ("Basis", "string"), loc),
(L'.EFfiApp ("Basis", "sqlifyChannel", [(L'.ERel 0, loc)]), loc)), loc),
fm)
+ | L.EFfi ("Basis", "sql_client") =>
+ ((L'.EAbs ("x", (L'.TFfi ("Basis", "client"), loc), (L'.TFfi ("Basis", "string"), loc),
+ (L'.EFfiApp ("Basis", "sqlifyClient", [(L'.ERel 0, loc)]), loc)), loc),
+ fm)
| L.ECApp ((L.EFfi ("Basis", "sql_prim"), _), t) =>
let
val t = monoType env t
diff --git a/src/prepare.sml b/src/prepare.sml
index 1f3f323a..258b9dcf 100644
--- a/src/prepare.sml
+++ b/src/prepare.sml
@@ -48,6 +48,8 @@ fun prepString (e, ss, n) =
| EFfiApp ("Basis", "sqlifyTime", [e]) =>
SOME ("$" ^ Int.toString (n + 1) ^ "::timestamp" :: ss, n + 1)
| EFfiApp ("Basis", "sqlifyChannel", [e]) =>
+ SOME ("$" ^ Int.toString (n + 1) ^ "::int8" :: ss, n + 1)
+ | EFfiApp ("Basis", "sqlifyClient", [e]) =>
SOME ("$" ^ Int.toString (n + 1) ^ "::int4" :: ss, n + 1)
| ECase (e,
diff --git a/tests/chat.ur b/tests/chat.ur
index b982836d..8763a190 100644
--- a/tests/chat.ur
+++ b/tests/chat.ur
@@ -9,32 +9,22 @@ and renderS logS =
log <- signal logS;
return (render log)
+structure Room = Broadcast(struct
+ type t = string
+ end)
+
sequence s
-table t : { Id : int, Title : string, Chan : option (channel string) }
+table t : { Id : int, Title : string, Room : Room.topic }
fun chat id =
- r <- oneRow (SELECT t.Title, t.Chan FROM t WHERE t.Id = {[id]});
- ch <- (case r.T.Chan of
- None => (ch <- channel;
- dml (UPDATE t SET Chan = {[Some ch]} WHERE Id = {[id]});
- return ch)
- | Some ch => return ch);
+ r <- oneRow (SELECT t.Title, t.Room FROM t WHERE t.Id = {[id]});
+ ch <- Room.subscribe r.T.Room;
newLine <- source "";
logHead <- source End;
logTail <- source logHead;
let
- fun getCh () =
- r <- oneRow (SELECT t.Chan FROM t WHERE t.Id = {[id]});
- case r.T.Chan of
- None => error <xml>Channel disappeared</xml>
- | Some ch => return ch
-
- fun join () =
- ch <- getCh ();
- subscribe ch
-
fun onload () =
let
fun listener () =
@@ -45,13 +35,16 @@ fun chat id =
set logTail newTail;
listener ()
in
- join ();
listener ()
end
+ fun getRoom () =
+ r <- oneRow (SELECT t.Room FROM t WHERE t.Id = {[id]});
+ return r.T.Room
+
fun speak line =
- ch <- getCh ();
- send ch line
+ room <- getRoom ();
+ Room.send room line
fun doSpeak () =
line <- get newLine;
@@ -84,7 +77,8 @@ and main () : transaction page =
let
fun create r =
id <- nextval s;
- dml (INSERT INTO t (Id, Title, Chan) VALUES ({[id]}, {[r.Title]}, NULL));
+ room <- Room.create;
+ dml (INSERT INTO t (Id, Title, Room) VALUES ({[id]}, {[r.Title]}, {[room]}));
main ()
in
ls <- list ();