diff options
author | Adam Chlipala <adamc@hcoop.net> | 2009-03-29 13:30:01 -0400 |
---|---|---|
committer | Adam Chlipala <adamc@hcoop.net> | 2009-03-29 13:30:01 -0400 |
commit | 5430dbfa3f1c7c0adaabc230e86ffd90e6f923da (patch) | |
tree | 3bd72d9c87173e9ba21a556e5b90841f36e24651 | |
parent | 6217967a353bc9d97ae45c2af495b653a47e2481 (diff) |
Expunging non-nullable rows
-rw-r--r-- | include/urweb.h | 2 | ||||
-rw-r--r-- | lib/ur/basis.urs | 7 | ||||
-rw-r--r-- | lib/ur/top.ur | 20 | ||||
-rw-r--r-- | src/c/driver.c | 5 | ||||
-rw-r--r-- | src/c/urweb.c | 82 | ||||
-rw-r--r-- | src/cjr.sml | 2 | ||||
-rw-r--r-- | src/cjr_print.sml | 221 | ||||
-rw-r--r-- | src/mono.sml | 2 | ||||
-rw-r--r-- | src/mono_print.sml | 10 | ||||
-rw-r--r-- | src/mono_shake.sml | 1 | ||||
-rw-r--r-- | src/monoize.sml | 131 |
11 files changed, 289 insertions, 194 deletions
diff --git a/include/urweb.h b/include/urweb.h index a12952a4..e3d3f71f 100644 --- a/include/urweb.h +++ b/include/urweb.h @@ -9,7 +9,7 @@ extern uw_unit uw_unit_v; void uw_global_init(void); void uw_client_connect(unsigned id, int pass, int sock); -void uw_prune_clients(time_t timeout); +void uw_prune_clients(uw_context); uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len); void uw_set_db(uw_context, void*); diff --git a/lib/ur/basis.urs b/lib/ur/basis.urs index d6b27852..586c5ef9 100644 --- a/lib/ur/basis.urs +++ b/lib/ur/basis.urs @@ -207,15 +207,12 @@ val sql_int : sql_injectable_prim int val sql_float : sql_injectable_prim float val sql_string : sql_injectable_prim string val sql_time : sql_injectable_prim time - -class sql_injectable_nullable -val sql_channel : t ::: Type -> sql_injectable_nullable (channel t) -val sql_client : sql_injectable_nullable client +val sql_channel : t ::: Type -> sql_injectable_prim (channel t) +val sql_client : sql_injectable_prim client class sql_injectable val sql_prim : t ::: Type -> sql_injectable_prim t -> sql_injectable t val sql_option_prim : t ::: Type -> sql_injectable_prim t -> sql_injectable (option t) -val sql_nullable : t ::: Type -> sql_injectable_nullable t -> sql_injectable (option t) val sql_inject : tables ::: {{Type}} -> agg ::: {{Type}} -> exps ::: {Type} -> t ::: Type diff --git a/lib/ur/top.ur b/lib/ur/top.ur index 5465da3d..154e88e9 100644 --- a/lib/ur/top.ur +++ b/lib/ur/top.ur @@ -200,7 +200,7 @@ fun eqNullable' (tables ::: {{Type}}) (agg ::: {{Type}}) (exps ::: {Type}) functor Broadcast(M : sig type t end) = struct sequence s - table t : {Id : int, Client : option client, Channel : option (channel M.t)} + table t : {Id : int, Client : client, Channel : channel M.t} type topic = int @@ -208,27 +208,17 @@ functor Broadcast(M : sig type t end) = struct val create = nextval s - val cleanup = - dml (DELETE FROM t WHERE Client IS NULL) - fun subscribe id = cli <- self; - cleanup; - ro <- oneOrNoRows (SELECT t.Channel FROM t WHERE t.Id = {[id]} AND t.Client = {[Some cli]}); + ro <- oneOrNoRows (SELECT t.Channel FROM t WHERE t.Id = {[id]} AND t.Client = {[cli]}); case ro of None => ch <- channel; - dml (INSERT INTO t (Id, Client, Channel) VALUES ({[id]}, {[Some cli]}, {[Some ch]})); + dml (INSERT INTO t (Id, Client, Channel) VALUES ({[id]}, {[cli]}, {[ch]})); return ch - | Some r => - case r.T.Channel of - None => error <xml>Broadcast.subscribe: Got null result</xml> - | Some ch => return ch + | Some r => return r.T.Channel fun send id msg = - cleanup; queryI (SELECT t.Channel FROM t WHERE t.Id = {[id]}) - (fn r => case r.T.Channel of - None => error <xml>Broadcast.send: Got null result</xml> - | Some ch => Basis.send ch msg) + (fn r => Basis.send r.T.Channel msg) end diff --git a/src/c/driver.c b/src/c/driver.c index 1b616636..14d08b57 100644 --- a/src/c/driver.c +++ b/src/c/driver.c @@ -278,8 +278,11 @@ static void *worker(void *data) { } static void *client_pruner(void *data) { + uw_context ctx = uw_init(0, 0, 0, 0); + uw_db_init(ctx); + while (1) { - uw_prune_clients(5); + uw_prune_clients(ctx); sleep(5); } } diff --git a/src/c/urweb.c b/src/c/urweb.c index 0194c226..8ad50711 100644 --- a/src/c/urweb.c +++ b/src/c/urweb.c @@ -208,7 +208,7 @@ void uw_client_connect(unsigned id, int pass, int sock) { } static void free_client(client *c) { - printf("Freeing client %d\n", c->id); + printf("Freeing client %u\n", c->id); c->mode = UNUSED; c->pass = -1; @@ -217,34 +217,6 @@ static void free_client(client *c) { clients_free = c; } -extern int uw_timeout; - -void uw_prune_clients() { - client *c, *next, *prev = NULL; - time_t cutoff; - - cutoff = time(NULL) - uw_timeout; - - pthread_mutex_lock(&clients_mutex); - - for (c = clients_used; c; c = next) { - next = c->next; - pthread_mutex_lock(&c->lock); - if (c->last_contact < cutoff) { - if (prev) - prev->next = next; - else - clients_used = next; - free_client(c); - } - else - prev = c; - pthread_mutex_unlock(&c->lock); - } - - pthread_mutex_unlock(&clients_mutex); -} - static uw_Basis_channel new_channel(client *c) { uw_Basis_channel ch = {c->id, c->n_channels++}; return ch; @@ -322,7 +294,7 @@ struct uw_context { char error_message[ERROR_BUF_LEN]; }; -extern int uw_inputs_len; +extern int uw_inputs_len, uw_timeout; uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len) { uw_context ctx = malloc(sizeof(struct uw_context)); @@ -1888,3 +1860,53 @@ int uw_rollback(uw_context ctx) { return uw_db_rollback(ctx); } + + +// "Garbage collection" + +void uw_expunger(uw_context ctx, uw_Basis_client cli); + +static failure_kind uw_expunge(uw_context ctx, uw_Basis_client cli) { + int r = setjmp(ctx->jmp_buf); + + if (r == 0) + uw_expunger(ctx, cli); + + return r; +} + +void uw_prune_clients(uw_context ctx) { + client *c, *next, *prev = NULL; + time_t cutoff; + + cutoff = time(NULL) - uw_timeout; + + pthread_mutex_lock(&clients_mutex); + + for (c = clients_used; c; c = next) { + next = c->next; + pthread_mutex_lock(&c->lock); + if (c->last_contact < cutoff) { + failure_kind fk = UNLIMITED_RETRY; + if (prev) + prev->next = next; + else + clients_used = next; + while (fk == UNLIMITED_RETRY) { + uw_reset(ctx); + fk = uw_expunge(ctx, c->id); + if (fk == SUCCESS) { + free_client(c); + break; + } + } + if (fk != SUCCESS) + printf("Expunge blocked by error: %s\n", uw_error_message(ctx)); + } + else + prev = c; + pthread_mutex_unlock(&c->lock); + } + + pthread_mutex_unlock(&clients_mutex); +} diff --git a/src/cjr.sml b/src/cjr.sml index 688326e4..948b345f 100644 --- a/src/cjr.sml +++ b/src/cjr.sml @@ -106,7 +106,7 @@ datatype decl' = | DTable of string * (string * typ) list | DSequence of string - | DDatabase of string + | DDatabase of string * int | DPreparedStatements of (string * int) list | DJavaScript of string diff --git a/src/cjr_print.sml b/src/cjr_print.sml index 351180b7..21e53a51 100644 --- a/src/cjr_print.sml +++ b/src/cjr_print.sml @@ -1937,119 +1937,128 @@ fun p_decl env (dAll as (d, _) : decl) = string x, string " */", newline] - | DDatabase s => box [string "static void uw_db_validate(uw_context);", - newline, - string "static void uw_db_prepare(uw_context);", - newline, - newline, - string "void uw_db_init(uw_context ctx) {", - newline, - string "PGconn *conn = PQconnectdb(\"", - string (String.toString s), - string "\");", - newline, - string "if (conn == NULL) uw_error(ctx, BOUNDED_RETRY, ", - string "\"libpq can't allocate a connection.\");", - newline, - string "if (PQstatus(conn) != CONNECTION_OK) {", - newline, - box [string "char msg[1024];", + | DDatabase (s, n) => box [string "static void uw_db_validate(uw_context);", newline, - string "strncpy(msg, PQerrorMessage(conn), 1024);", + string "static void uw_db_prepare(uw_context);", newline, - string "msg[1023] = 0;", newline, - string "PQfinish(conn);", + string "void uw_db_init(uw_context ctx) {", newline, - string "uw_error(ctx, BOUNDED_RETRY, ", - string "\"Connection to Postgres server failed: %s\", msg);"], - newline, - string "}", - newline, - string "uw_set_db(ctx, conn);", - newline, - string "uw_db_validate(ctx);", - newline, - string "uw_db_prepare(ctx);", - newline, - string "}", - newline, - newline, - string "void uw_db_close(uw_context ctx) {", - newline, - string "PQfinish(uw_get_db(ctx));", - newline, - string "}", - newline, - newline, - - string "int uw_db_begin(uw_context ctx) {", - newline, - string "PGconn *conn = uw_get_db(ctx);", - newline, - string "PGresult *res = PQexec(conn, \"BEGIN ISOLATION LEVEL SERIALIZABLE\");", - newline, - newline, - string "if (res == NULL) return 1;", - newline, - newline, - string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", - box [string "PQclear(res);", + string "PGconn *conn = PQconnectdb(\"", + string (String.toString s), + string "\");", newline, - string "return 1;", - newline], - string "}", - newline, - string "return 0;", - newline, - string "}", - newline, - newline, - - string "int uw_db_commit(uw_context ctx) {", - newline, - string "PGconn *conn = uw_get_db(ctx);", - newline, - string "PGresult *res = PQexec(conn, \"COMMIT\");", - newline, - newline, - string "if (res == NULL) return 1;", - newline, - newline, - string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", - box [string "PQclear(res);", + string "if (conn == NULL) uw_error(ctx, BOUNDED_RETRY, ", + string "\"libpq can't allocate a connection.\");", newline, - string "return 1;", - newline], - string "}", - newline, - string "return 0;", - newline, - string "}", - newline, - newline, - - string "int uw_db_rollback(uw_context ctx) {", - newline, - string "PGconn *conn = uw_get_db(ctx);", - newline, - string "PGresult *res = PQexec(conn, \"ROLLBACK\");", - newline, - newline, - string "if (res == NULL) return 1;", - newline, - newline, - string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", - box [string "PQclear(res);", + string "if (PQstatus(conn) != CONNECTION_OK) {", newline, - string "return 1;", - newline], - string "}", - newline, - string "return 0;", - newline, - string "}", - newline] + box [string "char msg[1024];", + newline, + string "strncpy(msg, PQerrorMessage(conn), 1024);", + newline, + string "msg[1023] = 0;", + newline, + string "PQfinish(conn);", + newline, + string "uw_error(ctx, BOUNDED_RETRY, ", + string "\"Connection to Postgres server failed: %s\", msg);"], + newline, + string "}", + newline, + string "uw_set_db(ctx, conn);", + newline, + string "uw_db_validate(ctx);", + newline, + string "uw_db_prepare(ctx);", + newline, + string "}", + newline, + newline, + string "void uw_db_close(uw_context ctx) {", + newline, + string "PQfinish(uw_get_db(ctx));", + newline, + string "}", + newline, + newline, + + string "int uw_db_begin(uw_context ctx) {", + newline, + string "PGconn *conn = uw_get_db(ctx);", + newline, + string "PGresult *res = PQexec(conn, \"BEGIN ISOLATION LEVEL SERIALIZABLE\");", + newline, + newline, + string "if (res == NULL) return 1;", + newline, + newline, + string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", + box [string "PQclear(res);", + newline, + string "return 1;", + newline], + string "}", + newline, + string "return 0;", + newline, + string "}", + newline, + newline, + + string "int uw_db_commit(uw_context ctx) {", + newline, + string "PGconn *conn = uw_get_db(ctx);", + newline, + string "PGresult *res = PQexec(conn, \"COMMIT\");", + newline, + newline, + string "if (res == NULL) return 1;", + newline, + newline, + string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", + box [string "PQclear(res);", + newline, + string "return 1;", + newline], + string "}", + newline, + string "return 0;", + newline, + string "}", + newline, + newline, + + string "int uw_db_rollback(uw_context ctx) {", + newline, + string "PGconn *conn = uw_get_db(ctx);", + newline, + string "PGresult *res = PQexec(conn, \"ROLLBACK\");", + newline, + newline, + string "if (res == NULL) return 1;", + newline, + newline, + string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", + box [string "PQclear(res);", + newline, + string "return 1;", + newline], + string "}", + newline, + string "return 0;", + newline, + string "}", + newline, + newline, + + string "void uw_expunger(uw_context ctx, uw_Basis_client cli) {", + newline, + box [p_enamed env n, + string "(ctx, cli);", + newline], + string "}", + newline] | DPreparedStatements [] => box [string "static void uw_db_prepare(uw_context ctx) {", diff --git a/src/mono.sml b/src/mono.sml index 3aa65b6a..95c65fc9 100644 --- a/src/mono.sml +++ b/src/mono.sml @@ -122,7 +122,7 @@ datatype decl' = | DTable of string * (string * typ) list | DSequence of string - | DDatabase of string + | DDatabase of string * int | DJavaScript of string diff --git a/src/mono_print.sml b/src/mono_print.sml index cbe90371..23b32a72 100644 --- a/src/mono_print.sml +++ b/src/mono_print.sml @@ -413,9 +413,13 @@ fun p_decl env (dAll as (d, _) : decl) = | DSequence s => box [string "(* SQL sequence ", string s, string "*)"] - | DDatabase s => box [string "database", - space, - string s] + | DDatabase (s, n) => box [string "database", + space, + string s, + space, + string "(", + p_enamed env n, + string ")"] | DJavaScript s => box [string "JavaScript(", string s, string ")"] diff --git a/src/mono_shake.sml b/src/mono_shake.sml index 4fd3caeb..475c4895 100644 --- a/src/mono_shake.sml +++ b/src/mono_shake.sml @@ -45,6 +45,7 @@ fun shake file = let val page_es = List.foldl (fn ((DExport (_, _, n, _, _), _), page_es) => n :: page_es + | ((DDatabase (_, n), _), page_es) => n :: page_es | (_, page_es) => page_es) [] file val (cdef, edef) = foldl (fn ((DDatatype (_, n, xncs), _), (cdef, edef)) => diff --git a/src/monoize.sml b/src/monoize.sml index 5701cc0c..50678be4 100644 --- a/src/monoize.sml +++ b/src/monoize.sml @@ -165,8 +165,6 @@ fun monoType env = | L.CApp ((L.CFfi ("Basis", "sql_injectable_prim"), _), t) => (L'.TFun (mt env dtmap t, (L'.TFfi ("Basis", "string"), loc)), loc) - | L.CApp ((L.CFfi ("Basis", "sql_injectable_nullable"), _), t) => - (L'.TFun (mt env dtmap t, (L'.TFfi ("Basis", "string"), loc)), loc) | L.CApp ((L.CFfi ("Basis", "sql_injectable"), _), t) => (L'.TFun (mt env dtmap t, (L'.TFfi ("Basis", "string"), loc)), loc) | L.CApp ((L.CApp ((L.CFfi ("Basis", "sql_unary"), _), _), _), _) => @@ -248,6 +246,8 @@ structure Fm :> sig val lookup : t -> foo_kind -> int -> (int -> t -> L'.decl * t) -> t * int val enter : t -> t val decls : t -> L'.decl list + + val freshName : t -> int * t end = struct structure M = BinaryMapFn(struct @@ -274,6 +274,7 @@ fun empty count = { } fun enter ({count, map, ...} : t) = {count = count, map = map, decls = []} +fun freshName {count, map, decls} = (count, {count = count + 1, map = map, decls = decls}) fun decls ({decls, ...} : t) = decls fun lookup (t as {count, map, decls}) k n thunk = @@ -1455,26 +1456,6 @@ fun monoExp (env, st, fm) (all as (e, loc)) = result = s}), loc)), loc)), loc), fm) end - | L.ECApp ((L.EFfi ("Basis", "sql_nullable"), _), t) => - let - val t = monoType env t - val s = (L'.TFfi ("Basis", "string"), loc) - in - ((L'.EAbs ("f", - (L'.TFun (t, s), loc), - (L'.TFun ((L'.TOption t, loc), s), loc), - (L'.EAbs ("x", - (L'.TOption t, loc), - s, - (L'.ECase ((L'.ERel 0, loc), - [((L'.PNone t, loc), - (L'.EPrim (Prim.String "NULL"), loc)), - ((L'.PSome (t, (L'.PVar ("y", t), loc)), loc), - (L'.EApp ((L'.ERel 2, loc), (L'.ERel 0, loc)), loc))], - {disc = (L'.TOption t, loc), - result = s}), loc)), loc)), loc), - fm) - end | L.ECApp ((L.EFfi ("Basis", "sql_subset"), _), _) => ((L'.ERecord [], loc), fm) @@ -2464,7 +2445,7 @@ fun monoDecl (env, fm) (all as (d, loc)) = [(L'.DSequence s, loc), (L'.DVal (x, n, t', e, s), loc)]) end - | L.DDatabase s => SOME (env, fm, [(L'.DDatabase s, loc)]) + | L.DDatabase _ => NONE | L.DCookie (x, n, t, s) => let val t = (L.CFfi ("Basis", "string"), loc) @@ -2477,7 +2458,9 @@ fun monoDecl (env, fm) (all as (d, loc)) = end end -fun monoize env ds = +datatype expungable = Client | Channel + +fun monoize env file = let val p = !urlPrefix val () = @@ -2488,14 +2471,100 @@ fun monoize env ds = else () + val loc = E.dummySpan + val client = (L'.TFfi ("Basis", "client"), loc) + val unit = (L'.TRecord [], loc) + fun expunger () = + let + val target = (L'.EFfiApp ("Basis", "sqlifyClient", [(L'.ERel 0, loc)]), loc) + + fun doTable (tab, xts, e) = + case xts of + L.CRecord (_, xts) => + let + val (nullable, notNullable) = + foldl (fn ((x, t), st as (nullable, notNullable)) => + case #1 x of + L.CName x => + (case #1 t of + L.CFfi ("Basis", "client") => + (nullable, (x, Client) :: notNullable) + | L.CApp ((L.CFfi ("Basis", "option"), _), + (L.CFfi ("Basis", "client"), _)) => + ((x, Client) :: nullable, notNullable) + | L.CApp ((L.CFfi ("Basis", "channel"), _), _) => + (nullable, (x, Channel) :: notNullable) + | L.CApp ((L.CFfi ("Basis", "option"), _), + (L.CApp ((L.CFfi ("Basis", "channel"), _), _), _)) => + ((x, Channel) :: nullable, notNullable) + | _ => st) + | _ => st) ([], []) xts + + val e = + case notNullable of + [] => e + | eb :: ebs => + let + fun cond (x, v) = + (L'.EStrcat ((L'.EPrim (Prim.String ("uw_" ^ x + ^ (case v of + Client => "" + | Channel => " >> 32") + ^ " = ")), loc), + target), loc) + in + (L'.ESeq ( + (L'.EDml (foldl + (fn (eb, s) => + (L'.EStrcat (s, + (L'.EStrcat ((L'.EPrim (Prim.String " AND "), + loc), + cond eb), loc)), loc)) + (L'.EStrcat ((L'.EPrim (Prim.String ("DELETE FROM uw_" + ^ tab + ^ " WHERE ")), loc), + cond eb), loc) + ebs), loc), + e), loc) + end + in + e + end + | _ => e + + val e = (L'.ERecord [], loc) + in + foldl (fn ((d, _), e) => + case d of + L.DTable (_, _, xts, tab) => doTable (tab, #1 xts, e) + | _ => e) e file + end + val (_, _, ds) = List.foldl (fn (d, (env, fm, ds)) => - case monoDecl (env, fm) d of - NONE => (env, fm, ds) - | SOME (env, fm, ds') => - (env, - Fm.enter fm, - ds' @ Fm.decls fm @ ds)) - (env, Fm.empty (CoreUtil.File.maxName ds + 1), []) ds + case #1 d of + L.DDatabase s => + let + val (n, fm) = Fm.freshName fm + + + val d = L'.DVal ("expunger", + n, + (L'.TFun (client, unit), loc), + (L'.EAbs ("cli", client, unit, expunger ()), loc), + "expunger") + in + (env, Fm.enter fm, (L'.DDatabase (s, n), loc) + :: (d, loc) + :: ds) + end + | _ => + case monoDecl (env, fm) d of + NONE => (env, fm, ds) + | SOME (env, fm, ds') => + (env, + Fm.enter fm, + ds' @ Fm.decls fm @ ds)) + (env, Fm.empty (CoreUtil.File.maxName file + 1), []) file in rev ds end |