aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Adam Chlipala <adamc@hcoop.net>2009-03-29 13:30:01 -0400
committerGravatar Adam Chlipala <adamc@hcoop.net>2009-03-29 13:30:01 -0400
commit5430dbfa3f1c7c0adaabc230e86ffd90e6f923da (patch)
tree3bd72d9c87173e9ba21a556e5b90841f36e24651
parent6217967a353bc9d97ae45c2af495b653a47e2481 (diff)
Expunging non-nullable rows
-rw-r--r--include/urweb.h2
-rw-r--r--lib/ur/basis.urs7
-rw-r--r--lib/ur/top.ur20
-rw-r--r--src/c/driver.c5
-rw-r--r--src/c/urweb.c82
-rw-r--r--src/cjr.sml2
-rw-r--r--src/cjr_print.sml221
-rw-r--r--src/mono.sml2
-rw-r--r--src/mono_print.sml10
-rw-r--r--src/mono_shake.sml1
-rw-r--r--src/monoize.sml131
11 files changed, 289 insertions, 194 deletions
diff --git a/include/urweb.h b/include/urweb.h
index a12952a4..e3d3f71f 100644
--- a/include/urweb.h
+++ b/include/urweb.h
@@ -9,7 +9,7 @@ extern uw_unit uw_unit_v;
void uw_global_init(void);
void uw_client_connect(unsigned id, int pass, int sock);
-void uw_prune_clients(time_t timeout);
+void uw_prune_clients(uw_context);
uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len);
void uw_set_db(uw_context, void*);
diff --git a/lib/ur/basis.urs b/lib/ur/basis.urs
index d6b27852..586c5ef9 100644
--- a/lib/ur/basis.urs
+++ b/lib/ur/basis.urs
@@ -207,15 +207,12 @@ val sql_int : sql_injectable_prim int
val sql_float : sql_injectable_prim float
val sql_string : sql_injectable_prim string
val sql_time : sql_injectable_prim time
-
-class sql_injectable_nullable
-val sql_channel : t ::: Type -> sql_injectable_nullable (channel t)
-val sql_client : sql_injectable_nullable client
+val sql_channel : t ::: Type -> sql_injectable_prim (channel t)
+val sql_client : sql_injectable_prim client
class sql_injectable
val sql_prim : t ::: Type -> sql_injectable_prim t -> sql_injectable t
val sql_option_prim : t ::: Type -> sql_injectable_prim t -> sql_injectable (option t)
-val sql_nullable : t ::: Type -> sql_injectable_nullable t -> sql_injectable (option t)
val sql_inject : tables ::: {{Type}} -> agg ::: {{Type}} -> exps ::: {Type}
-> t ::: Type
diff --git a/lib/ur/top.ur b/lib/ur/top.ur
index 5465da3d..154e88e9 100644
--- a/lib/ur/top.ur
+++ b/lib/ur/top.ur
@@ -200,7 +200,7 @@ fun eqNullable' (tables ::: {{Type}}) (agg ::: {{Type}}) (exps ::: {Type})
functor Broadcast(M : sig type t end) = struct
sequence s
- table t : {Id : int, Client : option client, Channel : option (channel M.t)}
+ table t : {Id : int, Client : client, Channel : channel M.t}
type topic = int
@@ -208,27 +208,17 @@ functor Broadcast(M : sig type t end) = struct
val create = nextval s
- val cleanup =
- dml (DELETE FROM t WHERE Client IS NULL)
-
fun subscribe id =
cli <- self;
- cleanup;
- ro <- oneOrNoRows (SELECT t.Channel FROM t WHERE t.Id = {[id]} AND t.Client = {[Some cli]});
+ ro <- oneOrNoRows (SELECT t.Channel FROM t WHERE t.Id = {[id]} AND t.Client = {[cli]});
case ro of
None =>
ch <- channel;
- dml (INSERT INTO t (Id, Client, Channel) VALUES ({[id]}, {[Some cli]}, {[Some ch]}));
+ dml (INSERT INTO t (Id, Client, Channel) VALUES ({[id]}, {[cli]}, {[ch]}));
return ch
- | Some r =>
- case r.T.Channel of
- None => error <xml>Broadcast.subscribe: Got null result</xml>
- | Some ch => return ch
+ | Some r => return r.T.Channel
fun send id msg =
- cleanup;
queryI (SELECT t.Channel FROM t WHERE t.Id = {[id]})
- (fn r => case r.T.Channel of
- None => error <xml>Broadcast.send: Got null result</xml>
- | Some ch => Basis.send ch msg)
+ (fn r => Basis.send r.T.Channel msg)
end
diff --git a/src/c/driver.c b/src/c/driver.c
index 1b616636..14d08b57 100644
--- a/src/c/driver.c
+++ b/src/c/driver.c
@@ -278,8 +278,11 @@ static void *worker(void *data) {
}
static void *client_pruner(void *data) {
+ uw_context ctx = uw_init(0, 0, 0, 0);
+ uw_db_init(ctx);
+
while (1) {
- uw_prune_clients(5);
+ uw_prune_clients(ctx);
sleep(5);
}
}
diff --git a/src/c/urweb.c b/src/c/urweb.c
index 0194c226..8ad50711 100644
--- a/src/c/urweb.c
+++ b/src/c/urweb.c
@@ -208,7 +208,7 @@ void uw_client_connect(unsigned id, int pass, int sock) {
}
static void free_client(client *c) {
- printf("Freeing client %d\n", c->id);
+ printf("Freeing client %u\n", c->id);
c->mode = UNUSED;
c->pass = -1;
@@ -217,34 +217,6 @@ static void free_client(client *c) {
clients_free = c;
}
-extern int uw_timeout;
-
-void uw_prune_clients() {
- client *c, *next, *prev = NULL;
- time_t cutoff;
-
- cutoff = time(NULL) - uw_timeout;
-
- pthread_mutex_lock(&clients_mutex);
-
- for (c = clients_used; c; c = next) {
- next = c->next;
- pthread_mutex_lock(&c->lock);
- if (c->last_contact < cutoff) {
- if (prev)
- prev->next = next;
- else
- clients_used = next;
- free_client(c);
- }
- else
- prev = c;
- pthread_mutex_unlock(&c->lock);
- }
-
- pthread_mutex_unlock(&clients_mutex);
-}
-
static uw_Basis_channel new_channel(client *c) {
uw_Basis_channel ch = {c->id, c->n_channels++};
return ch;
@@ -322,7 +294,7 @@ struct uw_context {
char error_message[ERROR_BUF_LEN];
};
-extern int uw_inputs_len;
+extern int uw_inputs_len, uw_timeout;
uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, size_t heap_len) {
uw_context ctx = malloc(sizeof(struct uw_context));
@@ -1888,3 +1860,53 @@ int uw_rollback(uw_context ctx) {
return uw_db_rollback(ctx);
}
+
+
+// "Garbage collection"
+
+void uw_expunger(uw_context ctx, uw_Basis_client cli);
+
+static failure_kind uw_expunge(uw_context ctx, uw_Basis_client cli) {
+ int r = setjmp(ctx->jmp_buf);
+
+ if (r == 0)
+ uw_expunger(ctx, cli);
+
+ return r;
+}
+
+void uw_prune_clients(uw_context ctx) {
+ client *c, *next, *prev = NULL;
+ time_t cutoff;
+
+ cutoff = time(NULL) - uw_timeout;
+
+ pthread_mutex_lock(&clients_mutex);
+
+ for (c = clients_used; c; c = next) {
+ next = c->next;
+ pthread_mutex_lock(&c->lock);
+ if (c->last_contact < cutoff) {
+ failure_kind fk = UNLIMITED_RETRY;
+ if (prev)
+ prev->next = next;
+ else
+ clients_used = next;
+ while (fk == UNLIMITED_RETRY) {
+ uw_reset(ctx);
+ fk = uw_expunge(ctx, c->id);
+ if (fk == SUCCESS) {
+ free_client(c);
+ break;
+ }
+ }
+ if (fk != SUCCESS)
+ printf("Expunge blocked by error: %s\n", uw_error_message(ctx));
+ }
+ else
+ prev = c;
+ pthread_mutex_unlock(&c->lock);
+ }
+
+ pthread_mutex_unlock(&clients_mutex);
+}
diff --git a/src/cjr.sml b/src/cjr.sml
index 688326e4..948b345f 100644
--- a/src/cjr.sml
+++ b/src/cjr.sml
@@ -106,7 +106,7 @@ datatype decl' =
| DTable of string * (string * typ) list
| DSequence of string
- | DDatabase of string
+ | DDatabase of string * int
| DPreparedStatements of (string * int) list
| DJavaScript of string
diff --git a/src/cjr_print.sml b/src/cjr_print.sml
index 351180b7..21e53a51 100644
--- a/src/cjr_print.sml
+++ b/src/cjr_print.sml
@@ -1937,119 +1937,128 @@ fun p_decl env (dAll as (d, _) : decl) =
string x,
string " */",
newline]
- | DDatabase s => box [string "static void uw_db_validate(uw_context);",
- newline,
- string "static void uw_db_prepare(uw_context);",
- newline,
- newline,
- string "void uw_db_init(uw_context ctx) {",
- newline,
- string "PGconn *conn = PQconnectdb(\"",
- string (String.toString s),
- string "\");",
- newline,
- string "if (conn == NULL) uw_error(ctx, BOUNDED_RETRY, ",
- string "\"libpq can't allocate a connection.\");",
- newline,
- string "if (PQstatus(conn) != CONNECTION_OK) {",
- newline,
- box [string "char msg[1024];",
+ | DDatabase (s, n) => box [string "static void uw_db_validate(uw_context);",
newline,
- string "strncpy(msg, PQerrorMessage(conn), 1024);",
+ string "static void uw_db_prepare(uw_context);",
newline,
- string "msg[1023] = 0;",
newline,
- string "PQfinish(conn);",
+ string "void uw_db_init(uw_context ctx) {",
newline,
- string "uw_error(ctx, BOUNDED_RETRY, ",
- string "\"Connection to Postgres server failed: %s\", msg);"],
- newline,
- string "}",
- newline,
- string "uw_set_db(ctx, conn);",
- newline,
- string "uw_db_validate(ctx);",
- newline,
- string "uw_db_prepare(ctx);",
- newline,
- string "}",
- newline,
- newline,
- string "void uw_db_close(uw_context ctx) {",
- newline,
- string "PQfinish(uw_get_db(ctx));",
- newline,
- string "}",
- newline,
- newline,
-
- string "int uw_db_begin(uw_context ctx) {",
- newline,
- string "PGconn *conn = uw_get_db(ctx);",
- newline,
- string "PGresult *res = PQexec(conn, \"BEGIN ISOLATION LEVEL SERIALIZABLE\");",
- newline,
- newline,
- string "if (res == NULL) return 1;",
- newline,
- newline,
- string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {",
- box [string "PQclear(res);",
+ string "PGconn *conn = PQconnectdb(\"",
+ string (String.toString s),
+ string "\");",
newline,
- string "return 1;",
- newline],
- string "}",
- newline,
- string "return 0;",
- newline,
- string "}",
- newline,
- newline,
-
- string "int uw_db_commit(uw_context ctx) {",
- newline,
- string "PGconn *conn = uw_get_db(ctx);",
- newline,
- string "PGresult *res = PQexec(conn, \"COMMIT\");",
- newline,
- newline,
- string "if (res == NULL) return 1;",
- newline,
- newline,
- string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {",
- box [string "PQclear(res);",
+ string "if (conn == NULL) uw_error(ctx, BOUNDED_RETRY, ",
+ string "\"libpq can't allocate a connection.\");",
newline,
- string "return 1;",
- newline],
- string "}",
- newline,
- string "return 0;",
- newline,
- string "}",
- newline,
- newline,
-
- string "int uw_db_rollback(uw_context ctx) {",
- newline,
- string "PGconn *conn = uw_get_db(ctx);",
- newline,
- string "PGresult *res = PQexec(conn, \"ROLLBACK\");",
- newline,
- newline,
- string "if (res == NULL) return 1;",
- newline,
- newline,
- string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {",
- box [string "PQclear(res);",
+ string "if (PQstatus(conn) != CONNECTION_OK) {",
newline,
- string "return 1;",
- newline],
- string "}",
- newline,
- string "return 0;",
- newline,
- string "}",
- newline]
+ box [string "char msg[1024];",
+ newline,
+ string "strncpy(msg, PQerrorMessage(conn), 1024);",
+ newline,
+ string "msg[1023] = 0;",
+ newline,
+ string "PQfinish(conn);",
+ newline,
+ string "uw_error(ctx, BOUNDED_RETRY, ",
+ string "\"Connection to Postgres server failed: %s\", msg);"],
+ newline,
+ string "}",
+ newline,
+ string "uw_set_db(ctx, conn);",
+ newline,
+ string "uw_db_validate(ctx);",
+ newline,
+ string "uw_db_prepare(ctx);",
+ newline,
+ string "}",
+ newline,
+ newline,
+ string "void uw_db_close(uw_context ctx) {",
+ newline,
+ string "PQfinish(uw_get_db(ctx));",
+ newline,
+ string "}",
+ newline,
+ newline,
+
+ string "int uw_db_begin(uw_context ctx) {",
+ newline,
+ string "PGconn *conn = uw_get_db(ctx);",
+ newline,
+ string "PGresult *res = PQexec(conn, \"BEGIN ISOLATION LEVEL SERIALIZABLE\");",
+ newline,
+ newline,
+ string "if (res == NULL) return 1;",
+ newline,
+ newline,
+ string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {",
+ box [string "PQclear(res);",
+ newline,
+ string "return 1;",
+ newline],
+ string "}",
+ newline,
+ string "return 0;",
+ newline,
+ string "}",
+ newline,
+ newline,
+
+ string "int uw_db_commit(uw_context ctx) {",
+ newline,
+ string "PGconn *conn = uw_get_db(ctx);",
+ newline,
+ string "PGresult *res = PQexec(conn, \"COMMIT\");",
+ newline,
+ newline,
+ string "if (res == NULL) return 1;",
+ newline,
+ newline,
+ string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {",
+ box [string "PQclear(res);",
+ newline,
+ string "return 1;",
+ newline],
+ string "}",
+ newline,
+ string "return 0;",
+ newline,
+ string "}",
+ newline,
+ newline,
+
+ string "int uw_db_rollback(uw_context ctx) {",
+ newline,
+ string "PGconn *conn = uw_get_db(ctx);",
+ newline,
+ string "PGresult *res = PQexec(conn, \"ROLLBACK\");",
+ newline,
+ newline,
+ string "if (res == NULL) return 1;",
+ newline,
+ newline,
+ string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {",
+ box [string "PQclear(res);",
+ newline,
+ string "return 1;",
+ newline],
+ string "}",
+ newline,
+ string "return 0;",
+ newline,
+ string "}",
+ newline,
+ newline,
+
+ string "void uw_expunger(uw_context ctx, uw_Basis_client cli) {",
+ newline,
+ box [p_enamed env n,
+ string "(ctx, cli);",
+ newline],
+ string "}",
+ newline]
| DPreparedStatements [] =>
box [string "static void uw_db_prepare(uw_context ctx) {",
diff --git a/src/mono.sml b/src/mono.sml
index 3aa65b6a..95c65fc9 100644
--- a/src/mono.sml
+++ b/src/mono.sml
@@ -122,7 +122,7 @@ datatype decl' =
| DTable of string * (string * typ) list
| DSequence of string
- | DDatabase of string
+ | DDatabase of string * int
| DJavaScript of string
diff --git a/src/mono_print.sml b/src/mono_print.sml
index cbe90371..23b32a72 100644
--- a/src/mono_print.sml
+++ b/src/mono_print.sml
@@ -413,9 +413,13 @@ fun p_decl env (dAll as (d, _) : decl) =
| DSequence s => box [string "(* SQL sequence ",
string s,
string "*)"]
- | DDatabase s => box [string "database",
- space,
- string s]
+ | DDatabase (s, n) => box [string "database",
+ space,
+ string s,
+ space,
+ string "(",
+ p_enamed env n,
+ string ")"]
| DJavaScript s => box [string "JavaScript(",
string s,
string ")"]
diff --git a/src/mono_shake.sml b/src/mono_shake.sml
index 4fd3caeb..475c4895 100644
--- a/src/mono_shake.sml
+++ b/src/mono_shake.sml
@@ -45,6 +45,7 @@ fun shake file =
let
val page_es = List.foldl
(fn ((DExport (_, _, n, _, _), _), page_es) => n :: page_es
+ | ((DDatabase (_, n), _), page_es) => n :: page_es
| (_, page_es) => page_es) [] file
val (cdef, edef) = foldl (fn ((DDatatype (_, n, xncs), _), (cdef, edef)) =>
diff --git a/src/monoize.sml b/src/monoize.sml
index 5701cc0c..50678be4 100644
--- a/src/monoize.sml
+++ b/src/monoize.sml
@@ -165,8 +165,6 @@ fun monoType env =
| L.CApp ((L.CFfi ("Basis", "sql_injectable_prim"), _), t) =>
(L'.TFun (mt env dtmap t, (L'.TFfi ("Basis", "string"), loc)), loc)
- | L.CApp ((L.CFfi ("Basis", "sql_injectable_nullable"), _), t) =>
- (L'.TFun (mt env dtmap t, (L'.TFfi ("Basis", "string"), loc)), loc)
| L.CApp ((L.CFfi ("Basis", "sql_injectable"), _), t) =>
(L'.TFun (mt env dtmap t, (L'.TFfi ("Basis", "string"), loc)), loc)
| L.CApp ((L.CApp ((L.CFfi ("Basis", "sql_unary"), _), _), _), _) =>
@@ -248,6 +246,8 @@ structure Fm :> sig
val lookup : t -> foo_kind -> int -> (int -> t -> L'.decl * t) -> t * int
val enter : t -> t
val decls : t -> L'.decl list
+
+ val freshName : t -> int * t
end = struct
structure M = BinaryMapFn(struct
@@ -274,6 +274,7 @@ fun empty count = {
}
fun enter ({count, map, ...} : t) = {count = count, map = map, decls = []}
+fun freshName {count, map, decls} = (count, {count = count + 1, map = map, decls = decls})
fun decls ({decls, ...} : t) = decls
fun lookup (t as {count, map, decls}) k n thunk =
@@ -1455,26 +1456,6 @@ fun monoExp (env, st, fm) (all as (e, loc)) =
result = s}), loc)), loc)), loc),
fm)
end
- | L.ECApp ((L.EFfi ("Basis", "sql_nullable"), _), t) =>
- let
- val t = monoType env t
- val s = (L'.TFfi ("Basis", "string"), loc)
- in
- ((L'.EAbs ("f",
- (L'.TFun (t, s), loc),
- (L'.TFun ((L'.TOption t, loc), s), loc),
- (L'.EAbs ("x",
- (L'.TOption t, loc),
- s,
- (L'.ECase ((L'.ERel 0, loc),
- [((L'.PNone t, loc),
- (L'.EPrim (Prim.String "NULL"), loc)),
- ((L'.PSome (t, (L'.PVar ("y", t), loc)), loc),
- (L'.EApp ((L'.ERel 2, loc), (L'.ERel 0, loc)), loc))],
- {disc = (L'.TOption t, loc),
- result = s}), loc)), loc)), loc),
- fm)
- end
| L.ECApp ((L.EFfi ("Basis", "sql_subset"), _), _) =>
((L'.ERecord [], loc), fm)
@@ -2464,7 +2445,7 @@ fun monoDecl (env, fm) (all as (d, loc)) =
[(L'.DSequence s, loc),
(L'.DVal (x, n, t', e, s), loc)])
end
- | L.DDatabase s => SOME (env, fm, [(L'.DDatabase s, loc)])
+ | L.DDatabase _ => NONE
| L.DCookie (x, n, t, s) =>
let
val t = (L.CFfi ("Basis", "string"), loc)
@@ -2477,7 +2458,9 @@ fun monoDecl (env, fm) (all as (d, loc)) =
end
end
-fun monoize env ds =
+datatype expungable = Client | Channel
+
+fun monoize env file =
let
val p = !urlPrefix
val () =
@@ -2488,14 +2471,100 @@ fun monoize env ds =
else
()
+ val loc = E.dummySpan
+ val client = (L'.TFfi ("Basis", "client"), loc)
+ val unit = (L'.TRecord [], loc)
+ fun expunger () =
+ let
+ val target = (L'.EFfiApp ("Basis", "sqlifyClient", [(L'.ERel 0, loc)]), loc)
+
+ fun doTable (tab, xts, e) =
+ case xts of
+ L.CRecord (_, xts) =>
+ let
+ val (nullable, notNullable) =
+ foldl (fn ((x, t), st as (nullable, notNullable)) =>
+ case #1 x of
+ L.CName x =>
+ (case #1 t of
+ L.CFfi ("Basis", "client") =>
+ (nullable, (x, Client) :: notNullable)
+ | L.CApp ((L.CFfi ("Basis", "option"), _),
+ (L.CFfi ("Basis", "client"), _)) =>
+ ((x, Client) :: nullable, notNullable)
+ | L.CApp ((L.CFfi ("Basis", "channel"), _), _) =>
+ (nullable, (x, Channel) :: notNullable)
+ | L.CApp ((L.CFfi ("Basis", "option"), _),
+ (L.CApp ((L.CFfi ("Basis", "channel"), _), _), _)) =>
+ ((x, Channel) :: nullable, notNullable)
+ | _ => st)
+ | _ => st) ([], []) xts
+
+ val e =
+ case notNullable of
+ [] => e
+ | eb :: ebs =>
+ let
+ fun cond (x, v) =
+ (L'.EStrcat ((L'.EPrim (Prim.String ("uw_" ^ x
+ ^ (case v of
+ Client => ""
+ | Channel => " >> 32")
+ ^ " = ")), loc),
+ target), loc)
+ in
+ (L'.ESeq (
+ (L'.EDml (foldl
+ (fn (eb, s) =>
+ (L'.EStrcat (s,
+ (L'.EStrcat ((L'.EPrim (Prim.String " AND "),
+ loc),
+ cond eb), loc)), loc))
+ (L'.EStrcat ((L'.EPrim (Prim.String ("DELETE FROM uw_"
+ ^ tab
+ ^ " WHERE ")), loc),
+ cond eb), loc)
+ ebs), loc),
+ e), loc)
+ end
+ in
+ e
+ end
+ | _ => e
+
+ val e = (L'.ERecord [], loc)
+ in
+ foldl (fn ((d, _), e) =>
+ case d of
+ L.DTable (_, _, xts, tab) => doTable (tab, #1 xts, e)
+ | _ => e) e file
+ end
+
val (_, _, ds) = List.foldl (fn (d, (env, fm, ds)) =>
- case monoDecl (env, fm) d of
- NONE => (env, fm, ds)
- | SOME (env, fm, ds') =>
- (env,
- Fm.enter fm,
- ds' @ Fm.decls fm @ ds))
- (env, Fm.empty (CoreUtil.File.maxName ds + 1), []) ds
+ case #1 d of
+ L.DDatabase s =>
+ let
+ val (n, fm) = Fm.freshName fm
+
+
+ val d = L'.DVal ("expunger",
+ n,
+ (L'.TFun (client, unit), loc),
+ (L'.EAbs ("cli", client, unit, expunger ()), loc),
+ "expunger")
+ in
+ (env, Fm.enter fm, (L'.DDatabase (s, n), loc)
+ :: (d, loc)
+ :: ds)
+ end
+ | _ =>
+ case monoDecl (env, fm) d of
+ NONE => (env, fm, ds)
+ | SOME (env, fm, ds') =>
+ (env,
+ Fm.enter fm,
+ ds' @ Fm.decls fm @ ds))
+ (env, Fm.empty (CoreUtil.File.maxName file + 1), []) file
in
rev ds
end