summaryrefslogtreecommitdiff
path: root/src/c/urweb.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/c/urweb.c')
-rw-r--r--src/c/urweb.c789
1 files changed, 301 insertions, 488 deletions
diff --git a/src/c/urweb.c b/src/c/urweb.c
index 7c0cdb25..0194c226 100644
--- a/src/c/urweb.c
+++ b/src/c/urweb.c
@@ -7,6 +7,7 @@
#include <ctype.h>
#include <setjmp.h>
#include <stdarg.h>
+#include <assert.h>
#include <pthread.h>
@@ -88,79 +89,56 @@ static void buf_append(buf *b, const char *s, size_t len) {
typedef enum { UNUSED, USED } usage;
-typedef struct channel_list {
- struct channel *data;
- struct channel_list *next;
-} channel_list;
-
typedef struct client {
- size_t id;
+ unsigned id;
usage mode;
- union {
- struct client *next;
- struct {
- pthread_mutex_t lock;
- int pass;
- buf msgs;
- int sock;
- time_t last_contact;
- unsigned refcount;
- channel_list *channels;
- } used;
- } data;
+ int pass;
+ struct client *next;
+ pthread_mutex_t lock;
+ buf msgs;
+ int sock;
+ time_t last_contact;
+ unsigned n_channels;
} client;
-typedef struct client_list {
- client *data;
- struct client_list *next;
-} client_list;
-
-typedef struct channel {
- size_t id;
- usage mode;
- union {
- struct channel *next;
- struct {
- pthread_mutex_t lock;
- client_list *clients;
- unsigned refcount;
- } used;
- } data;
-} channel;
-
// Persistent client state
-static client **clients, *clients_free;
-static size_t n_clients;
+static client **clients, *clients_free, *clients_used;
+static unsigned n_clients;
static pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
-static client *uw_new_client() {
+static client *new_client() {
client *c;
pthread_mutex_lock(&clients_mutex);
if (clients_free) {
c = clients_free;
- clients_free = clients_free->data.next;
+ clients_free = clients_free->next;
}
else {
++n_clients;
clients = realloc(clients, sizeof(client) * n_clients);
c = malloc(sizeof(client));
c->id = n_clients-1;
+ pthread_mutex_init(&c->lock, NULL);
+ buf_init(&c->msgs, 0);
clients[n_clients-1] = c;
}
+ pthread_mutex_lock(&c->lock);
c->mode = USED;
- pthread_mutex_init(&c->data.used.lock, NULL);
- c->data.used.pass = rand();
- c->data.used.sock = -1;
- c->data.used.last_contact = time(NULL);
- buf_init(&c->data.used.msgs, 0);
- c->data.used.refcount = 0;
- c->data.used.channels = NULL;
+ c->pass = rand();
+ c->sock = -1;
+ c->last_contact = time(NULL);
+ buf_reset(&c->msgs);
+ c->n_channels = 0;
+ pthread_mutex_unlock(&c->lock);
+
+ c->next = clients_used;
+ clients_used = c;
pthread_mutex_unlock(&clients_mutex);
@@ -169,7 +147,7 @@ static client *uw_new_client() {
static const char begin_msgs[] = "HTTP/1.1 200 OK\r\nContent-type: text/plain\r\n\r\n";
-static client *uw_find_client(size_t id) {
+static client *find_client(unsigned id) {
client *c;
pthread_mutex_lock(&clients_mutex);
@@ -181,282 +159,111 @@ static client *uw_find_client(size_t id) {
c = clients[id];
- if (c->mode != USED) {
- pthread_mutex_unlock(&clients_mutex);
- return NULL;
- }
-
- pthread_mutex_lock(&c->data.used.lock);
- ++c->data.used.refcount;
- pthread_mutex_unlock(&c->data.used.lock);
pthread_mutex_unlock(&clients_mutex);
return c;
}
-static void uw_release_client(client *c) {
- pthread_mutex_lock(&c->data.used.lock);
- --c->data.used.refcount;
- pthread_mutex_unlock(&c->data.used.lock);
-}
-
-void uw_client_connect(size_t id, int pass, int sock) {
- client *c = uw_find_client(id);
+void uw_client_connect(unsigned id, int pass, int sock) {
+ client *c = find_client(id);
if (c == NULL) {
close(sock);
- fprintf(stderr, "Out-of-bounds client request (%d)\n", (int)id);
+ fprintf(stderr, "Out-of-bounds client request (%u)\n", id);
return;
}
- uw_release_client(c);
+ pthread_mutex_lock(&c->lock);
- pthread_mutex_lock(&c->data.used.lock);
+ if (c->mode != USED) {
+ pthread_mutex_unlock(&c->lock);
+ close(sock);
+ fprintf(stderr, "Client request for unused slot (%u)\n", id);
+ return;
+ }
- if (pass != c->data.used.pass) {
- pthread_mutex_unlock(&c->data.used.lock);
+ if (pass != c->pass) {
+ pthread_mutex_unlock(&c->lock);
close(sock);
- fprintf(stderr, "Wrong client password (%d)\n", (int)id);
+ fprintf(stderr, "Wrong client password (%u, %d)\n", id, pass);
return;
}
- if (c->data.used.sock != -1) {
- close(c->data.used.sock);
- c->data.used.sock = -1;
+ if (c->sock != -1) {
+ close(c->sock);
+ c->sock = -1;
}
- c->data.used.last_contact = time(NULL);
+ c->last_contact = time(NULL);
- if (buf_used(&c->data.used.msgs) > 0) {
+ if (buf_used(&c->msgs) > 0) {
uw_really_send(sock, begin_msgs, sizeof(begin_msgs) - 1);
- uw_really_send(sock, c->data.used.msgs.start, buf_used(&c->data.used.msgs));
- buf_reset(&c->data.used.msgs);
+ uw_really_send(sock, c->msgs.start, buf_used(&c->msgs));
+ buf_reset(&c->msgs);
close(sock);
}
else
- c->data.used.sock = sock;
+ c->sock = sock;
- pthread_mutex_unlock(&c->data.used.lock);
+ pthread_mutex_unlock(&c->lock);
}
-
-static void uw_free_client(client *c) {
- channel_list *chs;
-
+static void free_client(client *c) {
printf("Freeing client %d\n", c->id);
- if (c->mode == USED) {
- pthread_mutex_lock(&c->data.used.lock);
-
- for (chs = c->data.used.channels; chs; ) {
- client_list *prev, *cs;
-
- channel *ch = chs->data;
- channel_list *tmp = chs->next;
- free(chs);
- chs = tmp;
-
- pthread_mutex_lock(&ch->data.used.lock);
- for (prev = NULL, cs = ch->data.used.clients; cs; ) {
- if (cs->data == c) {
- client_list *tmp = cs->next;
- free(cs);
- cs = tmp;
- if (prev)
- prev->next = cs;
- else
- ch->data.used.clients = cs;
- }
- else {
- prev = cs;
- cs = cs->next;
- }
- }
- pthread_mutex_unlock(&ch->data.used.lock);
- }
+ c->mode = UNUSED;
+ c->pass = -1;
- if (c->data.used.sock != -1)
- close(c->data.used.sock);
-
- pthread_mutex_unlock(&c->data.used.lock);
- pthread_mutex_destroy(&c->data.used.lock);
- buf_free(&c->data.used.msgs);
- c->mode = UNUSED;
-
- c->data.next = clients_free;
- clients_free = c;
- }
+ c->next = clients_free;
+ clients_free = c;
}
extern int uw_timeout;
void uw_prune_clients() {
- size_t i;
+ client *c, *next, *prev = NULL;
time_t cutoff;
cutoff = time(NULL) - uw_timeout;
pthread_mutex_lock(&clients_mutex);
- for (i = 0; i < n_clients; ++i) {
- if (clients[i]->mode == USED && clients[i]->data.used.last_contact < cutoff
- && clients[i]->data.used.refcount == 0)
- uw_free_client(clients[i]);
- }
-
- pthread_mutex_unlock(&clients_mutex);
-}
-
-
-// Persistent channel state
-
-
-static channel **channels, *channels_free;
-static size_t n_channels;
-
-static pthread_mutex_t channels_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-static channel *uw_new_channel() {
- channel *ch;
-
- pthread_mutex_lock(&channels_mutex);
-
- if (channels_free) {
- ch = channels_free;
- channels_free = channels_free->data.next;
- }
- else {
- ++n_channels;
- channels = realloc(channels, sizeof(channels) * n_channels);
- ch = malloc(sizeof(channel));
- ch->id = n_channels-1;
- channels[n_channels-1] = ch;
- }
-
- ch->mode = USED;
- pthread_mutex_init(&ch->data.used.lock, NULL);
- ch->data.used.clients = NULL;
- ch->data.used.refcount = 0;
-
- pthread_mutex_unlock(&channels_mutex);
-
- return ch;
-}
-
-static void uw_free_channel(channel *ch) {
- if (ch->mode == USED) {
- client_list *cs;
-
- for (cs = ch->data.used.clients; cs; ) {
- client_list *tmp = cs->next;
- free(cs);
- cs = tmp;
- }
- pthread_mutex_destroy(&ch->data.used.lock);
- ch->mode = UNUSED;
- ch->data.next = channels_free;
- channels_free = ch;
- }
-}
-
-static channel *uw_find_channel(size_t id) {
- channel *ch = NULL;
-
- pthread_mutex_lock(&channels_mutex);
-
- if (id < n_channels && channels[id]->mode == USED) {
- ch = channels[id];
-
- pthread_mutex_lock(&ch->data.used.lock);
- ++ch->data.used.refcount;
- pthread_mutex_unlock(&ch->data.used.lock);
- }
-
- pthread_mutex_unlock(&channels_mutex);
-
- return ch;
-}
-
-static void uw_release_channel(channel *ch) {
- pthread_mutex_lock(&ch->data.used.lock);
- ++ch->data.used.refcount;
- pthread_mutex_unlock(&ch->data.used.lock);
-}
-
-static void uw_subscribe(channel *ch, client *c) {
- client_list *cs;
-
- pthread_mutex_lock(&ch->data.used.lock);
-
- for (cs = ch->data.used.clients; cs; cs = cs->next)
- if (cs->data == c) {
- pthread_mutex_unlock(&ch->data.used.lock);
- return;
- }
-
- cs = malloc(sizeof(client_list));
- cs->data = c;
- cs->next = ch->data.used.clients;
- ch->data.used.clients = cs;
-
- pthread_mutex_unlock(&ch->data.used.lock);
-}
-
-static void uw_unsubscribe(channel *ch, client *c) {
- client_list *prev, *cur, *tmp;
-
- pthread_mutex_lock(&ch->data.used.lock);
-
- for (prev = NULL, cur = ch->data.used.clients; cur; ) {
- if (cur->data == c) {
+ for (c = clients_used; c; c = next) {
+ next = c->next;
+ pthread_mutex_lock(&c->lock);
+ if (c->last_contact < cutoff) {
if (prev)
- prev->next = cur->next;
+ prev->next = next;
else
- ch->data.used.clients = cur->next;
- tmp = cur;
- cur = cur->next;
- free(tmp);
- }
- else {
- prev = cur;
- cur = cur->next;
+ clients_used = next;
+ free_client(c);
}
+ else
+ prev = c;
+ pthread_mutex_unlock(&c->lock);
}
- pthread_mutex_unlock(&ch->data.used.lock);
+ pthread_mutex_unlock(&clients_mutex);
}
-static void uw_channel_send(channel *ch, const char *msg) {
- size_t len = strlen(msg), preLen;
- char pre[INTS_MAX + 2];
- client_list *cs;
-
- sprintf(pre, "%d\n", (int)ch->id);
- preLen = strlen(pre);
-
- pthread_mutex_lock(&ch->data.used.lock);
-
- for (cs = ch->data.used.clients; cs; cs = cs->next) {
- client *c = cs->data;
-
- pthread_mutex_lock(&c->data.used.lock);
+static uw_Basis_channel new_channel(client *c) {
+ uw_Basis_channel ch = {c->id, c->n_channels++};
+ return ch;
+}
- if (c->data.used.sock != -1) {
- uw_really_send(c->data.used.sock, begin_msgs, sizeof(begin_msgs) - 1);
- uw_really_send(c->data.used.sock, pre, preLen);
- uw_really_send(c->data.used.sock, msg, len);
- uw_really_send(c->data.used.sock, "\n", 1);
- close(c->data.used.sock);
- c->data.used.sock = -1;
- } else {
- buf_append(&c->data.used.msgs, pre, preLen);
- buf_append(&c->data.used.msgs, msg, len);
- buf_append(&c->data.used.msgs, "\n", 1);
- }
+static void client_send(int already_locked, client *c, buf *msg) {
+ if (!already_locked)
+ pthread_mutex_lock(&c->lock);
- pthread_mutex_unlock(&c->data.used.lock);
- }
+ if (c->sock != -1) {
+ uw_really_send(c->sock, begin_msgs, sizeof(begin_msgs) - 1);
+ uw_really_send(c->sock, msg->start, buf_used(msg));
+ close(c->sock);
+ c->sock = -1;
+ } else
+ buf_append(&c->msgs, msg->start, buf_used(msg));
- pthread_mutex_unlock(&ch->data.used.lock);
+ if (!already_locked)
+ pthread_mutex_unlock(&c->lock);
}
@@ -466,7 +273,6 @@ void uw_global_init() {
srand(time(NULL) ^ getpid());
clients = malloc(0);
- channels = malloc(0);
}
@@ -484,15 +290,9 @@ typedef struct {
} cleanup;
typedef struct {
- usage mode;
- channel *ch;
- enum { OLD, NEW } newness;
-
- size_t n_subscribed;
- client **subscribed;
-
+ unsigned client;
buf msgs;
-} channel_delta;
+} delta;
struct uw_context {
char *headers, *headers_end;
@@ -512,11 +312,13 @@ struct uw_context {
const char *script_header, *url_prefix;
- size_t n_deltas;
- channel_delta *deltas;
+ size_t n_deltas, used_deltas;
+ delta *deltas;
int timeout;
+ client *client;
+
char error_message[ERROR_BUF_LEN];
};
@@ -548,7 +350,7 @@ uw_context uw_init(size_t outHeaders_len, size_t script_len, size_t page_len, si
ctx->source_count = 0;
- ctx->n_deltas = 0;
+ ctx->n_deltas = ctx->used_deltas = 0;
ctx->deltas = malloc(0);
ctx->timeout = uw_timeout;
@@ -574,10 +376,8 @@ void uw_free(uw_context ctx) {
free(ctx->inputs);
free(ctx->cleanup);
- for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
- free(ctx->deltas[i].subscribed);
+ for (i = 0; i < ctx->n_deltas; ++i)
buf_free(&ctx->deltas[i].msgs);
- }
free(ctx);
}
@@ -591,8 +391,8 @@ void uw_reset_keep_error_message(uw_context ctx) {
ctx->regions = NULL;
ctx->cleanup_front = ctx->cleanup;
ctx->source_count = 0;
- if (ctx->n_deltas > 0)
- ctx->deltas[0].mode = UNUSED;
+ ctx->used_deltas = 0;
+ ctx->client = NULL;
}
void uw_reset_keep_request(uw_context ctx) {
@@ -669,18 +469,73 @@ void uw_push_cleanup(uw_context ctx, void (*func)(void *), void *arg) {
++ctx->cleanup_front;
}
+uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) {
+ int len = strlen(h);
+ char *s = ctx->headers, *p;
+
+ while (p = strchr(s, ':')) {
+ if (p - s == len && !strncasecmp(s, h, len)) {
+ return p + 2;
+ } else {
+ if ((s = strchr(p, 0)) && s < ctx->headers_end)
+ s += 2;
+ else
+ return NULL;
+ }
+ }
+
+ return NULL;
+}
+
+void uw_login(uw_context ctx) {
+ if (ctx->script_header[0]) {
+ char *id_s, *pass_s;
+
+ if ((id_s = uw_Basis_requestHeader(ctx, "UrWeb-Client"))
+ && (pass_s = uw_Basis_requestHeader(ctx, "UrWeb-Pass"))) {
+ unsigned id = atoi(id_s);
+ int pass = atoi(pass_s);
+ client *c = find_client(id);
+
+ if (c == NULL)
+ uw_error(ctx, FATAL, "Unknown client ID in HTTP headers (%s, %s)", id_s, pass_s);
+ else {
+ pthread_mutex_lock(&c->lock);
+ ctx->client = c;
+
+ if (c->mode != USED)
+ uw_error(ctx, FATAL, "Stale client ID (%u) in subscription request", id);
+ if (c->pass != pass)
+ uw_error(ctx, FATAL, "Wrong client password (%u, %d) in subscription request", id, pass);
+ }
+ } else {
+ client *c = new_client();
+ pthread_mutex_lock(&c->lock);
+ ctx->client = c;
+ }
+ }
+}
+
failure_kind uw_begin(uw_context ctx, char *path) {
int r = setjmp(ctx->jmp_buf);
if (r == 0) {
if (uw_db_begin(ctx))
uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN");
+
uw_handle(ctx, path);
}
return r;
}
+uw_Basis_client uw_Basis_self(uw_context ctx, uw_unit u) {
+ if (ctx->client == NULL)
+ uw_error(ctx, FATAL, "Call to Basis.self() from page that has only server-side code");
+
+ return ctx->client->id;
+}
+
void uw_pop_cleanup(uw_context ctx) {
if (ctx->cleanup_front == ctx->cleanup)
uw_error(ctx, FATAL, "Attempt to pop from empty cleanup action stack");
@@ -843,9 +698,6 @@ const char *uw_Basis_get_script(uw_context ctx, uw_unit u) {
if (ctx->script_header[0] == 0)
return "";
else {
- int pass;
- client *c = uw_new_client(&pass);
-
char *r = uw_malloc(ctx, strlen(ctx->script_header) + 18 + buf_used(&ctx->script));
sprintf(r, "%s<script>%s</script>",
ctx->script_header,
@@ -855,16 +707,13 @@ const char *uw_Basis_get_script(uw_context ctx, uw_unit u) {
}
const char *uw_Basis_get_settings(uw_context ctx, uw_Basis_string onload) {
- if (ctx->script_header[0] == 0)
+ if (ctx->client == NULL)
return "";
else {
- int pass;
- client *c = uw_new_client(&pass);
-
char *r = uw_malloc(ctx, 52 + 3 * INTS_MAX + strlen(ctx->url_prefix) + strlen(onload));
- sprintf(r, " onload='client_id=%d;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'",
- (int)c->id,
- c->data.used.pass,
+ sprintf(r, " onload='client_id=%u;client_pass=%d;url_prefix=\"%s\";timeout=%d;listener();%s'",
+ ctx->client->id,
+ ctx->client->pass,
ctx->url_prefix,
ctx->timeout,
onload);
@@ -942,6 +791,21 @@ uw_Basis_string uw_Basis_jsifyString_ws(uw_context ctx, uw_Basis_string s) {
return r;
}
+char *uw_Basis_jsifyChannel(uw_context ctx, uw_Basis_channel chn) {
+ if (ctx->client == NULL || chn.cli != ctx->client->id)
+ return "null";
+ else {
+ int len;
+ char *r;
+
+ uw_check_heap(ctx, INTS_MAX + 1);
+ r = ctx->heap.front;
+ sprintf(r, "%u%n", chn.chn, &len);
+ ctx->heap.front += len+1;
+ return r;
+ }
+}
+
uw_Basis_int uw_Basis_new_client_source(uw_context ctx, uw_Basis_string s) {
int len;
size_t s_len = strlen(s);
@@ -1007,16 +871,6 @@ char *uw_Basis_attrifyInt(uw_context ctx, uw_Basis_int n) {
return result;
}
-char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel n) {
- char *result;
- int len;
- uw_check_heap(ctx, INTS_MAX);
- result = ctx->heap.front;
- sprintf(result, "%lld%n", (long long)n, &len);
- ctx->heap.front += len+1;
- return result;
-}
-
char *uw_Basis_attrifyFloat(uw_context ctx, uw_Basis_float n) {
char *result;
int len;
@@ -1116,15 +970,19 @@ char *uw_Basis_urlifyInt(uw_context ctx, uw_Basis_int n) {
return r;
}
-char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel n) {
- int len;
- char *r;
+char *uw_Basis_urlifyChannel(uw_context ctx, uw_Basis_channel chn) {
+ if (ctx->client == NULL || chn.cli != ctx->client->id)
+ return "";
+ else {
+ int len;
+ char *r;
- uw_check_heap(ctx, INTS_MAX);
- r = ctx->heap.front;
- sprintf(r, "%lld%n", (long long)n, &len);
- ctx->heap.front += len+1;
- return r;
+ uw_check_heap(ctx, INTS_MAX + 1);
+ r = ctx->heap.front;
+ sprintf(r, "%u%n", chn.chn, &len);
+ ctx->heap.front += len+1;
+ return r;
+ }
}
char *uw_Basis_urlifyFloat(uw_context ctx, uw_Basis_float n) {
@@ -1182,13 +1040,15 @@ uw_unit uw_Basis_urlifyInt_w(uw_context ctx, uw_Basis_int n) {
return uw_unit_v;
}
-uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel n) {
- int len;
-
- uw_check(ctx, INTS_MAX);
- sprintf(ctx->page.front, "%lld%n", (long long)n, &len);
- ctx->page.front += len;
-
+uw_unit uw_Basis_urlifyChannel_w(uw_context ctx, uw_Basis_channel chn) {
+ if (ctx->client != NULL && chn.cli == ctx->client->id) {
+ int len;
+
+ uw_check(ctx, INTS_MAX + 1);
+ sprintf(ctx->page.front, "%u%n", chn.chn, &len);
+ ctx->page.front += len;
+ }
+
return uw_unit_v;
}
@@ -1255,10 +1115,6 @@ uw_Basis_int uw_Basis_unurlifyInt(uw_context ctx, char **s) {
return r;
}
-uw_Basis_channel uw_Basis_unurlifyChannel(uw_context ctx, char **s) {
- return uw_Basis_unurlifyInt(ctx, s);
-}
-
uw_Basis_float uw_Basis_unurlifyFloat(uw_context ctx, char **s) {
char *new_s = uw_unurlify_advance(*s);
uw_Basis_float r;
@@ -1350,10 +1206,6 @@ uw_unit uw_Basis_htmlifyInt_w(uw_context ctx, uw_Basis_int n) {
return uw_unit_v;
}
-char *uw_Basis_htmlifyChannel(uw_context ctx, uw_Basis_channel ch) {
- return uw_Basis_htmlifyInt(ctx, ch);
-}
-
char *uw_Basis_htmlifyFloat(uw_context ctx, uw_Basis_float n) {
int len;
char *r;
@@ -1541,17 +1393,6 @@ char *uw_Basis_sqlifyInt(uw_context ctx, uw_Basis_int n) {
return r;
}
-char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel n) {
- int len;
- char *r;
-
- uw_check_heap(ctx, INTS_MAX + 6);
- r = ctx->heap.front;
- sprintf(r, "%lld::int4%n", (long long)n, &len);
- ctx->heap.front += len+1;
- return r;
-}
-
char *uw_Basis_sqlifyIntN(uw_context ctx, uw_Basis_int *n) {
if (n == NULL)
return "NULL";
@@ -1614,6 +1455,52 @@ uw_Basis_string uw_Basis_sqlifyString(uw_context ctx, uw_Basis_string s) {
return r;
}
+char *uw_Basis_sqlifyChannel(uw_context ctx, uw_Basis_channel chn) {
+ int len;
+ char *r;
+ unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn;
+
+ uw_check_heap(ctx, INTS_MAX + 7);
+ r = ctx->heap.front;
+ sprintf(r, "%lld::int8%n", combo, &len);
+ ctx->heap.front += len+1;
+ return r;
+}
+
+char *uw_Basis_attrifyChannel(uw_context ctx, uw_Basis_channel chn) {
+ int len;
+ char *r;
+ unsigned long long combo = ((unsigned long long)chn.cli << 32) | chn.chn;
+
+ uw_check_heap(ctx, INTS_MAX + 1);
+ r = ctx->heap.front;
+ sprintf(r, "%lld%n", combo, &len);
+ ctx->heap.front += len+1;
+ return r;
+}
+
+char *uw_Basis_sqlifyClient(uw_context ctx, uw_Basis_client cli) {
+ int len;
+ char *r;
+
+ uw_check_heap(ctx, INTS_MAX + 7);
+ r = ctx->heap.front;
+ sprintf(r, "%u::int4%n", cli, &len);
+ ctx->heap.front += len+1;
+ return r;
+}
+
+char *uw_Basis_attrifyClient(uw_context ctx, uw_Basis_client cli) {
+ int len;
+ char *r;
+
+ uw_check_heap(ctx, INTS_MAX + 1);
+ r = ctx->heap.front;
+ sprintf(r, "%u%n", cli, &len);
+ ctx->heap.front += len+1;
+ return r;
+}
+
uw_Basis_string uw_Basis_sqlifyStringN(uw_context ctx, uw_Basis_string s) {
if (s == NULL)
return "NULL";
@@ -1637,6 +1524,21 @@ char *uw_Basis_sqlifyBoolN(uw_context ctx, uw_Basis_bool *b) {
char *uw_Basis_sqlifyTime(uw_context ctx, uw_Basis_time t) {
size_t len;
+ char *r, *s;
+ struct tm stm;
+
+ if (localtime_r(&t, &stm)) {
+ s = uw_malloc(ctx, TIMES_MAX);
+ len = strftime(s, TIMES_MAX, TIME_FMT, &stm);
+ r = uw_malloc(ctx, len + 14);
+ sprintf(r, "'%s'::timestamp", s);
+ return r;
+ } else
+ return "<Invalid time>";
+}
+
+char *uw_Basis_attrifyTime(uw_context ctx, uw_Basis_time t) {
+ size_t len;
char *r;
struct tm stm;
@@ -1723,18 +1625,6 @@ uw_Basis_int *uw_Basis_stringToInt(uw_context ctx, uw_Basis_string s) {
return NULL;
}
-uw_Basis_channel *uw_Basis_stringToChannel(uw_context ctx, uw_Basis_string s) {
- char *endptr;
- uw_Basis_channel n = strtoll(s, &endptr, 10);
-
- if (*s != '\0' && *endptr == '\0') {
- uw_Basis_channel *r = uw_malloc(ctx, sizeof(uw_Basis_channel));
- *r = n;
- return r;
- } else
- return NULL;
-}
-
uw_Basis_float *uw_Basis_stringToFloat(uw_context ctx, uw_Basis_string s) {
char *endptr;
uw_Basis_float n = strtod(s, &endptr);
@@ -1802,14 +1692,27 @@ uw_Basis_int uw_Basis_stringToInt_error(uw_context ctx, uw_Basis_string s) {
uw_error(ctx, FATAL, "Can't parse int: %s", s);
}
+#include <errno.h>
+
uw_Basis_channel uw_Basis_stringToChannel_error(uw_context ctx, uw_Basis_string s) {
+ unsigned long long n;
+
+ if (sscanf(s, "%llu", &n) < 1)
+ uw_error(ctx, FATAL, "Can't parse channel: %s", s);
+ else {
+ uw_Basis_channel ch = {n >> 32, n & ((1ull << 32) - 1)};
+ return ch;
+ }
+}
+
+uw_Basis_client uw_Basis_stringToClient_error(uw_context ctx, uw_Basis_string s) {
char *endptr;
- uw_Basis_channel n = strtoll(s, &endptr, 10);
+ unsigned long n = strtoul(s, &endptr, 10);
if (*s != '\0' && *endptr == '\0')
return n;
else
- uw_error(ctx, FATAL, "Can't parse channel int: %s", s);
+ uw_error(ctx, FATAL, "Can't parse client: %s", s);
}
uw_Basis_float uw_Basis_stringToFloat_error(uw_context ctx, uw_Basis_string s) {
@@ -1856,22 +1759,6 @@ uw_Basis_time uw_Basis_stringToTime_error(uw_context ctx, uw_Basis_string s) {
}
}
-uw_Basis_string uw_Basis_requestHeader(uw_context ctx, uw_Basis_string h) {
- int len = strlen(h);
- char *s = ctx->headers, *p;
-
- while (p = strchr(s, ':')) {
- if (p - s == len && !strncasecmp(s, h, len)) {
- return p + 2;
- } else {
- if ((s = strchr(p, 0)) && s < ctx->headers_end)
- s += 2;
- else
- return NULL;
- }
- }
-}
-
uw_Basis_string uw_Basis_get_cookie(uw_context ctx, uw_Basis_string c) {
int len = strlen(c);
char *s = ctx->headers, *p = ctx->outHeaders.start;
@@ -1930,100 +1817,45 @@ uw_unit uw_Basis_set_cookie(uw_context ctx, uw_Basis_string prefix, uw_Basis_str
return uw_unit_v;
}
-static channel_delta *allocate_delta(uw_context ctx, channel *ch) {
- size_t i;
- channel_delta *cd;
+static delta *allocate_delta(uw_context ctx, unsigned client) {
+ unsigned i;
+ delta *d;
- for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch != ch; ++i);
+ for (i = 0; i < ctx->used_deltas; ++i)
+ if (ctx->deltas[i].client == client)
+ return &ctx->deltas[i];
- if (i < ctx->n_deltas && ctx->deltas[i].mode == USED && ctx->deltas[i].ch == ch)
- return &ctx->deltas[i];
-
- if (i < ctx->n_deltas)
- cd = &ctx->deltas[i];
- else {
- ++ctx->n_deltas;
- ctx->deltas = realloc(ctx->deltas, sizeof(channel_delta) * ctx->n_deltas);
- cd = &ctx->deltas[ctx->n_deltas-1];
- cd->n_subscribed = 0;
- cd->subscribed = malloc(0);
- buf_init(&cd->msgs, 0);
+ if (ctx->used_deltas >= ctx->n_deltas) {
+ ctx->deltas = realloc(ctx->deltas, sizeof(delta) * ++ctx->n_deltas);
+ buf_init(&ctx->deltas[ctx->n_deltas-1].msgs, 0);
}
-
- cd->mode = USED;
- cd->newness = OLD;
- cd->ch = ch;
- if (cd->n_subscribed > 0)
- cd->subscribed[0] = NULL;
- buf_reset(&cd->msgs);
- return cd;
+
+ d = &ctx->deltas[ctx->used_deltas++];
+ d->client = client;
+ buf_reset(&d->msgs);
+ return d;
}
uw_Basis_channel uw_Basis_new_channel(uw_context ctx, uw_unit u) {
- size_t i;
- channel *ch = uw_new_channel();
- ++ch->data.used.refcount;
- channel_delta *cd = allocate_delta(ctx, ch);
+ if (ctx->client == NULL)
+ uw_error(ctx, FATAL, "Attempt to create channel on request not associated with a persistent connection");
- cd->newness = NEW;
-
- return ch->id;
-}
-
-static int delta_used(channel_delta *cd) {
- return cd->newness == NEW || buf_used(&cd->msgs) > 0 || (cd->n_subscribed > 0 && cd->subscribed[0]);
+ return new_channel(ctx->client);
}
-uw_unit uw_Basis_subscribe(uw_context ctx, uw_Basis_channel chn) {
- channel *ch = uw_find_channel(chn);
-
- if (ch == NULL)
- uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
- else {
- size_t id = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Client"));
- int pass = atoi(uw_Basis_requestHeader(ctx, "UrWeb-Pass"));
- client *c = uw_find_client(id);
-
- if (c == NULL) {
- uw_release_channel(ch);
- uw_error(ctx, FATAL, "Unknown client ID in subscription request");
- } else if (c->data.used.pass != pass) {
- uw_release_channel(ch);
- uw_release_client(c);
- uw_error(ctx, FATAL, "Wrong client password (%d) in subscription request", pass);
- } else {
- size_t i;
- channel_delta *cd = allocate_delta(ctx, ch);
+uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
+ delta *d = allocate_delta(ctx, chn.cli);
+ size_t len;
+ int preLen;
+ char pre[INTS_MAX + 2];
- if (delta_used(cd))
- uw_release_channel(ch);
+ len = strlen(msg);
- for (i = 0; i < cd->n_subscribed && cd->subscribed[i]; ++i);
+ sprintf(pre, "%u\n%n", chn.chn, &preLen);
- if (i < cd->n_subscribed)
- cd->subscribed[i] = c;
- else {
- ++cd->n_subscribed;
- cd->subscribed = realloc(cd->subscribed, sizeof(int) * cd->n_subscribed);
- cd->subscribed[cd->n_subscribed-1] = c;
- }
- }
- }
-
- return uw_unit_v;
-}
-
-uw_unit uw_Basis_send(uw_context ctx, uw_Basis_channel chn, uw_Basis_string msg) {
- channel *ch = uw_find_channel(chn);
-
- if (ch == NULL)
- uw_error(ctx, FATAL, "Bad channel ID %d", (int)chn);
- else {
- channel_delta *cd = allocate_delta(ctx, ch);
- if (delta_used(cd))
- uw_release_channel(ch);
- buf_append(&cd->msgs, msg, strlen(msg));
- }
+ buf_append(&d->msgs, pre, preLen);
+ buf_append(&d->msgs, msg, len);
+ buf_append(&d->msgs, "\n", 1);
return uw_unit_v;
}
@@ -2032,46 +1864,27 @@ int uw_db_commit(uw_context);
int uw_db_rollback(uw_context);
void uw_commit(uw_context ctx) {
- size_t i, j;
+ unsigned i;
- for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
- channel *ch = ctx->deltas[i].ch;
-
- for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
- client *c = ctx->deltas[i].subscribed[j];
+ if (uw_db_commit(ctx))
+ uw_error(ctx, FATAL, "Error running SQL COMMIT");
- uw_subscribe(ch, c);
- uw_release_client(c);
- }
+ for (i = 0; i < ctx->used_deltas; ++i) {
+ delta *d = &ctx->deltas[i];
+ client *c = find_client(d->client);
- if (buf_used(&ctx->deltas[i].msgs) > 0) {
- uw_channel_send(ch, ctx->deltas[i].msgs.start);
- }
+ assert (c != NULL && c->mode == USED);
- uw_release_channel(ch);
+ client_send(c == ctx->client, c, &d->msgs);
}
- if (uw_db_commit(ctx))
- uw_error(ctx, FATAL, "Error running SQL COMMIT");
+ if (ctx->client)
+ pthread_mutex_unlock(&ctx->client->lock);
}
int uw_rollback(uw_context ctx) {
- size_t i, j;
-
- for (i = 0; i < ctx->n_deltas && ctx->deltas[i].mode == USED; ++i) {
- channel *ch = ctx->deltas[i].ch;
-
- for (j = 0; j < ctx->deltas[i].n_subscribed && ctx->deltas[i].subscribed[j]; ++j) {
- client *c = ctx->deltas[i].subscribed[j];
-
- uw_release_client(c);
- }
-
- if (ctx->deltas[i].newness == NEW)
- uw_free_channel(ch);
- else
- uw_release_channel(ch);
- }
+ if (ctx->client)
+ pthread_mutex_unlock(&ctx->client->lock);
return uw_db_rollback(ctx);
}