diff options
author | Adam Chlipala <adam@chlipala.net> | 2014-06-27 14:39:31 -0400 |
---|---|---|
committer | Adam Chlipala <adam@chlipala.net> | 2014-06-27 14:39:31 -0400 |
commit | 0a66a4a39c9725434e5c9cdf38902092c2da7849 (patch) | |
tree | 949c16b141cc38311c9696861578c296f17c02c7 /src | |
parent | 9036d37ed921adc09f47896d404be8cfcaf6187f (diff) |
Add locking to enforce atomicity of message sends from one transaction
Diffstat (limited to 'src')
-rw-r--r-- | src/c/urweb.c | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/src/c/urweb.c b/src/c/urweb.c index 7b95d6a9..86cd2b8b 100644 --- a/src/c/urweb.c +++ b/src/c/urweb.c @@ -3304,6 +3304,8 @@ static char *find_sig(char *haystack) { return s; } +static pthread_mutex_t message_send_mutex = PTHREAD_MUTEX_INITIALIZER; + int uw_commit(uw_context ctx) { int i; char *sig; @@ -3323,10 +3325,17 @@ int uw_commit(uw_context ctx) { } } + // Here's an important lock to provide the abstraction that all messages from one transaction are sent as an atomic unit. + if (ctx->used_deltas > 0) + pthread_mutex_lock(&message_send_mutex); + if (ctx->transaction_started) { int code = ctx->app->db_commit(ctx); if (code) { + if (ctx->used_deltas > 0) + pthread_mutex_unlock(&message_send_mutex); + if (ctx->client) release_client(ctx->client); @@ -3360,16 +3369,19 @@ int uw_commit(uw_context ctx) { if (ctx->transactionals[i].commit) { ctx->transactionals[i].commit(ctx->transactionals[i].data); if (uw_has_error(ctx)) { - if (ctx->client) - release_client(ctx->client); + if (ctx->used_deltas > 0) + pthread_mutex_unlock(&message_send_mutex); - for (i = ctx->used_transactionals-1; i >= 0; --i) - if (ctx->transactionals[i].rollback != NULL) - ctx->transactionals[i].rollback(ctx->transactionals[i].data); + if (ctx->client) + release_client(ctx->client); - for (i = ctx->used_transactionals-1; i >= 0; --i) - if (ctx->transactionals[i].free) - ctx->transactionals[i].free(ctx->transactionals[i].data, 0); + for (i = ctx->used_transactionals-1; i >= 0; --i) + if (ctx->transactionals[i].rollback != NULL) + ctx->transactionals[i].rollback(ctx->transactionals[i].data); + + for (i = ctx->used_transactionals-1; i >= 0; --i) + if (ctx->transactionals[i].free) + ctx->transactionals[i].free(ctx->transactionals[i].data, 0); return 0; } @@ -3385,6 +3397,9 @@ int uw_commit(uw_context ctx) { client_send(c, &d->msgs, ctx->script.start, uw_buffer_used(&ctx->script)); } + if (ctx->used_deltas > 0) + pthread_mutex_unlock(&message_send_mutex); + if (ctx->client) release_client(ctx->client); |