aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Adam Chlipala <adamc@hcoop.net>2009-03-24 14:44:45 -0400
committerGravatar Adam Chlipala <adamc@hcoop.net>2009-03-24 14:44:45 -0400
commitdcdab80311789d08cb44a8db311963553f00fd8e (patch)
tree12a2cd34bffd3e5c5f6f7161f851f2b5ce771889 /src
parente2f6b11fd4fc806c5cdf88cf669ed5b2d9e34caf (diff)
Transactionalize channel operations
Diffstat (limited to 'src')
-rw-r--r--src/c/driver.c32
-rw-r--r--src/c/urweb.c224
2 files changed, 206 insertions, 50 deletions
diff --git a/src/c/driver.c b/src/c/driver.c
index fe8d3aa5..4fa5db2f 100644
--- a/src/c/driver.c
+++ b/src/c/driver.c
@@ -53,12 +53,8 @@ static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
#define MAX_RETRIES 5
-int uw_db_begin(uw_context);
-int uw_db_commit(uw_context);
-int uw_db_rollback(uw_context);
-
static int try_rollback(uw_context ctx) {
- int r = uw_db_rollback(ctx);
+ int r = uw_rollback(ctx);
if (r) {
printf("Error running SQL ROLLBACK\n");
@@ -206,36 +202,12 @@ static void *worker(void *data) {
printf("Serving URI %s....\n", path);
while (1) {
- if (uw_db_begin(ctx)) {
- printf("Error running SQL BEGIN\n");
- if (retries_left)
- --retries_left;
- else {
- fk = FATAL;
- uw_reset(ctx);
- uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
- uw_write_header(ctx, "Content-type: text/plain\r\n\r\n");
- uw_write(ctx, "Error running SQL BEGIN\n");
-
- break;
- }
- }
-
uw_write_header(ctx, "HTTP/1.1 200 OK\r\n");
strcpy(path_copy, path);
fk = uw_begin(ctx, path_copy);
if (fk == SUCCESS) {
- if (uw_db_commit(ctx)) {
- fk = FATAL;
-
- printf("Error running SQL COMMIT\n");
- uw_reset(ctx);
- uw_write_header(ctx, "HTTP/1.1 500 Internal Server Error\n\r");
- uw_write_header(ctx, "Content-type: text/plain\r\n");
- uw_write(ctx, "Error running SQL COMMIT\n");
- }
-
+ uw_commit(ctx);
break;
} else if (fk == BOUNDED_RETRY) {
if (retries_left) {
diff --git a/src/c/urweb.c b/src/c/urweb.c
index a0bbe575..39a3a57c 100644
--- a/src/c/urweb.c
+++ b/src/c/urweb.c
@@ -98,6 +98,7 @@ typedef struct client {
buf msgs;
int sock;
time_t last_contact;
+ unsigned refcount;
} used;
} data;
} client;
@@ -129,6 +130,7 @@ static client *uw_new_client() {
c->data.used.pass = rand();
c->data.used.sock = -1;
c->data.used.last_contact = time(NULL);
+ c->data.used.refcount = 0;
buf_init(&c->data.used.msgs, 0);
pthread_mutex_unlock(&clients_mutex);
@@ -154,11 +156,20 @@ static client *uw_find_client(size_t id) {
pthread_mutex_unlock(&clients_mutex);
return NULL;
}
-
+
+ pthread_mutex_lock(&c->data.used.lock);
+ ++c->data.used.refcount;
+ pthread_mutex_unlock(&c->data.used.lock);
pthread_mutex_unlock(&clients_mutex);
return c;
}
+static void uw_release_client(client *c) {
+ pthread_mutex_lock(&c->data.used.lock);
+ --c->data.used.refcount;
+ pthread_mutex_unlock(&c->data.used.lock);
+}
+
void uw_client_connect(size_t id, int pass, int sock) {
client *c = uw_find_client(id);
@@ -168,6 +179,8 @@ void uw_client_connect(size_t id, int pass, int sock) {
return;
}
+ uw_release_client(c);
+
pthread_mutex_lock(&c->data.used.lock);
if (pass != c->data.used.pass) {
@@ -219,7 +232,8 @@ void uw_prune_clients(time_t timeout) {
pthread_mutex_lock(&clients_mutex);
for (i = 0; i < n_clients; ++i) {
- if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff)
+ if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff
+ && clients[i]->data.used.refcount == 0)
uw_free_client(clients[i]);
}
@@ -242,6 +256,7 @@ typedef struct channel {
struct {
pthread_mutex_t lock;
client_list *clients;
+ unsigned refcount;
} used;
} data;
} channel;
@@ -271,25 +286,53 @@ static channel *uw_new_channel() {
ch->mode = USED;
pthread_mutex_init(&ch->data.used.lock, NULL);
ch->data.used.clients = NULL;
+ ch->data.used.refcount = 0;
pthread_mutex_unlock(&channels_mutex);
return ch;
}
+static void uw_free_channel(channel *ch) {
+ if (ch->mode == USED) {
+ client_list *cs;
+
+ for (cs = ch->data.used.clients; cs; ) {
+ client_list *tmp = cs->next;
+ free(cs);
+ cs = tmp;
+ }
+ pthread_mutex_destroy(&ch->data.used.lock);
+ ch->mode = UNUSED;
+ ch->data.next = channels_free;
+ channels_free = ch;
+ }
+}
+
static channel *uw_find_channel(size_t id) {
channel *ch = NULL;
pthread_mutex_lock(&channels_mutex);
- if (id < n_channels && channels[id]->mode == USED)
+ if (id < n_channels && channels[id]->mode == USED) {
ch = channels[id];
+ pthread_mutex_lock(&ch->data.used.lock);
+ ++ch->data.used.refcount;
+ pthread_mutex_unlock(&ch->data.used.lock);
+ }
+
pthread_mutex_unlock(&channels_mutex);
return ch;
}
+static void uw_release_channel(channel *ch) {
+ pthread_mutex_lock(&ch->data.used.lock);
+ ++ch->data.used.refcount;
+ pthread_mutex_unlock(&ch->data.used.lock);
+}
+
static void uw_subscribe(channel *ch, client *c) {
client_list *cs = malloc(sizeof(client_list));
@@ -342,7 +385,6 @@ static void uw_channel_send(channel *ch, const char *msg) {
pthread_mutex_lock(&c->data.used.lock);
if (c->data.used.sock != -1) {
- printf("Immediate send\n");
uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1);
uw_really_send(c->data.used.sock, pre, preLen);
uw_really_send(c->data.used.sock, msg, len);
@@ -350,7 +392,6 @@ static void uw_channel_send(channel *ch, const char *msg) {
close(c->data.used.sock);
c->data.used.sock = -1;
} else {
- printf("Delayed send\n");
buf_append(&c->data.used.msgs, pre, preLen);
buf_append(&c->data.used.msgs, msg, len);
buf_append(&c->data.used.msgs, "\n", 1);
@@ -386,6 +427,17 @@ typedef struct {
void *arg;
} cleanup;
+typedef struct {
+ usage mode;
+ channel *ch;
+ enum { OLD, NEW } newness;
+
+ size_t n_subscribed;
+ client **subscribed;
+
+ buf msgs;
+} channel_delta;
+
struct uw_context {
char *headers, *headers_end;
@@ -404,6 +456,9 @@ struct uw_context {
const char *script_header, *url_prefix;
+ size_t n_deltas;
+ channel_delta *deltas;
+
char error_message[ERROR_BUF_LEN];
};
@@ -435,6 +490,9 @@ uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, si
ctx->source_count = 0;
+ ctx->n_deltas = 0;
+ ctx->deltas = malloc(0);
+
return ctx;
}
@@ -447,12 +505,20 @@ void *uw_get_db(uw_context ctx) {
}
void uw_free(uw_context ctx) {
+ size_t i;
+
buf_free(&ctx->outHeaders);
buf_free(&ctx->script);
buf_free(&ctx->page);
buf_free(&ctx->heap);
free(ctx->inputs);
free(ctx->cleanup);
+
+ for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
+ free(ctx->deltas[i].subscribed);
+ buf_free(&ctx->deltas[i].msgs);
+ }
+
free(ctx);
}
@@ -465,6 +531,8 @@ void uw_reset_keep_error_message(uw_context ctx) {
ctx->regions = NULL;
ctx->cleanup_front = ctx->cleanup;
ctx->source_count = 0;
+ if (ctx->n_deltas > 0)
+ ctx->deltas[0].mode = UNUSED;
}
void uw_reset_keep_request(uw_context ctx) {
@@ -506,14 +574,7 @@ void uw_set_headers(uw_context ctx, char *headers) {
ctx->headers_end = s;
}
-failure_kind uw_begin(uw_context ctx, char *path) {
- int r = setjmp(ctx->jmp_buf);
-
- if (r == 0)
- uw_handle(ctx, path);
-
- return r;
-}
+int uw_db_begin(uw_context);
__attribute__((noreturn)) void uw_error(uw_context ctx, failure_kind fk, const char *fmt, ...) {
cleanup *cl;
@@ -548,6 +609,18 @@ void uw_push_cleanup(uw_context ctx, void (*func)(void *), void *arg) {
++ctx->cleanup_front;
}
+failure_kind uw_begin(uw_context ctx, char *path) {
+ int r = setjmp(ctx->jmp_buf);
+
+ if (r == 0) {
+ if (uw_db_begin(ctx))
+ uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN");
+ uw_handle(ctx, path);
+ }
+
+ return r;
+}
+
void uw_pop_cleanup(uw_context ctx) {
if (ctx->cleanup_front == ctx->cleanup)
uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack");
@@ -1728,8 +1801,45 @@ uw_unit uw_Basis_set_cookie(uw_context ctx, uw_Basis_string prefix, uw_Basis_str
return uw_unit_v;
}
+static channel_delta *allocate_delta(uw_context ctx, channel *ch) {
+ size_t i;
+ channel_delta *cd;
+
+ for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i);
+
+ if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch)
+ return &ctx->deltas[i];
+
+ if (i < ctx->n_deltas)
+ cd = &ctx->deltas[i];
+ else {
+ ++ctx->n_deltas;
+ ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas);
+ cd = &ctx->deltas[ctx->n_deltas-1];
+ }
+
+ cd->mode = USED;
+ cd->newness = OLD;
+ cd->ch = ch;
+ if (cd->n_subscribed > 0)
+ cd->subscribed[0] = NULL;
+ buf_reset(&cd->msgs);
+ return cd;
+}
+
uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) {
- return uw_new_channel()->id;
+ size_t i;
+ channel *ch = uw_new_channel();
+ ++ch->data.used.refcount;
+ channel_delta *cd = allocate_delta(ctx, ch);
+
+ cd->newness = NEW;
+
+ return ch->id;
+}
+
+static int delta_used(channel_delta *cd) {
+ return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]);
}
uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
@@ -1742,13 +1852,33 @@ uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass"));
client *c = uw_find_client(id);
- if (c == NULL)
+ if (c == NULL) {
+ uw_release_channel(ch);
uw_error(ctx, FATAL, "Unknown client ID in subscription request");
- else if (c->data.used.pass != pass)
+ } else if (c->data.used.pass != pass) {
+ uw_release_channel(ch);
+ uw_release_client(c);
uw_error(ctx, FATAL, "Wrong client password in subscription request");
- else
- uw_subscribe(ch, c);
+ } else {
+ size_t i;
+ channel_delta *cd = allocate_delta(ctx, ch);
+
+ if (delta_used(cd))
+ uw_release_channel(ch);
+
+ for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i);
+
+ if (i < cd->n_subscribed)
+ cd->subscribed[i] = c;
+ else {
+ ++cd->n_subscribed;
+ cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed);
+ cd->subscribed[cd->n_subscribed-1] = c;
+ }
+ }
}
+
+ return uw_unit_v;
}
uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
@@ -1756,6 +1886,60 @@ uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg)
if (ch == NULL)
uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
- else
- uw_channel_send(ch, msg);
+ else {
+ channel_delta *cd = allocate_delta(ctx, ch);
+ if (delta_used(cd))
+ uw_release_channel(ch);
+ buf_append(&cd->msgs, msg, strlen(msg));
+ }
+
+ return uw_unit_v;
+}
+
+int uw_db_commit(uw_context);
+int uw_db_rollback(uw_context);
+
+void uw_commit(uw_context ctx) {
+ size_t i, j;
+
+ for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
+ channel *ch = ctx->deltas[i].ch;
+
+ for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
+ client *c = ctx->deltas[i].subscribed[j];
+
+ uw_subscribe(ch, c);
+ uw_release_client(c);
+ }
+
+ if (buf_used(&ctx->deltas[i].msgs) > 0) {
+ uw_channel_send(ch, ctx->deltas[i].msgs.start);
+ }
+
+ uw_release_channel(ch);
+ }
+
+ if (uw_db_commit(ctx))
+ uw_error(ctx, FATAL, "Error running SQL COMMIT");
+}
+
+int uw_rollback(uw_context ctx) {
+ size_t i, j;
+
+ for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
+ channel *ch = ctx->deltas[i].ch;
+
+ for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
+ client *c = ctx->deltas[i].subscribed[j];
+
+ uw_release_client(c);
+ }
+
+ if (ctx->deltas[i].newness == NEW)
+ uw_free_channel(ch);
+ else
+ uw_release_channel(ch);
+ }
+
+ return uw_db_rollback(ctx);
}