From ecbb6cad02cdce70c7dcc612b446d35bac70652b Mon Sep 17 00:00:00 2001 From: Adam Chlipala Date: Tue, 24 Mar 2009 14:44:45 -0400 Subject: Transactionalize channel operations --- include/urweb.h | 2 + lib/js/urweb.js | 2 +- src/c/driver.c | 32 +------- src/c/urweb.c | 224 +++++++++++++++++++++++++++++++++++++++++++++++++++----- 4 files changed, 209 insertions(+), 51 deletions(-) diff --git a/include/urweb.h b/include/urweb.h index e508c24e..6a9a48a6 100644 --- a/include/urweb.h +++ b/include/urweb.h @@ -22,6 +22,8 @@ void uw_reset_keep_error_message(uw_context); failure_kind uw_begin_init(uw_context); void uw_set_headers(uw_context, char *headers); failure_kind uw_begin(uw_context, char *path); +void uw_commit(uw_context); +int uw_rollback(uw_context); __attribute__((noreturn)) void uw_error(uw_context, failure_kind, const char *fmt, ...); char *uw_error_message(uw_context); diff --git a/lib/js/urweb.js b/lib/js/urweb.js index 6cb5c60a..1f6ecb3d 100644 --- a/lib/js/urweb.js +++ b/lib/js/urweb.js @@ -246,7 +246,7 @@ function listener() { if (isok) { var lines = xhr.responseText.split("\n"); if (lines.length < 2) - whine("Empty message from remote server"); + throw "Empty message from remote server"; for (var i = 0; i+1 < lines.length; i += 2) { var chn = lines[i]; diff --git a/src/c/driver.c b/src/c/driver.c index fe8d3aa5..4fa5db2f 100644 --- a/src/c/driver.c +++ b/src/c/driver.c @@ -53,12 +53,8 @@ static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; #define MAX_RETRIES 5 -int uw_db_begin(uw_context); -int uw_db_commit(uw_context); -int uw_db_rollback(uw_context); - static int try_rollback(uw_context ctx) { - int r = uw_db_rollback(ctx); + int r = uw_rollback(ctx); if (r) { printf("Error running SQL ROLLBACK\n"); @@ -206,36 +202,12 @@ static void *worker(void *data) { printf("Serving URI %s....\n", path); while (1) { - if (uw_db_begin(ctx)) { - printf("Error running SQL BEGIN\n"); - if (retries_left) - --retries_left; - else { - fk = FATAL; - uw_reset(ctx); - uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r"); - uw_write_header(ctx, "Content-type: text/plain\r\n\r\n"); - uw_write(ctx, "Error running SQL BEGIN\n"); - - break; - } - } - uw_write_header(ctx, "HTTP/1.1 200 OK\r\n"); strcpy(path_copy, path); fk = uw_begin(ctx, path_copy); if (fk == SUCCESS) { - if (uw_db_commit(ctx)) { - fk = FATAL; - - printf("Error running SQL COMMIT\n"); - uw_reset(ctx); - uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r"); - uw_write_header(ctx, "Content-type: text/plain\r\n"); - uw_write(ctx, "Error running SQL COMMIT\n"); - } - + uw_commit(ctx); break; } else if (fk == BOUNDED_RETRY) { if (retries_left) { diff --git a/src/c/urweb.c b/src/c/urweb.c index a0bbe575..39a3a57c 100644 --- a/src/c/urweb.c +++ b/src/c/urweb.c @@ -98,6 +98,7 @@ typedef struct client { buf msgs; int sock; time_t last_contact; + unsigned refcount; } used; } data; } client; @@ -129,6 +130,7 @@ static client *uw_new_client() { c->data.used.pass = rand(); c->data.used.sock = -1; c->data.used.last_contact = time(NULL); + c->data.used.refcount = 0; buf_init(&c->data.used.msgs, 0); pthread_mutex_unlock(&clients_mutex); @@ -154,11 +156,20 @@ static client *uw_find_client(size_t id) { pthread_mutex_unlock(&clients_mutex); return NULL; } - + + pthread_mutex_lock(&c->data.used.lock); + ++c->data.used.refcount; + pthread_mutex_unlock(&c->data.used.lock); pthread_mutex_unlock(&clients_mutex); return c; } +static void uw_release_client(client *c) { + pthread_mutex_lock(&c->data.used.lock); + --c->data.used.refcount; + pthread_mutex_unlock(&c->data.used.lock); +} + void uw_client_connect(size_t id, int pass, int sock) { client *c = uw_find_client(id); @@ -168,6 +179,8 @@ void uw_client_connect(size_t id, int pass, int sock) { return; } + uw_release_client(c); + pthread_mutex_lock(&c->data.used.lock); if (pass != c->data.used.pass) { @@ -219,7 +232,8 @@ void uw_prune_clients(time_t timeout) { pthread_mutex_lock(&clients_mutex); for (i = 0; i < n_clients; ++i) { - if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff) + if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff + && clients[i]->data.used.refcount == 0) uw_free_client(clients[i]); } @@ -242,6 +256,7 @@ typedef struct channel { struct { pthread_mutex_t lock; client_list *clients; + unsigned refcount; } used; } data; } channel; @@ -271,25 +286,53 @@ static channel *uw_new_channel() { ch->mode = USED; pthread_mutex_init(&ch->data.used.lock, NULL); ch->data.used.clients = NULL; + ch->data.used.refcount = 0; pthread_mutex_unlock(&channels_mutex); return ch; } +static void uw_free_channel(channel *ch) { + if (ch->mode == USED) { + client_list *cs; + + for (cs = ch->data.used.clients; cs; ) { + client_list *tmp = cs->next; + free(cs); + cs = tmp; + } + pthread_mutex_destroy(&ch->data.used.lock); + ch->mode = UNUSED; + ch->data.next = channels_free; + channels_free = ch; + } +} + static channel *uw_find_channel(size_t id) { channel *ch = NULL; pthread_mutex_lock(&channels_mutex); - if (id < n_channels && channels[id]->mode == USED) + if (id < n_channels && channels[id]->mode == USED) { ch = channels[id]; + pthread_mutex_lock(&ch->data.used.lock); + ++ch->data.used.refcount; + pthread_mutex_unlock(&ch->data.used.lock); + } + pthread_mutex_unlock(&channels_mutex); return ch; } +static void uw_release_channel(channel *ch) { + pthread_mutex_lock(&ch->data.used.lock); + ++ch->data.used.refcount; + pthread_mutex_unlock(&ch->data.used.lock); +} + static void uw_subscribe(channel *ch, client *c) { client_list *cs = malloc(sizeof(client_list)); @@ -342,7 +385,6 @@ static void uw_channel_send(channel *ch, const char *msg) { pthread_mutex_lock(&c->data.used.lock); if (c->data.used.sock != -1) { - printf("Immediate send\n"); uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1); uw_really_send(c->data.used.sock, pre, preLen); uw_really_send(c->data.used.sock, msg, len); @@ -350,7 +392,6 @@ static void uw_channel_send(channel *ch, const char *msg) { close(c->data.used.sock); c->data.used.sock = -1; } else { - printf("Delayed send\n"); buf_append(&c->data.used.msgs, pre, preLen); buf_append(&c->data.used.msgs, msg, len); buf_append(&c->data.used.msgs, "\n", 1); @@ -386,6 +427,17 @@ typedef struct { void *arg; } cleanup; +typedef struct { + usage mode; + channel *ch; + enum { OLD, NEW } newness; + + size_t n_subscribed; + client **subscribed; + + buf msgs; +} channel_delta; + struct uw_context { char *headers, *headers_end; @@ -404,6 +456,9 @@ struct uw_context { const char *script_header, *url_prefix; + size_t n_deltas; + channel_delta *deltas; + char error_message[ERROR_BUF_LEN]; }; @@ -435,6 +490,9 @@ uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, si ctx->source_count = 0; + ctx->n_deltas = 0; + ctx->deltas = malloc(0); + return ctx; } @@ -447,12 +505,20 @@ void *uw_get_db(uw_context ctx) { } void uw_free(uw_context ctx) { + size_t i; + buf_free(&ctx->outHeaders); buf_free(&ctx->script); buf_free(&ctx->page); buf_free(&ctx->heap); free(ctx->inputs); free(ctx->cleanup); + + for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { + free(ctx->deltas[i].subscribed); + buf_free(&ctx->deltas[i].msgs); + } + free(ctx); } @@ -465,6 +531,8 @@ void uw_reset_keep_error_message(uw_context ctx) { ctx->regions = NULL; ctx->cleanup_front = ctx->cleanup; ctx->source_count = 0; + if (ctx->n_deltas > 0) + ctx->deltas[0].mode = UNUSED; } void uw_reset_keep_request(uw_context ctx) { @@ -506,14 +574,7 @@ void uw_set_headers(uw_context ctx, char *headers) { ctx->headers_end = s; } -failure_kind uw_begin(uw_context ctx, char *path) { - int r = setjmp(ctx->jmp_buf); - - if (r == 0) - uw_handle(ctx, path); - - return r; -} +int uw_db_begin(uw_context); __attribute__((noreturn)) void uw_error(uw_context ctx, failure_kind fk, const char *fmt, ...) { cleanup *cl; @@ -548,6 +609,18 @@ void uw_push_cleanup(uw_context ctx, void (*func)(void *), void *arg) { ++ctx->cleanup_front; } +failure_kind uw_begin(uw_context ctx, char *path) { + int r = setjmp(ctx->jmp_buf); + + if (r == 0) { + if (uw_db_begin(ctx)) + uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN"); + uw_handle(ctx, path); + } + + return r; +} + void uw_pop_cleanup(uw_context ctx) { if (ctx->cleanup_front == ctx->cleanup) uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack"); @@ -1728,8 +1801,45 @@ uw_unit uw_Basis_set_cookie(uw_context ctx, uw_Basis_string prefix, uw_Basis_str return uw_unit_v; } +static channel_delta *allocate_delta(uw_context ctx, channel *ch) { + size_t i; + channel_delta *cd; + + for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i); + + if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch) + return &ctx->deltas[i]; + + if (i < ctx->n_deltas) + cd = &ctx->deltas[i]; + else { + ++ctx->n_deltas; + ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas); + cd = &ctx->deltas[ctx->n_deltas-1]; + } + + cd->mode = USED; + cd->newness = OLD; + cd->ch = ch; + if (cd->n_subscribed > 0) + cd->subscribed[0] = NULL; + buf_reset(&cd->msgs); + return cd; +} + uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) { - return uw_new_channel()->id; + size_t i; + channel *ch = uw_new_channel(); + ++ch->data.used.refcount; + channel_delta *cd = allocate_delta(ctx, ch); + + cd->newness = NEW; + + return ch->id; +} + +static int delta_used(channel_delta *cd) { + return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]); } uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { @@ -1742,13 +1852,33 @@ uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) { int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass")); client *c = uw_find_client(id); - if (c == NULL) + if (c == NULL) { + uw_release_channel(ch); uw_error(ctx, FATAL, "Unknown client ID in subscription request"); - else if (c->data.used.pass != pass) + } else if (c->data.used.pass != pass) { + uw_release_channel(ch); + uw_release_client(c); uw_error(ctx, FATAL, "Wrong client password in subscription request"); - else - uw_subscribe(ch, c); + } else { + size_t i; + channel_delta *cd = allocate_delta(ctx, ch); + + if (delta_used(cd)) + uw_release_channel(ch); + + for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i); + + if (i < cd->n_subscribed) + cd->subscribed[i] = c; + else { + ++cd->n_subscribed; + cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed); + cd->subscribed[cd->n_subscribed-1] = c; + } + } } + + return uw_unit_v; } uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) { @@ -1756,6 +1886,60 @@ uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) if (ch == NULL) uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn); - else - uw_channel_send(ch, msg); + else { + channel_delta *cd = allocate_delta(ctx, ch); + if (delta_used(cd)) + uw_release_channel(ch); + buf_append(&cd->msgs, msg, strlen(msg)); + } + + return uw_unit_v; +} + +int uw_db_commit(uw_context); +int uw_db_rollback(uw_context); + +void uw_commit(uw_context ctx) { + size_t i, j; + + for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { + channel *ch = ctx->deltas[i].ch; + + for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { + client *c = ctx->deltas[i].subscribed[j]; + + uw_subscribe(ch, c); + uw_release_client(c); + } + + if (buf_used(&ctx->deltas[i].msgs) > 0) { + uw_channel_send(ch, ctx->deltas[i].msgs.start); + } + + uw_release_channel(ch); + } + + if (uw_db_commit(ctx)) + uw_error(ctx, FATAL, "Error running SQL COMMIT"); +} + +int uw_rollback(uw_context ctx) { + size_t i, j; + + for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) { + channel *ch = ctx->deltas[i].ch; + + for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) { + client *c = ctx->deltas[i].subscribed[j]; + + uw_release_client(c); + } + + if (ctx->deltas[i].newness == NEW) + uw_free_channel(ch); + else + uw_release_channel(ch); + } + + return uw_db_rollback(ctx); } -- cgit v1.2.3