diff options
author | Adam Chlipala <adam@chlipala.net> | 2015-10-17 10:49:25 -0400 |
---|---|---|
committer | Adam Chlipala <adam@chlipala.net> | 2015-10-17 10:49:25 -0400 |
commit | ee354c938959d7ae904fafae99966c3d136070e1 (patch) | |
tree | 864c452ff007b190575e7561bf94f389e833b1f7 | |
parent | 4f4d74c0803d70ea7755ee877997f1e324f6c06b (diff) |
Start of support for surviving database-server restarts, for Postgres
-rw-r--r-- | include/urweb/urweb_cpp.h | 1 | ||||
-rw-r--r-- | src/c/urweb.c | 31 | ||||
-rw-r--r-- | src/mysql.sml | 2 | ||||
-rw-r--r-- | src/postgres.sml | 285 |
4 files changed, 217 insertions, 102 deletions
diff --git a/include/urweb/urweb_cpp.h b/include/urweb/urweb_cpp.h index 39679dd5..b1d2048e 100644 --- a/include/urweb/urweb_cpp.h +++ b/include/urweb/urweb_cpp.h @@ -40,6 +40,7 @@ uw_loggers* uw_get_loggers(struct uw_context *ctx); uw_loggers* uw_get_loggers(struct uw_context *ctx); failure_kind uw_begin(struct uw_context *, char *path); void uw_ensure_transaction(struct uw_context *); +int uw_try_reconnecting_if_at_most_one(struct uw_context *); failure_kind uw_begin_onError(struct uw_context *, char *msg); void uw_login(struct uw_context *); int uw_commit(struct uw_context *); diff --git a/src/c/urweb.c b/src/c/urweb.c index 6d3836f1..4ce469bd 100644 --- a/src/c/urweb.c +++ b/src/c/urweb.c @@ -797,10 +797,37 @@ failure_kind uw_begin(uw_context ctx, char *path) { return r; } +static void uw_try_reconnecting(uw_context ctx) { + // Hm, error starting transaction. + // Maybe the database server died but has since come back up. + // Let's try starting from scratch. + if (ctx->db) { + ctx->app->db_close(ctx); + ctx->db = NULL; + } + ctx->app->db_init(ctx); + + if (!ctx->db) + uw_error(ctx, FATAL, "Error reopening database connection"); +} + +int uw_try_reconnecting_if_at_most_one(uw_context ctx) { + if (ctx->at_most_one_query) { + uw_try_reconnecting(ctx); + return 1; + } else + return 0; +} + void uw_ensure_transaction(uw_context ctx) { if (!ctx->transaction_started && !ctx->at_most_one_query) { - if (ctx->app->db_begin(ctx, ctx->could_write_db)) - uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN"); + if (!ctx->db || ctx->app->db_begin(ctx, ctx->could_write_db)) { + uw_try_reconnecting(ctx); + + if (ctx->app->db_begin(ctx, ctx->could_write_db)) + uw_error(ctx, FATAL, "Error running SQL BEGIN"); + } + ctx->transaction_started = 1; } } diff --git a/src/mysql.sml b/src/mysql.sml index bb654fee..13ea9fc2 100644 --- a/src/mysql.sml +++ b/src/mysql.sml @@ -546,7 +546,7 @@ fun init {dbstring, prepared = ss, tables, views, sequences} = newline, string "mysql_close(mysql);", newline, - string "uw_error(ctx, BOUNDED_RETRY, ", + string "uw_error(ctx, FATAL, ", string "\"Connection to MySQL server failed: %s\", msg);"], newline, string "}", diff --git a/src/postgres.sml b/src/postgres.sml index 6df0331a..22d55e54 100644 --- a/src/postgres.sml +++ b/src/postgres.sml @@ -1,4 +1,4 @@ -(* Copyright (c) 2008-2010, Adam Chlipala +(* Copyright (c) 2008-2010, 2015, Adam Chlipala * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -520,7 +520,7 @@ fun init {dbstring, prepared = ss, tables, views, sequences} = newline, string "PQfinish(conn);", newline, - string "uw_error(ctx, BOUNDED_RETRY, ", + string "uw_error(ctx, FATAL, ", string "\"Connection to Postgres server failed: %s\", msg);"], newline, string "}", @@ -612,12 +612,24 @@ fun p_getcol {loc, wontLeakStrings, col = i, typ = t} = getter t end -fun queryCommon {loc, query, cols, doCols} = +fun queryCommon {loc, query, cols, doCols, runit} = box [string "int n, i;", newline, newline, - string "if (res == NULL) uw_error(ctx, FATAL, \"Out of memory allocating query result.\");", + string "if (res == NULL) {", + box [newline, + string "if (uw_try_reconnecting_if_at_most_one(ctx)) {", + box [newline, + string "conn = uw_get_db(ctx);", + newline, + runit, + newline], + string "}", + newline, + string "if (res == NULL) uw_error(ctx, FATAL, \"Can't allocate query result; database server might be down.\");", + newline], + string "}", newline, newline, @@ -687,12 +699,18 @@ fun queryCommon {loc, query, cols, doCols} = newline] fun query {loc, cols, doCols} = - box [string "PGconn *conn = uw_get_db(ctx);", - newline, - string "PGresult *res = PQexecParams(conn, query, 0, NULL, NULL, NULL, NULL, 0);", - newline, - newline, - queryCommon {loc = loc, cols = cols, doCols = doCols, query = string "query"}] + let + val runit = string "res = PQexecParams(conn, query, 0, NULL, NULL, NULL, NULL, 0);" + in + box [string "PGconn *conn = uw_get_db(ctx);", + newline, + string "PGresult *res;", + newline, + runit, + newline, + newline, + queryCommon {loc = loc, cols = cols, doCols = doCols, query = string "query", runit = runit}] + end fun p_ensql t e = case t of @@ -756,33 +774,52 @@ fun makeParams inputs = newline] fun queryPrepared {loc, id, query, inputs, cols, doCols, nested = _} = - box [string "PGconn *conn = uw_get_db(ctx);", - newline, - - makeParams inputs, - - newline, - string "PGresult *res = ", - if #persistent (Settings.currentProtocol ()) then - box [string "PQexecPrepared(conn, \"uw", - string (Int.toString id), - string "\", ", - string (Int.toString (length inputs)), - string ", paramValues, paramLengths, paramFormats, 0);"] - else - box [string "PQexecParams(conn, \"", - string (Prim.toCString query), - string "\", ", - string (Int.toString (length inputs)), - string ", NULL, paramValues, paramLengths, paramFormats, 0);"], - newline, - newline, - queryCommon {loc = loc, cols = cols, doCols = doCols, query = box [string "\"", - string (Prim.toCString query), - string "\""]}] + let + val runit = + box [string "res = ", + if #persistent (Settings.currentProtocol ()) then + box [string "PQexecPrepared(conn, \"uw", + string (Int.toString id), + string "\", ", + string (Int.toString (length inputs)), + string ", paramValues, paramLengths, paramFormats, 0);"] + else + box [string "PQexecParams(conn, \"", + string (Prim.toCString query), + string "\", ", + string (Int.toString (length inputs)), + string ", NULL, paramValues, paramLengths, paramFormats, 0);"]] + in + box [string "PGconn *conn = uw_get_db(ctx);", + newline, + + makeParams inputs, + + newline, + string "PGresult *res;", + runit, + newline, + newline, + queryCommon {loc = loc, cols = cols, doCols = doCols, query = box [string "\"", + string (Prim.toCString query), + string "\""], + runit = runit}] + end -fun dmlCommon {loc, dml, mode} = - box [string "if (res == NULL) uw_error(ctx, FATAL, \"Out of memory allocating DML result.\");", +fun dmlCommon {loc, dml, mode, runit} = + box [string "if (res == NULL) {", + box [newline, + string "if (uw_try_reconnecting_if_at_most_one(ctx)) {", + box [newline, + string "conn = uw_get_db(ctx);", + newline, + runit, + newline], + string "}", + newline, + string "if (res == NULL) uw_error(ctx, FATAL, \"Can't allocate DML result; database server might be down.\");", + newline], + string "}", newline, newline, @@ -818,7 +855,11 @@ fun dmlCommon {loc, dml, mode} = string "res = PQexec(conn, \"ROLLBACK TO s\");", newline, - string "if (res == NULL) uw_error(ctx, FATAL, \"Out of memory allocating DML result.\");", + string "if (res == NULL) {", + box [newline, + string "uw_error(ctx, FATAL, \"Can't allocate DML ROLLBACK result; database server might be down.\");", + newline], + string "}", newline, newline, @@ -851,7 +892,7 @@ fun dmlCommon {loc, dml, mode} = newline, string "res = PQexec(conn, \"RELEASE s\");", newline, - string "if (res == NULL) uw_error(ctx, FATAL, \"Out of memory allocating DML result.\");", + string "if (res == NULL) uw_error(ctx, FATAL, \"Out of memory allocating DML RELEASE result.\");", newline, newline, @@ -877,7 +918,7 @@ fun makeSavepoint mode = Error => box [] | None => box [string "res = PQexec(conn, \"SAVEPOINT s\");", newline, - string "if (res == NULL) uw_error(ctx, FATAL, \"Out of memory allocating DML result.\");", + string "if (res == NULL) uw_error(ctx, FATAL, \"Out of memory allocating DML SAVEPOINT result.\");", newline, newline, string "if (PQresultStatus(res) != PGRES_COMMAND_OK) {", @@ -893,52 +934,71 @@ fun makeSavepoint mode = newline] fun dml (loc, mode) = - box [string "PGconn *conn = uw_get_db(ctx);", - newline, - string "PGresult *res;", - newline, + let + val runit = string "res = PQexecParams(conn, dml, 0, NULL, NULL, NULL, NULL, 0);" + in + box [string "PGconn *conn = uw_get_db(ctx);", + newline, + string "PGresult *res;", + newline, - makeSavepoint mode, + makeSavepoint mode, - string "res = PQexecParams(conn, dml, 0, NULL, NULL, NULL, NULL, 0);", - newline, - newline, - dmlCommon {loc = loc, dml = string "dml", mode = mode}] + runit, + newline, + newline, + dmlCommon {loc = loc, dml = string "dml", mode = mode, runit = runit}] + end fun dmlPrepared {loc, id, dml, inputs, mode} = - box [string "PGconn *conn = uw_get_db(ctx);", - newline, + let + val runit = + box [string "res = ", + if #persistent (Settings.currentProtocol ()) then + box [string "PQexecPrepared(conn, \"uw", + string (Int.toString id), + string "\", ", + string (Int.toString (length inputs)), + string ", paramValues, paramLengths, paramFormats, 0);"] + else + box [string "PQexecParams(conn, \"", + string (Prim.toCString dml), + string "\", ", + string (Int.toString (length inputs)), + string ", NULL, paramValues, paramLengths, paramFormats, 0);"]] + in + box [string "PGconn *conn = uw_get_db(ctx);", + newline, - makeParams inputs, + makeParams inputs, - newline, - string "PGresult *res;", - newline, - newline, + newline, + string "PGresult *res;", + newline, + newline, - makeSavepoint mode, + makeSavepoint mode, - string "res = ", - if #persistent (Settings.currentProtocol ()) then - box [string "PQexecPrepared(conn, \"uw", - string (Int.toString id), - string "\", ", - string (Int.toString (length inputs)), - string ", paramValues, paramLengths, paramFormats, 0);"] - else - box [string "PQexecParams(conn, \"", - string (Prim.toCString dml), - string "\", ", - string (Int.toString (length inputs)), - string ", NULL, paramValues, paramLengths, paramFormats, 0);"], - newline, - newline, - dmlCommon {loc = loc, dml = box [string "\"", - string (Prim.toCString dml), - string "\""], mode = mode}] + runit, + newline, + newline, + dmlCommon {loc = loc, dml = box [string "\"", + string (Prim.toCString dml), + string "\""], mode = mode, runit = runit}] + end -fun nextvalCommon {loc, query} = - box [string "if (res == NULL) uw_error(ctx, FATAL, \"Out of memory allocating nextval result.\");", +fun nextvalCommon {loc, query, runit} = + box [string "if (res == NULL) {", + box [newline, + string "if (uw_try_reconnecting_if_at_most_one(ctx))", + newline, + string "conn = uw_get_db(ctx);", + newline, + runit, + newline, + string "uw_error(ctx, FATAL, \"Out of memory allocating nextval result.\");", + newline], + string "}", newline, newline, @@ -987,6 +1047,8 @@ fun nextval {loc, seqE, seqName} = | _ => box [string "uw_Basis_strcat(ctx, \"SELECT NEXTVAL('\", uw_Basis_strcat(ctx, ", seqE, string ", \"')\"))"] + + val runit = string "res = PQexecParams(conn, query, 0, NULL, NULL, NULL, NULL, 0);" in box [string "char *query = ", query, @@ -994,33 +1056,53 @@ fun nextval {loc, seqE, seqName} = newline, string "PGconn *conn = uw_get_db(ctx);", newline, - string "PGresult *res = PQexecParams(conn, query, 0, NULL, NULL, NULL, NULL, 0);", + string "PGresult *res;", + newline, + runit, newline, newline, - nextvalCommon {loc = loc, query = string "query"}] + nextvalCommon {loc = loc, query = string "query", runit = runit}] end fun nextvalPrepared {loc, id, query} = - box [string "PGconn *conn = uw_get_db(ctx);", - newline, - newline, - string "PGresult *res = ", - if #persistent (Settings.currentProtocol ()) then - box [string "PQexecPrepared(conn, \"uw", - string (Int.toString id), - string "\", 0, NULL, NULL, NULL, 0);"] - else - box [string "PQexecParams(conn, \"", - string (Prim.toCString query), - string "\", 0, NULL, NULL, NULL, NULL, 0);"], - newline, - newline, - nextvalCommon {loc = loc, query = box [string "\"", - string (Prim.toCString query), - string "\""]}] + let + val runit = + box [string "res = ", + if #persistent (Settings.currentProtocol ()) then + box [string "PQexecPrepared(conn, \"uw", + string (Int.toString id), + string "\", 0, NULL, NULL, NULL, 0);"] + else + box [string "PQexecParams(conn, \"", + string (Prim.toCString query), + string "\", 0, NULL, NULL, NULL, NULL, 0);"]] + in + box [string "PGconn *conn = uw_get_db(ctx);", + newline, + newline, -fun setvalCommon {loc, query} = - box [string "if (res == NULL) uw_error(ctx, FATAL, \"Out of memory allocating setval result.\");", + string "PGresult *res;", + newline, + runit, + newline, + newline, + nextvalCommon {loc = loc, query = box [string "\"", + string (Prim.toCString query), + string "\""], runit = runit}] + end + +fun setvalCommon {loc, query, runit} = + box [string "if (res == NULL) {", + box [newline, + string "if (uw_try_reconnecting_if_at_most_one(ctx))", + newline, + string "conn = uw_get_db(ctx);", + newline, + runit, + newline, + string "uw_error(ctx, FATAL, \"Out of memory allocating setval result.\");", + newline], + string "}", newline, newline, @@ -1048,6 +1130,8 @@ fun setval {loc, seqE, count} = string ", uw_Basis_strcat(ctx, \"', \", uw_Basis_strcat(ctx, uw_Basis_sqlifyInt(ctx, ", count, string "), \")\"))))"] + + val runit = string "res = PQexecParams(conn, query, 0, NULL, NULL, NULL, NULL, 0);" in box [string "char *query = ", query, @@ -1055,10 +1139,13 @@ fun setval {loc, seqE, count} = newline, string "PGconn *conn = uw_get_db(ctx);", newline, - string "PGresult *res = PQexecParams(conn, query, 0, NULL, NULL, NULL, NULL, 0);", + + string "PGresult *res;", + newline, + runit, newline, newline, - setvalCommon {loc = loc, query = string "query"}] + setvalCommon {loc = loc, query = string "query", runit = runit}] end fun sqlifyString s = "E'" ^ String.translate (fn #"'" => "\\'" |