diff options
author | Adam Chlipala <adamc@hcoop.net> | 2008-07-19 18:56:57 -0400 |
---|---|---|
committer | Adam Chlipala <adamc@hcoop.net> | 2008-07-19 18:56:57 -0400 |
commit | 1500c4fedf82243dfbee5fff8ea392905f0a8c80 (patch) | |
tree | 9da0820b99f1c3fea1734fcc29d58f5af983ff49 /src/c/driver.c | |
parent | 00d03f085730b4bd1d353f7841285084b35eeaf7 (diff) |
Change driver to use Pthreads
Diffstat (limited to 'src/c/driver.c')
-rw-r--r-- | src/c/driver.c | 194 |
1 files changed, 133 insertions, 61 deletions
diff --git a/src/c/driver.c b/src/c/driver.c index 6fb3defe..f79c166d 100644 --- a/src/c/driver.c +++ b/src/c/driver.c @@ -5,6 +5,8 @@ #include <sys/socket.h> #include <netinet/in.h> +#include <pthread.h> + #include "lacweb.h" int lw_port = 8080; @@ -13,88 +15,145 @@ int lw_bufsize = 1024; void lw_handle(lw_context, char*); -static void worker(int sock) { - char buf[lw_bufsize+1], *back = buf, *s; +typedef struct node { + int fd; + struct node *next; +} *node; - while (1) { - int r = recv(sock, back, lw_bufsize - (back - buf), 0); +static node front = NULL, back = NULL; - if (r < 0) { - fprintf(stderr, "Recv failed\n"); - close(sock); - return; - } +static int empty() { + return front == NULL; +} - if (r == 0) { - printf("Connection closed.\n"); - close(sock); - return; - } +static void enqueue(int fd) { + node n = malloc(sizeof(struct node)); - printf("Received %d bytes.\n", r); + n->fd = fd; + n->next = NULL; + if (back) + back->next = n; + else + front = n; + back = n; +} - back += r; - *back = 0; - - if (s = strstr(buf, "\r\n\r\n")) { - char *cmd, *path; - lw_context ctx; +static int dequeue() { + int ret = front->fd; - *s = 0; - - if (!(s = strstr(buf, "\r\n"))) { - fprintf(stderr, "No newline in buf\n"); - close(sock); - return; - } + front = front->next; + if (!front) + back = NULL; - *s = 0; - cmd = s = buf; - - if (!strsep(&s, " ")) { - fprintf(stderr, "No first space in HTTP command\n"); - close(sock); - return; - } + return ret; +} + +static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; + +static void *worker(void *data) { + int me = *(int *)data; + lw_context ctx = lw_init(1024, 1024); + + while (1) { + char buf[lw_bufsize+1], *back = buf, *s; + int sock; + + pthread_mutex_lock(&queue_mutex); + while (empty()) + pthread_cond_wait(&queue_cond, &queue_mutex); + sock = dequeue(); + pthread_mutex_unlock(&queue_mutex); + + printf("Handling connection with thread #%d.\n", me); - if (strcmp(cmd, "GET")) { - fprintf(stderr, "Not ready for non-get command: %s\n", cmd); - close(sock); - return; + while (1) { + int r = recv(sock, back, lw_bufsize - (back - buf), 0); + + if (r < 0) { + fprintf(stderr, "Recv failed\n"); + break; } - path = s; - if (!strsep(&s, " ")) { - fprintf(stderr, "No second space in HTTP command\n"); - close(sock); - return; + if (r == 0) { + printf("Connection closed.\n"); + break; } - printf("Serving URI %s....\n", path); + printf("Received %d bytes.\n", r); - ctx = lw_init(1024, 1024); - lw_write (ctx, "HTTP/1.1 200 OK\r\n"); - lw_write(ctx, "Content-type: text/html\r\n\r\n"); - lw_write(ctx, "<html>"); - lw_handle(ctx, path); - lw_write(ctx, "</html>"); + back += r; + *back = 0; + + if (s = strstr(buf, "\r\n\r\n")) { + char *cmd, *path; - lw_send(ctx, sock); + *s = 0; + + if (!(s = strstr(buf, "\r\n"))) { + fprintf(stderr, "No newline in buf\n"); + break; + } - lw_free(ctx); - printf("Done with client.\n\n"); - close(sock); - return; + *s = 0; + cmd = s = buf; + + if (!strsep(&s, " ")) { + fprintf(stderr, "No first space in HTTP command\n"); + break; + } + + if (strcmp(cmd, "GET")) { + fprintf(stderr, "Not ready for non-get command: %s\n", cmd); + break; + } + + path = s; + if (!strsep(&s, " ")) { + fprintf(stderr, "No second space in HTTP command\n"); + break; + } + + printf("Serving URI %s....\n", path); + + ctx = lw_init(1024, 1024); + lw_write (ctx, "HTTP/1.1 200 OK\r\n"); + lw_write(ctx, "Content-type: text/html\r\n\r\n"); + lw_write(ctx, "<html>"); + lw_handle(ctx, path); + lw_write(ctx, "</html>"); + + lw_send(ctx, sock); + + printf("Done with client.\n\n"); + break; + } } + + close(sock); + lw_reset(ctx); } } -int main() { +int main(int argc, char *argv[]) { // The skeleton for this function comes from Beej's sockets tutorial. - int sockfd, new_fd; // listen on sock_fd, new connection on new_fd + int sockfd; // listen on sock_fd struct sockaddr_in my_addr; struct sockaddr_in their_addr; // connector's address information int sin_size, yes = 1; + int nthreads, i, *names; + + if (argc < 2) { + fprintf(stderr, "No thread count specified\n"); + return 1; + } + + nthreads = atoi(argv[1]); + if (nthreads <= 0) { + fprintf(stderr, "Invalid thread count\n"); + return 1; + } + names = calloc(nthreads, sizeof(int)); sockfd = socket(PF_INET, SOCK_STREAM, 0); // do some error checking! @@ -127,8 +186,17 @@ int main() { printf("Listening on port %d....\n", lw_port); + for (i = 0; i < nthreads; ++i) { + pthread_t thread; + names[i] = i; + if (pthread_create(&thread, NULL, worker, &names[i])) { + fprintf(stderr, "Error creating worker thread #%d\n", i); + return 1; + } + } + while (1) { - new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); + int new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); if (new_fd < 0) { fprintf(stderr, "Socket accept failed\n"); @@ -136,6 +204,10 @@ int main() { } printf("Accepted connection.\n"); - worker(new_fd); + + pthread_mutex_lock(&queue_mutex); + enqueue(new_fd); + pthread_mutex_unlock(&queue_mutex); + pthread_cond_broadcast(&queue_cond); } } |