summaryrefslogtreecommitdiff
path: root/src/c
diff options
context:
space:
mode:
authorGravatar Ziv Scully <ziv@mit.edu>2014-09-13 19:16:07 -0400
committerGravatar Ziv Scully <ziv@mit.edu>2014-09-13 19:16:07 -0400
commita7bfe57a2a355c5362d33e993394aa0bac300360 (patch)
tree1f81b256828f90ff34656d7d8fe703ce13d22e48 /src/c
parent6b6635f390cc072971dcc7b37af00bca21c48364 (diff)
parent5d2d4930568267b0e205ece3d4908cdc7ff715a1 (diff)
Merge.
Diffstat (limited to 'src/c')
-rw-r--r--src/c/http.c79
-rw-r--r--src/c/request.c30
-rw-r--r--src/c/urweb.c55
3 files changed, 114 insertions, 50 deletions
diff --git a/src/c/http.c b/src/c/http.c
index 32dd1dd1..9651a216 100644
--- a/src/c/http.c
+++ b/src/c/http.c
@@ -23,6 +23,9 @@ extern uw_app uw_application;
int uw_backlog = SOMAXCONN;
static int keepalive = 0, quiet = 0;
+#define qfprintf(f, fmt, args...) do { if(!quiet) fprintf(f, fmt, ##args); } while(0)
+#define qprintf(fmt, args...) do { if(!quiet) printf(fmt, ##args); } while(0)
+
static char *get_header(void *data, const char *h) {
char *s = data;
int len = strlen(h);
@@ -86,8 +89,7 @@ static void *worker(void *data) {
sock = uw_dequeue();
}
- if (!quiet)
- printf("Handling connection with thread #%d.\n", me);
+ qprintf("Handling connection with thread #%d.\n", me);
while (1) {
int r;
@@ -95,8 +97,15 @@ static void *worker(void *data) {
if (back - buf == buf_size - 1) {
char *new_buf;
- buf_size *= 2;
- new_buf = realloc(buf, buf_size);
+ size_t new_buf_size = buf_size*2;
+ new_buf = realloc(buf, new_buf_size);
+ if(!new_buf) {
+ qfprintf(stderr, "Realloc failed while receiving header\n");
+ close(sock);
+ sock = 0;
+ break;
+ }
+ buf_size = new_buf_size;
back = new_buf + (back - buf);
buf = new_buf;
}
@@ -107,16 +116,14 @@ static void *worker(void *data) {
r = recv(sock, back, buf_size - 1 - (back - buf), 0);
if (r < 0) {
- if (!quiet)
- fprintf(stderr, "Recv failed\n");
+ qfprintf(stderr, "Recv failed while receiving header, retcode %d errno %m\n", r);
close(sock);
sock = 0;
break;
}
if (r == 0) {
- if (!quiet)
- printf("Connection closed.\n");
+ qprintf("Connection closed.\n");
close(sock);
sock = 0;
break;
@@ -146,9 +153,16 @@ static void *worker(void *data) {
while (back - body < clen) {
if (back - buf == buf_size - 1) {
char *new_buf;
- buf_size *= 2;
- new_buf = realloc(buf, buf_size);
-
+ size_t new_buf_size = buf_size * 2;
+ new_buf = realloc(buf, new_buf_size);
+ if(!new_buf) {
+ qfprintf(stderr, "Realloc failed while receiving content\n");
+ close(sock);
+ sock = 0;
+ goto done;
+ }
+
+ buf_size = new_buf_size;
back = new_buf + (back - buf);
body = new_buf + (body - buf);
s = new_buf + (s - buf);
@@ -159,16 +173,14 @@ static void *worker(void *data) {
r = recv(sock, back, buf_size - 1 - (back - buf), 0);
if (r < 0) {
- if (!quiet)
- fprintf(stderr, "Recv failed\n");
+ qfprintf(stderr, "Recv failed while receiving content, retcode %d errno %m\n", r);
close(sock);
sock = 0;
goto done;
}
if (r == 0) {
- if (!quiet)
- fprintf(stderr, "Connection closed.\n");
+ qfprintf(stderr, "Connection closed.\n");
close(sock);
sock = 0;
goto done;
@@ -236,8 +248,7 @@ static void *worker(void *data) {
uw_set_headers(ctx, get_header, headers);
uw_set_env(ctx, get_env, NULL);
- if (!quiet)
- printf("Serving URI %s....\n", path);
+ qprintf("Serving URI %s....\n", path);
rr = uw_request(rc, ctx, method, path, query_string, body, back - body,
on_success, on_failure,
NULL, log_error, log_debug,
@@ -301,7 +312,7 @@ static void *worker(void *data) {
}
static void help(char *cmd) {
- printf("Usage: %s [-p <port>] [-a <IP address>] [-t <thread count>] [-k] [-q]\nThe '-k' option turns on HTTP keepalive.\nThe '-q' option turns off some chatter on stdout.\n", cmd);
+ printf("Usage: %s [-p <port>] [-a <IP address>] [-t <thread count>] [-k] [-q] [-T SEC]\nThe '-k' option turns on HTTP keepalive.\nThe '-q' option turns off some chatter on stdout.\nThe -T option sets socket recv timeout (0 disables timeout, default is 5 sec)", cmd);
}
static void sigint(int signum) {
@@ -316,6 +327,7 @@ int main(int argc, char *argv[]) {
struct sockaddr_in their_addr; // connector's address information
socklen_t sin_size;
int yes = 1, uw_port = 8080, nthreads = 1, i, *names, opt;
+ int recv_timeout_sec = 5;
signal(SIGINT, sigint);
signal(SIGPIPE, SIG_IGN);
@@ -323,7 +335,7 @@ int main(int argc, char *argv[]) {
my_addr.sin_addr.s_addr = INADDR_ANY; // auto-fill with my IP
memset(my_addr.sin_zero, '\0', sizeof my_addr.sin_zero);
- while ((opt = getopt(argc, argv, "hp:a:t:kq")) != -1) {
+ while ((opt = getopt(argc, argv, "hp:a:t:kqT:")) != -1) {
switch (opt) {
case '?':
fprintf(stderr, "Unknown command-line option\n");
@@ -364,6 +376,15 @@ int main(int argc, char *argv[]) {
keepalive = 1;
break;
+ case 'T':
+ recv_timeout_sec = atoi(optarg);
+ if (recv_timeout_sec < 0) {
+ fprintf(stderr, "Invalid recv timeout\n");
+ help(argv[0]);
+ return 1;
+ }
+ break;
+
case 'q':
quiet = 1;
break;
@@ -405,8 +426,7 @@ int main(int argc, char *argv[]) {
sin_size = sizeof their_addr;
- if (!quiet)
- printf("Listening on port %d....\n", uw_port);
+ qprintf("Listening on port %d....\n", uw_port);
{
pthread_t thread;
@@ -434,17 +454,26 @@ int main(int argc, char *argv[]) {
int new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
if (new_fd < 0) {
- if (!quiet)
- fprintf(stderr, "Socket accept failed\n");
+ qfprintf(stderr, "Socket accept failed\n");
} else {
- if (!quiet)
- printf("Accepted connection.\n");
+ qprintf("Accepted connection.\n");
if (keepalive) {
int flag = 1;
setsockopt(new_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
}
+ if(recv_timeout_sec>0) {
+ int ret;
+ struct timeval tv;
+ memset(&tv, 0, sizeof(struct timeval));
+ tv.tv_sec = recv_timeout_sec;
+ ret = setsockopt(new_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));
+ if(ret != 0) {
+ qfprintf(stderr, "Timeout setting failed, errcode %d errno '%m'\n", ret);
+ }
+ }
+
uw_enqueue(new_fd);
}
}
diff --git a/src/c/request.c b/src/c/request.c
index 5aee7bbe..d621aea7 100644
--- a/src/c/request.c
+++ b/src/c/request.c
@@ -444,8 +444,13 @@ request_result uw_request(uw_request_context rc, uw_context ctx,
int len = strlen(inputs);
if (len+1 > rc->queryString_size) {
+ char *qs = realloc(rc->queryString, len+1);
+ if(qs == NULL) {
+ log_error(logger_data, "queryString is too long (not enough memory)\n");
+ return FAILED;
+ }
+ rc->queryString = qs;
rc->queryString_size = len+1;
- rc->queryString = realloc(rc->queryString, len+1);
}
strcpy(rc->queryString, inputs);
@@ -480,8 +485,13 @@ request_result uw_request(uw_request_context rc, uw_context ctx,
on_success(ctx);
if (path_len + 1 > rc->path_copy_size) {
+ char *pc = realloc(rc->path_copy, path_len + 1);
+ if(pc == NULL) {
+ log_error(logger_data, "Path is too long (not enough memory)\n");
+ return FAILED;
+ }
+ rc->path_copy = pc;
rc->path_copy_size = path_len + 1;
- rc->path_copy = realloc(rc->path_copy, rc->path_copy_size);
}
strcpy(rc->path_copy, path);
@@ -503,14 +513,14 @@ request_result uw_request(uw_request_context rc, uw_context ctx,
had_error = 1;
strcpy(errmsg, uw_error_message(ctx));
} else {
+ try_rollback(ctx, 0, logger_data, log_error);
+
uw_write_header(ctx, "Content-type: text/html\r\n");
uw_write(ctx, "<html><head><title>Fatal Error</title></head><body>");
uw_write(ctx, "Fatal error: ");
uw_write(ctx, uw_error_message(ctx));
uw_write(ctx, "\n</body></html>");
- try_rollback(ctx, 0, logger_data, log_error);
-
return FAILED;
}
} else
@@ -527,14 +537,14 @@ request_result uw_request(uw_request_context rc, uw_context ctx,
had_error = 1;
strcpy(errmsg, uw_error_message(ctx));
} else {
+ try_rollback(ctx, 0, logger_data, log_error);
+
uw_reset_keep_error_message(ctx);
on_failure(ctx);
uw_write_header(ctx, "Content-type: text/plain\r\n");
uw_write(ctx, "Fatal error (out of retries): ");
uw_write(ctx, uw_error_message(ctx));
uw_write(ctx, "\n");
-
- try_rollback(ctx, 0, logger_data, log_error);
return FAILED;
}
@@ -548,6 +558,8 @@ request_result uw_request(uw_request_context rc, uw_context ctx,
had_error = 1;
strcpy(errmsg, uw_error_message(ctx));
} else {
+ try_rollback(ctx, 0, logger_data, log_error);
+
uw_reset_keep_error_message(ctx);
on_failure(ctx);
uw_write_header(ctx, "Content-type: text/html\r\n");
@@ -556,8 +568,6 @@ request_result uw_request(uw_request_context rc, uw_context ctx,
uw_write(ctx, uw_error_message(ctx));
uw_write(ctx, "\n</body></html>");
- try_rollback(ctx, 0, logger_data, log_error);
-
return FAILED;
}
} else {
@@ -567,13 +577,13 @@ request_result uw_request(uw_request_context rc, uw_context ctx,
had_error = 1;
strcpy(errmsg, "Unknown uw_handle return code");
} else {
+ try_rollback(ctx, 0, logger_data, log_error);
+
uw_reset_keep_request(ctx);
on_failure(ctx);
uw_write_header(ctx, "Content-type: text/plain\r\n");
uw_write(ctx, "Unknown uw_handle return code!\n");
- try_rollback(ctx, 0, logger_data, log_error);
-
return FAILED;
}
}
diff --git a/src/c/urweb.c b/src/c/urweb.c
index 57762da8..51ce2735 100644
--- a/src/c/urweb.c
+++ b/src/c/urweb.c
@@ -441,7 +441,7 @@ struct uw_context {
const char *script_header;
- int needs_push, needs_sig, could_write_db;
+ int needs_push, needs_sig, could_write_db, at_most_one_query;
size_t n_deltas, used_deltas;
delta *deltas;
@@ -523,6 +523,7 @@ uw_context uw_init(int id, uw_loggers *lg) {
ctx->needs_push = 0;
ctx->needs_sig = 0;
ctx->could_write_db = 1;
+ ctx->at_most_one_query = 0;
ctx->source_count = 0;
@@ -791,7 +792,7 @@ failure_kind uw_begin(uw_context ctx, char *path) {
}
void uw_ensure_transaction(uw_context ctx) {
- if (!ctx->transaction_started) {
+ if (!ctx->transaction_started && !ctx->at_most_one_query) {
if (ctx->app->db_begin(ctx, ctx->could_write_db))
uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN");
ctx->transaction_started = 1;
@@ -1048,12 +1049,12 @@ int uw_set_file_input(uw_context ctx, const char *name, uw_Basis_file f) {
int n = ctx->app->input_num(name);
if (n < 0) {
- uw_set_error(ctx, "Bad file input name %s", uw_Basis_htmlifyString(ctx, name));
+ uw_set_error(ctx, "Bad file input name");
return -1;
}
if (n >= ctx->app->inputs_len) {
- uw_set_error(ctx, "For file input name %s, index %d is out of range", uw_Basis_htmlifyString(ctx, name), n);
+ uw_set_error(ctx, "For file input name, index %d is out of range", n);
return -1;
}
@@ -1210,6 +1211,10 @@ void uw_set_could_write_db(uw_context ctx, int n) {
ctx->could_write_db = n;
}
+void uw_set_at_most_one_query(uw_context ctx, int n) {
+ ctx->at_most_one_query = n;
+}
+
static void uw_buffer_check_ctx(uw_context ctx, const char *kind, uw_buffer *b, size_t extra, const char *desc) {
if (b->back - b->front < extra) {
@@ -3317,6 +3322,8 @@ static char *find_sig(char *haystack) {
return s;
}
+static pthread_mutex_t message_send_mutex = PTHREAD_MUTEX_INITIALIZER;
+
int uw_commit(uw_context ctx) {
int i;
char *sig;
@@ -3336,10 +3343,17 @@ int uw_commit(uw_context ctx) {
}
}
+ // Here's an important lock to provide the abstraction that all messages from one transaction are sent as an atomic unit.
+ if (ctx->used_deltas > 0)
+ pthread_mutex_lock(&message_send_mutex);
+
if (ctx->transaction_started) {
int code = ctx->app->db_commit(ctx);
if (code) {
+ if (ctx->used_deltas > 0)
+ pthread_mutex_unlock(&message_send_mutex);
+
if (ctx->client)
release_client(ctx->client);
@@ -3356,7 +3370,7 @@ int uw_commit(uw_context ctx) {
if (ctx->transactionals[i].free)
ctx->transactionals[i].free(ctx->transactionals[i].data, 1);
- return 1;
+ return 1;
}
for (i = ctx->used_transactionals-1; i >= 0; --i)
@@ -3373,16 +3387,19 @@ int uw_commit(uw_context ctx) {
if (ctx->transactionals[i].commit) {
ctx->transactionals[i].commit(ctx->transactionals[i].data);
if (uw_has_error(ctx)) {
- if (ctx->client)
- release_client(ctx->client);
+ if (ctx->used_deltas > 0)
+ pthread_mutex_unlock(&message_send_mutex);
- for (i = ctx->used_transactionals-1; i >= 0; --i)
- if (ctx->transactionals[i].rollback != NULL)
- ctx->transactionals[i].rollback(ctx->transactionals[i].data);
+ if (ctx->client)
+ release_client(ctx->client);
- for (i = ctx->used_transactionals-1; i >= 0; --i)
- if (ctx->transactionals[i].free)
- ctx->transactionals[i].free(ctx->transactionals[i].data, 0);
+ for (i = ctx->used_transactionals-1; i >= 0; --i)
+ if (ctx->transactionals[i].rollback != NULL)
+ ctx->transactionals[i].rollback(ctx->transactionals[i].data);
+
+ for (i = ctx->used_transactionals-1; i >= 0; --i)
+ if (ctx->transactionals[i].free)
+ ctx->transactionals[i].free(ctx->transactionals[i].data, 0);
return 0;
}
@@ -3398,6 +3415,9 @@ int uw_commit(uw_context ctx) {
client_send(c, &d->msgs, ctx->script.start, uw_buffer_used(&ctx->script));
}
+ if (ctx->used_deltas > 0)
+ pthread_mutex_unlock(&message_send_mutex);
+
if (ctx->client)
release_client(ctx->client);
@@ -3617,7 +3637,7 @@ uw_Basis_string uw_Basis_checkUrl(uw_context ctx, uw_Basis_string s) {
static int mime_format(const char *s) {
for (; *s; ++s)
- if (!isalnum((int)*s) && *s != '/' && *s != '-' && *s != '.')
+ if (!isalnum((int)*s) && *s != '/' && *s != '-' && *s != '.' && *s != '+')
return 0;
return 1;
@@ -3859,6 +3879,11 @@ __attribute__((noreturn)) void uw_return_blob(uw_context ctx, uw_Basis_blob b, u
longjmp(ctx->jmp_buf, RETURN_INDIRECTLY);
}
+void uw_replace_page(uw_context ctx, const char *data, size_t size) {
+ uw_buffer_reset(&ctx->page);
+ ctx_uw_buffer_append(ctx, "page", &ctx->page, data, size);
+}
+
__attribute__((noreturn)) void uw_return_blob_from_page(uw_context ctx, uw_Basis_string mimeType) {
cleanup *cl;
int len;
@@ -4269,7 +4294,7 @@ uw_Basis_bool uw_Basis_eq_time(uw_context ctx, uw_Basis_time t1, uw_Basis_time t
}
uw_Basis_bool uw_Basis_lt_time(uw_context ctx, uw_Basis_time t1, uw_Basis_time t2) {
- return !!(t1.seconds < t2.seconds || t1.microseconds < t2.microseconds);
+ return !!(t1.seconds < t2.seconds || (t1.seconds == t2.seconds && t1.microseconds < t2.microseconds));
}
uw_Basis_bool uw_Basis_le_time(uw_context ctx, uw_Basis_time t1, uw_Basis_time t2) {