summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Ziv Scully <ziv@mit.edu>2015-11-13 11:03:09 -0500
committerGravatar Ziv Scully <ziv@mit.edu>2015-11-13 11:03:09 -0500
commitbad52a2868ff0551ac0199fd8124f81f9623391e (patch)
treecaea90cb436e3646b031734b0b429c0d0a28d8d9
parentd67e2a35789c5e4c7ad603c15d2acdc826fcdc76 (diff)
Finish locking, but it's not yet tested rigorously.
-rw-r--r--include/urweb/types_cpp.h3
-rw-r--r--include/urweb/urweb_cpp.h2
-rw-r--r--src/c/urweb.c143
-rw-r--r--src/lru_cache.sml20
-rw-r--r--src/sqlcache.sml51
5 files changed, 154 insertions, 65 deletions
diff --git a/include/urweb/types_cpp.h b/include/urweb/types_cpp.h
index 82f8d30a..ce0f2825 100644
--- a/include/urweb/types_cpp.h
+++ b/include/urweb/types_cpp.h
@@ -133,7 +133,8 @@ typedef struct uw_Sqlcache_Value {
typedef struct uw_Sqlcache_Entry uw_Sqlcache_Entry;
typedef struct uw_Sqlcache_Cache {
- pthread_rwlock_t lock;
+ pthread_rwlock_t lockOut;
+ pthread_rwlock_t lockIn;
uw_Sqlcache_Entry *table;
unsigned long timeInvalid;
unsigned long timeNow;
diff --git a/include/urweb/urweb_cpp.h b/include/urweb/urweb_cpp.h
index 2c032e7b..916fbbf9 100644
--- a/include/urweb/urweb_cpp.h
+++ b/include/urweb/urweb_cpp.h
@@ -406,6 +406,8 @@ void uw_Basis_writec(struct uw_context *, char);
// Sqlcache.
+void *uw_Sqlcache_rlock(struct uw_context *, uw_Sqlcache_Cache *);
+void *uw_Sqlcache_wlock(struct uw_context *, uw_Sqlcache_Cache *);
uw_Sqlcache_Value *uw_Sqlcache_check(struct uw_context *, uw_Sqlcache_Cache *, char **);
void *uw_Sqlcache_store(struct uw_context *, uw_Sqlcache_Cache *, char **, uw_Sqlcache_Value *);
void *uw_Sqlcache_flush(struct uw_context *, uw_Sqlcache_Cache *, char **);
diff --git a/src/c/urweb.c b/src/c/urweb.c
index 778adacc..6a48e95e 100644
--- a/src/c/urweb.c
+++ b/src/c/urweb.c
@@ -366,6 +366,9 @@ void uw_global_init() {
uw_global_custom();
uw_init_crypto();
+
+ // Fast non-cryptographic strength randomness for Sqlcache.
+ srandom(clock());
}
void uw_app_init(uw_app *app) {
@@ -431,6 +434,11 @@ typedef struct uw_Sqlcache_Update {
struct uw_Sqlcache_Update *next;
} uw_Sqlcache_Update;
+typedef struct uw_Sqlcache_Unlock {
+ pthread_rwlock_t *lock;
+ struct uw_Sqlcache_Unlock *next;
+} uw_Sqlcache_Unlock;
+
struct uw_context {
uw_app *app;
int id;
@@ -500,6 +508,7 @@ struct uw_context {
int recordingOffset;
uw_Sqlcache_Update *cacheUpdate;
uw_Sqlcache_Update *cacheUpdateTail;
+ uw_Sqlcache_Unlock *cacheUnlock;
int remoteSock;
};
@@ -4556,7 +4565,7 @@ typedef struct uw_Sqlcache_Entry {
UT_hash_handle hh;
} uw_Sqlcache_Entry;
-void uw_Sqlcache_freeValue(uw_Sqlcache_Value *value) {
+static void uw_Sqlcache_freeValue(uw_Sqlcache_Value *value) {
if (value) {
free(value->result);
free(value->output);
@@ -4564,7 +4573,7 @@ void uw_Sqlcache_freeValue(uw_Sqlcache_Value *value) {
}
}
-void uw_Sqlcache_freeEntry(uw_Sqlcache_Entry* entry) {
+static void uw_Sqlcache_freeEntry(uw_Sqlcache_Entry* entry) {
if (entry) {
free(entry->key);
uw_Sqlcache_freeValue(entry->value);
@@ -4573,14 +4582,14 @@ void uw_Sqlcache_freeEntry(uw_Sqlcache_Entry* entry) {
}
// TODO: pick a number.
-unsigned int uw_Sqlcache_maxSize = 1234567890;
+static unsigned int uw_Sqlcache_maxSize = 1234567890;
-void uw_Sqlcache_delete(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry) {
+static void uw_Sqlcache_delete(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry) {
HASH_DEL(cache->table, entry);
uw_Sqlcache_freeEntry(entry);
}
-uw_Sqlcache_Entry *uw_Sqlcache_find(uw_Sqlcache_Cache *cache, char *key, size_t len, int bump) {
+static uw_Sqlcache_Entry *uw_Sqlcache_find(uw_Sqlcache_Cache *cache, char *key, size_t len, int bump) {
uw_Sqlcache_Entry *entry = NULL;
HASH_FIND(hh, cache->table, key, len, entry);
if (entry && bump) {
@@ -4592,7 +4601,7 @@ uw_Sqlcache_Entry *uw_Sqlcache_find(uw_Sqlcache_Cache *cache, char *key, size_t
return entry;
}
-void uw_Sqlcache_add(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry, size_t len) {
+static void uw_Sqlcache_add(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry, size_t len) {
HASH_ADD_KEYPTR(hh, cache->table, entry->key, len, entry);
if (HASH_COUNT(cache->table) > uw_Sqlcache_maxSize) {
// Deletes the first element of the cache.
@@ -4600,17 +4609,17 @@ void uw_Sqlcache_add(uw_Sqlcache_Cache *cache, uw_Sqlcache_Entry *entry, size_t
}
}
-unsigned long uw_Sqlcache_getTimeNow(uw_Sqlcache_Cache *cache) {
+static unsigned long uw_Sqlcache_getTimeNow(uw_Sqlcache_Cache *cache) {
return ++cache->timeNow;
}
-unsigned long uw_Sqlcache_timeMax(unsigned long x, unsigned long y) {
+static unsigned long uw_Sqlcache_timeMax(unsigned long x, unsigned long y) {
return x > y ? x : y;
}
-char uw_Sqlcache_keySep = '_';
+static char uw_Sqlcache_keySep = '_';
-char *uw_Sqlcache_allocKeyBuffer(char **keys, size_t numKeys) {
+static char *uw_Sqlcache_allocKeyBuffer(char **keys, size_t numKeys) {
size_t len = 0;
while (numKeys-- > 0) {
char* k = keys[numKeys];
@@ -4627,7 +4636,7 @@ char *uw_Sqlcache_allocKeyBuffer(char **keys, size_t numKeys) {
return buf;
}
-char *uw_Sqlcache_keyCopy(char *buf, char *key) {
+static char *uw_Sqlcache_keyCopy(char *buf, char *key) {
*buf++ = uw_Sqlcache_keySep;
return stpcpy(buf, key);
}
@@ -4635,7 +4644,12 @@ char *uw_Sqlcache_keyCopy(char *buf, char *key) {
// The NUL-terminated prefix of [key] below always looks something like "_k1_k2_k3..._kn".
uw_Sqlcache_Value *uw_Sqlcache_check(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys) {
- pthread_rwlock_rdlock(&cache->lock);
+ int doBump = random() % 1024 == 0;
+ if (doBump) {
+ pthread_rwlock_wrlock(&cache->lockIn);
+ } else {
+ pthread_rwlock_rdlock(&cache->lockIn);
+ }
size_t numKeys = cache->numKeys;
char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
char *buf = key;
@@ -4645,46 +4659,49 @@ uw_Sqlcache_Value *uw_Sqlcache_check(uw_context ctx, uw_Sqlcache_Cache *cache, c
entry = cache->table;
if (!entry) {
free(key);
- pthread_rwlock_unlock(&cache->lock);
+ pthread_rwlock_unlock(&cache->lockIn);
return NULL;
}
} else {
while (numKeys-- > 0) {
buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]);
size_t len = buf - key;
- entry = uw_Sqlcache_find(cache, key, len, 1);
+ entry = uw_Sqlcache_find(cache, key, len, doBump);
if (!entry) {
free(key);
- pthread_rwlock_unlock(&cache->lock);
+ pthread_rwlock_unlock(&cache->lockIn);
return NULL;
}
timeInvalid = uw_Sqlcache_timeMax(timeInvalid, entry->timeInvalid);
}
free(key);
}
- // TODO: pass back copy of value and free it in the generated code... or use uw_malloc?
uw_Sqlcache_Value *value = entry->value;
- pthread_rwlock_unlock(&cache->lock);
+ pthread_rwlock_unlock(&cache->lockIn);
+ // ASK: though the argument isn't trivial, this is safe, right?
+ // Returning outside the lock is safe because updates happen at commit time.
+ // Those are the only times the returned value or its strings can get freed.
+ // Handler output is a new string, so it's safe to free this at commit time.
return value && value->timeValid > timeInvalid ? value : NULL;
}
-void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
- pthread_rwlock_wrlock(&cache->lock);
+static void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcache_Value *value) {
+ pthread_rwlock_wrlock(&cache->lockIn);
size_t numKeys = cache->numKeys;
- char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
- char *buf = key;
time_t timeNow = uw_Sqlcache_getTimeNow(cache);
uw_Sqlcache_Entry *entry;
if (numKeys == 0) {
entry = cache->table;
if (!entry) {
entry = malloc(sizeof(uw_Sqlcache_Entry));
- entry->key = strdup(key);
+ entry->key = NULL;
entry->value = NULL;
entry->timeInvalid = 0;
cache->table = entry;
}
} else {
+ char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
+ char *buf = key;
while (numKeys-- > 0) {
buf = uw_Sqlcache_keyCopy(buf, keys[numKeys]);
size_t len = buf - key;
@@ -4702,23 +4719,23 @@ void uw_Sqlcache_storeCommitOne(uw_Sqlcache_Cache *cache, char **keys, uw_Sqlcac
uw_Sqlcache_freeValue(entry->value);
entry->value = value;
entry->value->timeValid = timeNow;
- pthread_rwlock_unlock(&cache->lock);
+ pthread_rwlock_unlock(&cache->lockIn);
}
-void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) {
- pthread_rwlock_wrlock(&cache->lock);
+static void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) {
+ pthread_rwlock_wrlock(&cache->lockIn);
size_t numKeys = cache->numKeys;
- char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
- char *buf = key;
- time_t timeNow = uw_Sqlcache_getTimeNow(cache);
- uw_Sqlcache_Entry *entry;
if (numKeys == 0) {
- entry = cache->table;
+ uw_Sqlcache_Entry *entry = cache->table;
if (entry) {
uw_Sqlcache_freeValue(entry->value);
entry->value = NULL;
}
} else {
+ char *key = uw_Sqlcache_allocKeyBuffer(keys, numKeys);
+ char *buf = key;
+ time_t timeNow = uw_Sqlcache_getTimeNow(cache);
+ uw_Sqlcache_Entry *entry = NULL;
while (numKeys-- > 0) {
char *k = keys[numKeys];
if (!k) {
@@ -4729,15 +4746,16 @@ void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) {
cache->timeInvalid = timeNow;
}
free(key);
- pthread_rwlock_unlock(&cache->lock);
+ pthread_rwlock_unlock(&cache->lockIn);
return;
}
buf = uw_Sqlcache_keyCopy(buf, k);
size_t len = buf - key;
entry = uw_Sqlcache_find(cache, key, len, 0);
if (!entry) {
+ // Nothing in the cache to flush.
free(key);
- pthread_rwlock_unlock(&cache->lock);
+ pthread_rwlock_unlock(&cache->lockIn);
return;
}
}
@@ -4745,10 +4763,25 @@ void uw_Sqlcache_flushCommitOne(uw_Sqlcache_Cache *cache, char **keys) {
// All the keys were non-null and the relevant entry is present, so we delete it.
uw_Sqlcache_delete(cache, entry);
}
- pthread_rwlock_unlock(&cache->lock);
+ pthread_rwlock_unlock(&cache->lockIn);
+}
+
+static void uw_Sqlcache_commit(void *data) {
+ uw_context ctx = (uw_context)data;
+ uw_Sqlcache_Update *update = ctx->cacheUpdate;
+ while (update) {
+ uw_Sqlcache_Cache *cache = update->cache;
+ char **keys = update->keys;
+ if (update->value) {
+ uw_Sqlcache_storeCommitOne(cache, keys, update->value);
+ } else {
+ uw_Sqlcache_flushCommitOne(cache, keys);
+ }
+ update = update->next;
+ }
}
-void uw_Sqlcache_freeUpdate(void *data, int dontCare) {
+static void uw_Sqlcache_free(void *data, int dontCare) {
uw_context ctx = (uw_context)data;
uw_Sqlcache_Update *update = ctx->cacheUpdate;
while (update) {
@@ -4765,24 +4798,38 @@ void uw_Sqlcache_freeUpdate(void *data, int dontCare) {
}
ctx->cacheUpdate = NULL;
ctx->cacheUpdateTail = NULL;
+ uw_Sqlcache_Unlock *unlock = ctx->cacheUnlock;
+ while (unlock) {
+ pthread_rwlock_unlock(unlock->lock);
+ uw_Sqlcache_Unlock *nextUnlock = unlock->next;
+ free(unlock);
+ unlock = nextUnlock;
+ }
+ ctx->cacheUnlock = NULL;
}
-void uw_Sqlcache_commitUpdate(void *data) {
- uw_context ctx = (uw_context)data;
- uw_Sqlcache_Update *update = ctx->cacheUpdate;
- while (update) {
- uw_Sqlcache_Cache *cache = update->cache;
- char **keys = update->keys;
- if (update->value) {
- uw_Sqlcache_storeCommitOne(cache, keys, update->value);
- } else {
- uw_Sqlcache_flushCommitOne(cache, keys);
- }
- update = update->next;
+static void uw_Sqlcache_pushUnlock(uw_context ctx, pthread_rwlock_t *lock) {
+ if (!ctx->cacheUnlock) {
+ // Just need one registered commit for both updating and unlocking.
+ uw_register_transactional(ctx, ctx, uw_Sqlcache_commit, NULL, uw_Sqlcache_free);
}
+ uw_Sqlcache_Unlock *unlock = malloc(sizeof(uw_Sqlcache_Unlock));
+ unlock->lock = lock;
+ unlock->next = ctx->cacheUnlock;
+ ctx->cacheUnlock = unlock;
+}
+
+void uw_Sqlcache_rlock(uw_context ctx, uw_Sqlcache_Cache *cache) {
+ pthread_rwlock_rdlock(&cache->lockOut);
+ uw_Sqlcache_pushUnlock(ctx, &cache->lockOut);
+}
+
+void uw_Sqlcache_wlock(uw_context ctx, uw_Sqlcache_Cache *cache) {
+ pthread_rwlock_wrlock(&cache->lockOut);
+ uw_Sqlcache_pushUnlock(ctx, &cache->lockOut);
}
-char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) {
+static char **uw_Sqlcache_copyKeys(char **keys, size_t numKeys) {
char **copy = malloc(sizeof(char *) * numKeys);
while (numKeys-- > 0) {
char *k = keys[numKeys];
@@ -4798,11 +4845,9 @@ void uw_Sqlcache_store(uw_context ctx, uw_Sqlcache_Cache *cache, char **keys, uw
update->value = value;
update->next = NULL;
if (ctx->cacheUpdateTail) {
- // An update is already registered, so just extend it.
ctx->cacheUpdateTail->next = update;
} else {
ctx->cacheUpdate = update;
- uw_register_transactional(ctx, ctx, uw_Sqlcache_commitUpdate, NULL, uw_Sqlcache_freeUpdate);
}
ctx->cacheUpdateTail = update;
}
diff --git a/src/lru_cache.sml b/src/lru_cache.sml
index b66becb7..0276de91 100644
--- a/src/lru_cache.sml
+++ b/src/lru_cache.sml
@@ -69,7 +69,9 @@ fun setupQuery {index, params} =
Print.box
[string ("static uw_Sqlcache_Cache cacheStruct" ^ i ^ " = {"),
newline,
- string " .lock = PTHREAD_RWLOCK_INITIALIZER,",
+ string " .lockIn = PTHREAD_RWLOCK_INITIALIZER,",
+ newline,
+ string " .lockOut = PTHREAD_RWLOCK_INITIALIZER,",
newline,
string " .table = NULL,",
newline,
@@ -83,6 +85,22 @@ fun setupQuery {index, params} =
newline,
newline,
+ string ("static void uw_Sqlcache_rlock" ^ i ^ "(uw_context ctx) {"),
+ newline,
+ string (" uw_Sqlcache_rlock(ctx, cache" ^ i ^ ");"),
+ newline,
+ string "}",
+ newline,
+ newline,
+
+ string ("static void uw_Sqlcache_wlock" ^ i ^ "(uw_context ctx) {"),
+ newline,
+ string (" uw_Sqlcache_wlock(ctx, cache" ^ i ^ ");"),
+ newline,
+ string "}",
+ newline,
+ newline,
+
string ("static uw_Basis_string uw_Sqlcache_check" ^ i),
string ("(uw_context ctx" ^ typedArgs ^ ") {"),
newline,
diff --git a/src/sqlcache.sml b/src/sqlcache.sml
index 2b3b80ae..6583dc91 100644
--- a/src/sqlcache.sml
+++ b/src/sqlcache.sml
@@ -913,7 +913,7 @@ val conflictMaps = ConflictMaps.conflictMaps
(* Program Instrumentation Utilities *)
(*************************************)
-val {check, store, flush, ...} = getCache ()
+val {check, store, flush, lock, ...} = getCache ()
val dummyTyp = (TRecord [], dummyLoc)
@@ -1431,7 +1431,7 @@ fun addFlushing ((file, {tableToIndices, indexToInvalInfo, ffiInfo, ...} : state
(* Locking *)
(***********)
-(* TODO: do this less evil-ly by not relying on specific FFI names, please? *)
+(* TODO: do this less evilly by not relying on specific FFI names, please? *)
fun locksNeeded file =
transitiveAnalysis
(fn ((_, name, _, e, _), state) =>
@@ -1439,14 +1439,14 @@ fun locksNeeded file =
{typ = #2,
exp = fn (EFfiApp ("Sqlcache", x, _), state as {store, flush}) =>
(case Int.fromString (String.extract (x, 5, NONE)) of
- NONE => raise Match
+ NONE => state
| SOME index =>
- if String.isPrefix "store" x
- then {store = IIMM.insert (store, name, index), flush = flush}
- else if String.isPrefix "flush" x
+ if String.isPrefix "flush" x
then {store = store, flush = IIMM.insert (flush, name, index)}
+ else if String.isPrefix "store" x
+ then {store = IIMM.insert (store, name, index), flush = flush}
else state)
- | _ => state}
+ | (_, state) => state}
state
e)
{store = IIMM.empty, flush = IIMM.empty}
@@ -1459,13 +1459,36 @@ fun exports (decls, _) =
IS.empty
decls
-(* fun addLocking file = *)
-(* let *)
-(* val whichLocks = locksNeeded file *)
-(* val needsLocks = exports file *)
-(* in *)
+fun wrapLocks (locks, (exp', loc)) =
+ case exp' of
+ EAbs (s, t1, t2, exp) => (EAbs (s, t1, t2, wrapLocks (locks, exp)), loc)
+ | _ => (List.foldr (fn (l, e') => sequence [lock l, e']) exp' locks, loc)
+
+fun addLocking file =
+ let
+ val {store, flush} = locksNeeded file
+ fun locks n =
+ let
+ val wlocks = IIMM.findSet (flush, n)
+ val rlocks = IIMM.findSet (store, n)
+ val ls = map (fn i => (i, true)) (IS.listItems wlocks)
+ @ map (fn i => (i, false)) (IS.listItems (IS.difference (rlocks, wlocks)))
+ in
+ ListMergeSort.sort (fn ((i, _), (j, _)) => i > j) ls
+ end
+ val expts = exports file
+ fun doVal (v as (x, n, t, exp, s)) =
+ if IS.member (expts, n)
+ then (x, n, t, wrapLocks ((locks n), exp), s)
+ else v
+ val doDecl =
+ fn (DVal v, loc) => (DVal (doVal v), loc)
+ | (DValRec vs, loc) => (DValRec (map doVal vs), loc)
+ | decl => decl
+ in
+ mapFst (map doDecl) file
+ end
-(* end *)
(************************)
(* Compiler Entry Point *)
@@ -1494,7 +1517,7 @@ fun insertAfterDatatypes ((decls, sideInfo), newDecls) =
(datatypes @ newDecls @ others, sideInfo)
end
-val go' = addFlushing o addCaching o simplifySql o inlineSql
+val go' = addLocking o addFlushing o addCaching o simplifySql o inlineSql
fun go file =
let