summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGravatar Adam Chlipala <adam@chlipala.net>2014-06-27 14:39:31 -0400
committerGravatar Adam Chlipala <adam@chlipala.net>2014-06-27 14:39:31 -0400
commit0a66a4a39c9725434e5c9cdf38902092c2da7849 (patch)
tree949c16b141cc38311c9696861578c296f17c02c7 /src
parent9036d37ed921adc09f47896d404be8cfcaf6187f (diff)
Add locking to enforce atomicity of message sends from one transaction
Diffstat (limited to 'src')
-rw-r--r--src/c/urweb.c31
1 files changed, 23 insertions, 8 deletions
diff --git a/src/c/urweb.c b/src/c/urweb.c
index 7b95d6a9..86cd2b8b 100644
--- a/src/c/urweb.c
+++ b/src/c/urweb.c
@@ -3304,6 +3304,8 @@ static char *find_sig(char *haystack) {
return s;
}
+static pthread_mutex_t message_send_mutex = PTHREAD_MUTEX_INITIALIZER;
+
int uw_commit(uw_context ctx) {
int i;
char *sig;
@@ -3323,10 +3325,17 @@ int uw_commit(uw_context ctx) {
}
}
+ // Here's an important lock to provide the abstraction that all messages from one transaction are sent as an atomic unit.
+ if (ctx->used_deltas > 0)
+ pthread_mutex_lock(&message_send_mutex);
+
if (ctx->transaction_started) {
int code = ctx->app->db_commit(ctx);
if (code) {
+ if (ctx->used_deltas > 0)
+ pthread_mutex_unlock(&message_send_mutex);
+
if (ctx->client)
release_client(ctx->client);
@@ -3360,16 +3369,19 @@ int uw_commit(uw_context ctx) {
if (ctx->transactionals[i].commit) {
ctx->transactionals[i].commit(ctx->transactionals[i].data);
if (uw_has_error(ctx)) {
- if (ctx->client)
- release_client(ctx->client);
+ if (ctx->used_deltas > 0)
+ pthread_mutex_unlock(&message_send_mutex);
- for (i = ctx->used_transactionals-1; i >= 0; --i)
- if (ctx->transactionals[i].rollback != NULL)
- ctx->transactionals[i].rollback(ctx->transactionals[i].data);
+ if (ctx->client)
+ release_client(ctx->client);
- for (i = ctx->used_transactionals-1; i >= 0; --i)
- if (ctx->transactionals[i].free)
- ctx->transactionals[i].free(ctx->transactionals[i].data, 0);
+ for (i = ctx->used_transactionals-1; i >= 0; --i)
+ if (ctx->transactionals[i].rollback != NULL)
+ ctx->transactionals[i].rollback(ctx->transactionals[i].data);
+
+ for (i = ctx->used_transactionals-1; i >= 0; --i)
+ if (ctx->transactionals[i].free)
+ ctx->transactionals[i].free(ctx->transactionals[i].data, 0);
return 0;
}
@@ -3385,6 +3397,9 @@ int uw_commit(uw_context ctx) {
client_send(c, &d->msgs, ctx->script.start, uw_buffer_used(&ctx->script));
}
+ if (ctx->used_deltas > 0)
+ pthread_mutex_unlock(&message_send_mutex);
+
if (ctx->client)
release_client(ctx->client);