diff options
author | Ziv Scully <ziv@mit.edu> | 2014-09-13 19:16:07 -0400 |
---|---|---|
committer | Ziv Scully <ziv@mit.edu> | 2014-09-13 19:16:07 -0400 |
commit | a7bfe57a2a355c5362d33e993394aa0bac300360 (patch) | |
tree | 1f81b256828f90ff34656d7d8fe703ce13d22e48 /src/c | |
parent | 6b6635f390cc072971dcc7b37af00bca21c48364 (diff) | |
parent | 5d2d4930568267b0e205ece3d4908cdc7ff715a1 (diff) |
Merge.
Diffstat (limited to 'src/c')
-rw-r--r-- | src/c/http.c | 79 | ||||
-rw-r--r-- | src/c/request.c | 30 | ||||
-rw-r--r-- | src/c/urweb.c | 55 |
3 files changed, 114 insertions, 50 deletions
diff --git a/src/c/http.c b/src/c/http.c index 32dd1dd1..9651a216 100644 --- a/src/c/http.c +++ b/src/c/http.c @@ -23,6 +23,9 @@ extern uw_app uw_application; int uw_backlog = SOMAXCONN; static int keepalive = 0, quiet = 0; +#define qfprintf(f, fmt, args...) do { if(!quiet) fprintf(f, fmt, ##args); } while(0) +#define qprintf(fmt, args...) do { if(!quiet) printf(fmt, ##args); } while(0) + static char *get_header(void *data, const char *h) { char *s = data; int len = strlen(h); @@ -86,8 +89,7 @@ static void *worker(void *data) { sock = uw_dequeue(); } - if (!quiet) - printf("Handling connection with thread #%d.\n", me); + qprintf("Handling connection with thread #%d.\n", me); while (1) { int r; @@ -95,8 +97,15 @@ static void *worker(void *data) { if (back - buf == buf_size - 1) { char *new_buf; - buf_size *= 2; - new_buf = realloc(buf, buf_size); + size_t new_buf_size = buf_size*2; + new_buf = realloc(buf, new_buf_size); + if(!new_buf) { + qfprintf(stderr, "Realloc failed while receiving header\n"); + close(sock); + sock = 0; + break; + } + buf_size = new_buf_size; back = new_buf + (back - buf); buf = new_buf; } @@ -107,16 +116,14 @@ static void *worker(void *data) { r = recv(sock, back, buf_size - 1 - (back - buf), 0); if (r < 0) { - if (!quiet) - fprintf(stderr, "Recv failed\n"); + qfprintf(stderr, "Recv failed while receiving header, retcode %d errno %m\n", r); close(sock); sock = 0; break; } if (r == 0) { - if (!quiet) - printf("Connection closed.\n"); + qprintf("Connection closed.\n"); close(sock); sock = 0; break; @@ -146,9 +153,16 @@ static void *worker(void *data) { while (back - body < clen) { if (back - buf == buf_size - 1) { char *new_buf; - buf_size *= 2; - new_buf = realloc(buf, buf_size); - + size_t new_buf_size = buf_size * 2; + new_buf = realloc(buf, new_buf_size); + if(!new_buf) { + qfprintf(stderr, "Realloc failed while receiving content\n"); + close(sock); + sock = 0; + goto done; + } + + buf_size = new_buf_size; back = new_buf + (back - buf); body = new_buf + (body - buf); s = new_buf + (s - buf); @@ -159,16 +173,14 @@ static void *worker(void *data) { r = recv(sock, back, buf_size - 1 - (back - buf), 0); if (r < 0) { - if (!quiet) - fprintf(stderr, "Recv failed\n"); + qfprintf(stderr, "Recv failed while receiving content, retcode %d errno %m\n", r); close(sock); sock = 0; goto done; } if (r == 0) { - if (!quiet) - fprintf(stderr, "Connection closed.\n"); + qfprintf(stderr, "Connection closed.\n"); close(sock); sock = 0; goto done; @@ -236,8 +248,7 @@ static void *worker(void *data) { uw_set_headers(ctx, get_header, headers); uw_set_env(ctx, get_env, NULL); - if (!quiet) - printf("Serving URI %s....\n", path); + qprintf("Serving URI %s....\n", path); rr = uw_request(rc, ctx, method, path, query_string, body, back - body, on_success, on_failure, NULL, log_error, log_debug, @@ -301,7 +312,7 @@ static void *worker(void *data) { } static void help(char *cmd) { - printf("Usage: %s [-p <port>] [-a <IP address>] [-t <thread count>] [-k] [-q]\nThe '-k' option turns on HTTP keepalive.\nThe '-q' option turns off some chatter on stdout.\n", cmd); + printf("Usage: %s [-p <port>] [-a <IP address>] [-t <thread count>] [-k] [-q] [-T SEC]\nThe '-k' option turns on HTTP keepalive.\nThe '-q' option turns off some chatter on stdout.\nThe -T option sets socket recv timeout (0 disables timeout, default is 5 sec)", cmd); } static void sigint(int signum) { @@ -316,6 +327,7 @@ int main(int argc, char *argv[]) { struct sockaddr_in their_addr; // connector's address information socklen_t sin_size; int yes = 1, uw_port = 8080, nthreads = 1, i, *names, opt; + int recv_timeout_sec = 5; signal(SIGINT, sigint); signal(SIGPIPE, SIG_IGN); @@ -323,7 +335,7 @@ int main(int argc, char *argv[]) { my_addr.sin_addr.s_addr = INADDR_ANY; // auto-fill with my IP memset(my_addr.sin_zero, '\0', sizeof my_addr.sin_zero); - while ((opt = getopt(argc, argv, "hp:a:t:kq")) != -1) { + while ((opt = getopt(argc, argv, "hp:a:t:kqT:")) != -1) { switch (opt) { case '?': fprintf(stderr, "Unknown command-line option\n"); @@ -364,6 +376,15 @@ int main(int argc, char *argv[]) { keepalive = 1; break; + case 'T': + recv_timeout_sec = atoi(optarg); + if (recv_timeout_sec < 0) { + fprintf(stderr, "Invalid recv timeout\n"); + help(argv[0]); + return 1; + } + break; + case 'q': quiet = 1; break; @@ -405,8 +426,7 @@ int main(int argc, char *argv[]) { sin_size = sizeof their_addr; - if (!quiet) - printf("Listening on port %d....\n", uw_port); + qprintf("Listening on port %d....\n", uw_port); { pthread_t thread; @@ -434,17 +454,26 @@ int main(int argc, char *argv[]) { int new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); if (new_fd < 0) { - if (!quiet) - fprintf(stderr, "Socket accept failed\n"); + qfprintf(stderr, "Socket accept failed\n"); } else { - if (!quiet) - printf("Accepted connection.\n"); + qprintf("Accepted connection.\n"); if (keepalive) { int flag = 1; setsockopt(new_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); } + if(recv_timeout_sec>0) { + int ret; + struct timeval tv; + memset(&tv, 0, sizeof(struct timeval)); + tv.tv_sec = recv_timeout_sec; + ret = setsockopt(new_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval)); + if(ret != 0) { + qfprintf(stderr, "Timeout setting failed, errcode %d errno '%m'\n", ret); + } + } + uw_enqueue(new_fd); } } diff --git a/src/c/request.c b/src/c/request.c index 5aee7bbe..d621aea7 100644 --- a/src/c/request.c +++ b/src/c/request.c @@ -444,8 +444,13 @@ request_result uw_request(uw_request_context rc, uw_context ctx, int len = strlen(inputs); if (len+1 > rc->queryString_size) { + char *qs = realloc(rc->queryString, len+1); + if(qs == NULL) { + log_error(logger_data, "queryString is too long (not enough memory)\n"); + return FAILED; + } + rc->queryString = qs; rc->queryString_size = len+1; - rc->queryString = realloc(rc->queryString, len+1); } strcpy(rc->queryString, inputs); @@ -480,8 +485,13 @@ request_result uw_request(uw_request_context rc, uw_context ctx, on_success(ctx); if (path_len + 1 > rc->path_copy_size) { + char *pc = realloc(rc->path_copy, path_len + 1); + if(pc == NULL) { + log_error(logger_data, "Path is too long (not enough memory)\n"); + return FAILED; + } + rc->path_copy = pc; rc->path_copy_size = path_len + 1; - rc->path_copy = realloc(rc->path_copy, rc->path_copy_size); } strcpy(rc->path_copy, path); @@ -503,14 +513,14 @@ request_result uw_request(uw_request_context rc, uw_context ctx, had_error = 1; strcpy(errmsg, uw_error_message(ctx)); } else { + try_rollback(ctx, 0, logger_data, log_error); + uw_write_header(ctx, "Content-type: text/html\r\n"); uw_write(ctx, "<html><head><title>Fatal Error</title></head><body>"); uw_write(ctx, "Fatal error: "); uw_write(ctx, uw_error_message(ctx)); uw_write(ctx, "\n</body></html>"); - try_rollback(ctx, 0, logger_data, log_error); - return FAILED; } } else @@ -527,14 +537,14 @@ request_result uw_request(uw_request_context rc, uw_context ctx, had_error = 1; strcpy(errmsg, uw_error_message(ctx)); } else { + try_rollback(ctx, 0, logger_data, log_error); + uw_reset_keep_error_message(ctx); on_failure(ctx); uw_write_header(ctx, "Content-type: text/plain\r\n"); uw_write(ctx, "Fatal error (out of retries): "); uw_write(ctx, uw_error_message(ctx)); uw_write(ctx, "\n"); - - try_rollback(ctx, 0, logger_data, log_error); return FAILED; } @@ -548,6 +558,8 @@ request_result uw_request(uw_request_context rc, uw_context ctx, had_error = 1; strcpy(errmsg, uw_error_message(ctx)); } else { + try_rollback(ctx, 0, logger_data, log_error); + uw_reset_keep_error_message(ctx); on_failure(ctx); uw_write_header(ctx, "Content-type: text/html\r\n"); @@ -556,8 +568,6 @@ request_result uw_request(uw_request_context rc, uw_context ctx, uw_write(ctx, uw_error_message(ctx)); uw_write(ctx, "\n</body></html>"); - try_rollback(ctx, 0, logger_data, log_error); - return FAILED; } } else { @@ -567,13 +577,13 @@ request_result uw_request(uw_request_context rc, uw_context ctx, had_error = 1; strcpy(errmsg, "Unknown uw_handle return code"); } else { + try_rollback(ctx, 0, logger_data, log_error); + uw_reset_keep_request(ctx); on_failure(ctx); uw_write_header(ctx, "Content-type: text/plain\r\n"); uw_write(ctx, "Unknown uw_handle return code!\n"); - try_rollback(ctx, 0, logger_data, log_error); - return FAILED; } } diff --git a/src/c/urweb.c b/src/c/urweb.c index 57762da8..51ce2735 100644 --- a/src/c/urweb.c +++ b/src/c/urweb.c @@ -441,7 +441,7 @@ struct uw_context { const char *script_header; - int needs_push, needs_sig, could_write_db; + int needs_push, needs_sig, could_write_db, at_most_one_query; size_t n_deltas, used_deltas; delta *deltas; @@ -523,6 +523,7 @@ uw_context uw_init(int id, uw_loggers *lg) { ctx->needs_push = 0; ctx->needs_sig = 0; ctx->could_write_db = 1; + ctx->at_most_one_query = 0; ctx->source_count = 0; @@ -791,7 +792,7 @@ failure_kind uw_begin(uw_context ctx, char *path) { } void uw_ensure_transaction(uw_context ctx) { - if (!ctx->transaction_started) { + if (!ctx->transaction_started && !ctx->at_most_one_query) { if (ctx->app->db_begin(ctx, ctx->could_write_db)) uw_error(ctx, BOUNDED_RETRY, "Error running SQL BEGIN"); ctx->transaction_started = 1; @@ -1048,12 +1049,12 @@ int uw_set_file_input(uw_context ctx, const char *name, uw_Basis_file f) { int n = ctx->app->input_num(name); if (n < 0) { - uw_set_error(ctx, "Bad file input name %s", uw_Basis_htmlifyString(ctx, name)); + uw_set_error(ctx, "Bad file input name"); return -1; } if (n >= ctx->app->inputs_len) { - uw_set_error(ctx, "For file input name %s, index %d is out of range", uw_Basis_htmlifyString(ctx, name), n); + uw_set_error(ctx, "For file input name, index %d is out of range", n); return -1; } @@ -1210,6 +1211,10 @@ void uw_set_could_write_db(uw_context ctx, int n) { ctx->could_write_db = n; } +void uw_set_at_most_one_query(uw_context ctx, int n) { + ctx->at_most_one_query = n; +} + static void uw_buffer_check_ctx(uw_context ctx, const char *kind, uw_buffer *b, size_t extra, const char *desc) { if (b->back - b->front < extra) { @@ -3317,6 +3322,8 @@ static char *find_sig(char *haystack) { return s; } +static pthread_mutex_t message_send_mutex = PTHREAD_MUTEX_INITIALIZER; + int uw_commit(uw_context ctx) { int i; char *sig; @@ -3336,10 +3343,17 @@ int uw_commit(uw_context ctx) { } } + // Here's an important lock to provide the abstraction that all messages from one transaction are sent as an atomic unit. + if (ctx->used_deltas > 0) + pthread_mutex_lock(&message_send_mutex); + if (ctx->transaction_started) { int code = ctx->app->db_commit(ctx); if (code) { + if (ctx->used_deltas > 0) + pthread_mutex_unlock(&message_send_mutex); + if (ctx->client) release_client(ctx->client); @@ -3356,7 +3370,7 @@ int uw_commit(uw_context ctx) { if (ctx->transactionals[i].free) ctx->transactionals[i].free(ctx->transactionals[i].data, 1); - return 1; + return 1; } for (i = ctx->used_transactionals-1; i >= 0; --i) @@ -3373,16 +3387,19 @@ int uw_commit(uw_context ctx) { if (ctx->transactionals[i].commit) { ctx->transactionals[i].commit(ctx->transactionals[i].data); if (uw_has_error(ctx)) { - if (ctx->client) - release_client(ctx->client); + if (ctx->used_deltas > 0) + pthread_mutex_unlock(&message_send_mutex); - for (i = ctx->used_transactionals-1; i >= 0; --i) - if (ctx->transactionals[i].rollback != NULL) - ctx->transactionals[i].rollback(ctx->transactionals[i].data); + if (ctx->client) + release_client(ctx->client); - for (i = ctx->used_transactionals-1; i >= 0; --i) - if (ctx->transactionals[i].free) - ctx->transactionals[i].free(ctx->transactionals[i].data, 0); + for (i = ctx->used_transactionals-1; i >= 0; --i) + if (ctx->transactionals[i].rollback != NULL) + ctx->transactionals[i].rollback(ctx->transactionals[i].data); + + for (i = ctx->used_transactionals-1; i >= 0; --i) + if (ctx->transactionals[i].free) + ctx->transactionals[i].free(ctx->transactionals[i].data, 0); return 0; } @@ -3398,6 +3415,9 @@ int uw_commit(uw_context ctx) { client_send(c, &d->msgs, ctx->script.start, uw_buffer_used(&ctx->script)); } + if (ctx->used_deltas > 0) + pthread_mutex_unlock(&message_send_mutex); + if (ctx->client) release_client(ctx->client); @@ -3617,7 +3637,7 @@ uw_Basis_string uw_Basis_checkUrl(uw_context ctx, uw_Basis_string s) { static int mime_format(const char *s) { for (; *s; ++s) - if (!isalnum((int)*s) && *s != '/' && *s != '-' && *s != '.') + if (!isalnum((int)*s) && *s != '/' && *s != '-' && *s != '.' && *s != '+') return 0; return 1; @@ -3859,6 +3879,11 @@ __attribute__((noreturn)) void uw_return_blob(uw_context ctx, uw_Basis_blob b, u longjmp(ctx->jmp_buf, RETURN_INDIRECTLY); } +void uw_replace_page(uw_context ctx, const char *data, size_t size) { + uw_buffer_reset(&ctx->page); + ctx_uw_buffer_append(ctx, "page", &ctx->page, data, size); +} + __attribute__((noreturn)) void uw_return_blob_from_page(uw_context ctx, uw_Basis_string mimeType) { cleanup *cl; int len; @@ -4269,7 +4294,7 @@ uw_Basis_bool uw_Basis_eq_time(uw_context ctx, uw_Basis_time t1, uw_Basis_time t } uw_Basis_bool uw_Basis_lt_time(uw_context ctx, uw_Basis_time t1, uw_Basis_time t2) { - return !!(t1.seconds < t2.seconds || t1.microseconds < t2.microseconds); + return !!(t1.seconds < t2.seconds || (t1.seconds == t2.seconds && t1.microseconds < t2.microseconds)); } uw_Basis_bool uw_Basis_le_time(uw_context ctx, uw_Basis_time t1, uw_Basis_time t2) { |