aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Adam Chlipala <adamc@hcoop.net>2008-07-19 18:56:57 -0400
committerGravatar Adam Chlipala <adamc@hcoop.net>2008-07-19 18:56:57 -0400
commit1500c4fedf82243dfbee5fff8ea392905f0a8c80 (patch)
tree9da0820b99f1c3fea1734fcc29d58f5af983ff49 /src
parent00d03f085730b4bd1d353f7841285084b35eeaf7 (diff)
Change driver to use Pthreads
Diffstat (limited to 'src')
-rw-r--r--src/c/driver.c194
-rw-r--r--src/c/lacweb.c5
-rw-r--r--src/compiler.sml2
3 files changed, 139 insertions, 62 deletions
diff --git a/src/c/driver.c b/src/c/driver.c
index 6fb3defe..f79c166d 100644
--- a/src/c/driver.c
+++ b/src/c/driver.c
@@ -5,6 +5,8 @@
#include <sys/socket.h>
#include <netinet/in.h>
+#include <pthread.h>
+
#include "lacweb.h"
int lw_port = 8080;
@@ -13,88 +15,145 @@ int lw_bufsize = 1024;
void lw_handle(lw_context, char*);
-static void worker(int sock) {
- char buf[lw_bufsize+1], *back = buf, *s;
+typedef struct node {
+ int fd;
+ struct node *next;
+} *node;
- while (1) {
- int r = recv(sock, back, lw_bufsize - (back - buf), 0);
+static node front = NULL, back = NULL;
- if (r < 0) {
- fprintf(stderr, "Recv failed\n");
- close(sock);
- return;
- }
+static int empty() {
+ return front == NULL;
+}
- if (r == 0) {
- printf("Connection closed.\n");
- close(sock);
- return;
- }
+static void enqueue(int fd) {
+ node n = malloc(sizeof(struct node));
- printf("Received %d bytes.\n", r);
+ n->fd = fd;
+ n->next = NULL;
+ if (back)
+ back->next = n;
+ else
+ front = n;
+ back = n;
+}
- back += r;
- *back = 0;
-
- if (s = strstr(buf, "\r\n\r\n")) {
- char *cmd, *path;
- lw_context ctx;
+static int dequeue() {
+ int ret = front->fd;
- *s = 0;
-
- if (!(s = strstr(buf, "\r\n"))) {
- fprintf(stderr, "No newline in buf\n");
- close(sock);
- return;
- }
+ front = front->next;
+ if (!front)
+ back = NULL;
- *s = 0;
- cmd = s = buf;
-
- if (!strsep(&s, " ")) {
- fprintf(stderr, "No first space in HTTP command\n");
- close(sock);
- return;
- }
+ return ret;
+}
+
+static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
+
+static void *worker(void *data) {
+ int me = *(int *)data;
+ lw_context ctx = lw_init(1024, 1024);
+
+ while (1) {
+ char buf[lw_bufsize+1], *back = buf, *s;
+ int sock;
+
+ pthread_mutex_lock(&queue_mutex);
+ while (empty())
+ pthread_cond_wait(&queue_cond, &queue_mutex);
+ sock = dequeue();
+ pthread_mutex_unlock(&queue_mutex);
+
+ printf("Handling connection with thread #%d.\n", me);
- if (strcmp(cmd, "GET")) {
- fprintf(stderr, "Not ready for non-get command: %s\n", cmd);
- close(sock);
- return;
+ while (1) {
+ int r = recv(sock, back, lw_bufsize - (back - buf), 0);
+
+ if (r < 0) {
+ fprintf(stderr, "Recv failed\n");
+ break;
}
- path = s;
- if (!strsep(&s, " ")) {
- fprintf(stderr, "No second space in HTTP command\n");
- close(sock);
- return;
+ if (r == 0) {
+ printf("Connection closed.\n");
+ break;
}
- printf("Serving URI %s....\n", path);
+ printf("Received %d bytes.\n", r);
- ctx = lw_init(1024, 1024);
- lw_write (ctx, "HTTP/1.1 200 OK\r\n");
- lw_write(ctx, "Content-type: text/html\r\n\r\n");
- lw_write(ctx, "<html>");
- lw_handle(ctx, path);
- lw_write(ctx, "</html>");
+ back += r;
+ *back = 0;
+
+ if (s = strstr(buf, "\r\n\r\n")) {
+ char *cmd, *path;
- lw_send(ctx, sock);
+ *s = 0;
+
+ if (!(s = strstr(buf, "\r\n"))) {
+ fprintf(stderr, "No newline in buf\n");
+ break;
+ }
- lw_free(ctx);
- printf("Done with client.\n\n");
- close(sock);
- return;
+ *s = 0;
+ cmd = s = buf;
+
+ if (!strsep(&s, " ")) {
+ fprintf(stderr, "No first space in HTTP command\n");
+ break;
+ }
+
+ if (strcmp(cmd, "GET")) {
+ fprintf(stderr, "Not ready for non-get command: %s\n", cmd);
+ break;
+ }
+
+ path = s;
+ if (!strsep(&s, " ")) {
+ fprintf(stderr, "No second space in HTTP command\n");
+ break;
+ }
+
+ printf("Serving URI %s....\n", path);
+
+ ctx = lw_init(1024, 1024);
+ lw_write (ctx, "HTTP/1.1 200 OK\r\n");
+ lw_write(ctx, "Content-type: text/html\r\n\r\n");
+ lw_write(ctx, "<html>");
+ lw_handle(ctx, path);
+ lw_write(ctx, "</html>");
+
+ lw_send(ctx, sock);
+
+ printf("Done with client.\n\n");
+ break;
+ }
}
+
+ close(sock);
+ lw_reset(ctx);
}
}
-int main() {
+int main(int argc, char *argv[]) {
// The skeleton for this function comes from Beej's sockets tutorial.
- int sockfd, new_fd; // listen on sock_fd, new connection on new_fd
+ int sockfd; // listen on sock_fd
struct sockaddr_in my_addr;
struct sockaddr_in their_addr; // connector's address information
int sin_size, yes = 1;
+ int nthreads, i, *names;
+
+ if (argc < 2) {
+ fprintf(stderr, "No thread count specified\n");
+ return 1;
+ }
+
+ nthreads = atoi(argv[1]);
+ if (nthreads <= 0) {
+ fprintf(stderr, "Invalid thread count\n");
+ return 1;
+ }
+ names = calloc(nthreads, sizeof(int));
sockfd = socket(PF_INET, SOCK_STREAM, 0); // do some error checking!
@@ -127,8 +186,17 @@ int main() {
printf("Listening on port %d....\n", lw_port);
+ for (i = 0; i < nthreads; ++i) {
+ pthread_t thread;
+ names[i] = i;
+ if (pthread_create(&thread, NULL, worker, &names[i])) {
+ fprintf(stderr, "Error creating worker thread #%d\n", i);
+ return 1;
+ }
+ }
+
while (1) {
- new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
+ int new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
if (new_fd < 0) {
fprintf(stderr, "Socket accept failed\n");
@@ -136,6 +204,10 @@ int main() {
}
printf("Accepted connection.\n");
- worker(new_fd);
+
+ pthread_mutex_lock(&queue_mutex);
+ enqueue(new_fd);
+ pthread_mutex_unlock(&queue_mutex);
+ pthread_cond_broadcast(&queue_cond);
}
}
diff --git a/src/c/lacweb.c b/src/c/lacweb.c
index 53710e28..2b806f86 100644
--- a/src/c/lacweb.c
+++ b/src/c/lacweb.c
@@ -31,6 +31,11 @@ void lw_free(lw_context ctx) {
free(ctx);
}
+void lw_reset(lw_context ctx) {
+ ctx->page_front = ctx->page;
+ ctx->heap_front = ctx->heap;
+}
+
static void lw_check_heap(lw_context ctx, size_t extra) {
if (ctx->heap_back - ctx->heap_front < extra) {
size_t desired = ctx->heap_back - ctx->heap_front + extra, next;
diff --git a/src/compiler.sml b/src/compiler.sml
index 1e8d84f0..abf9bff5 100644
--- a/src/compiler.sml
+++ b/src/compiler.sml
@@ -432,7 +432,7 @@ fun compile job =
val ename = "/tmp/webapp"
val compile = "gcc -O3 -I include -c " ^ cname ^ " -o " ^ oname
- val link = "gcc -O3 clib/lacweb.o " ^ oname ^ " clib/driver.o -o " ^ ename
+ val link = "gcc -pthread -O3 clib/lacweb.o " ^ oname ^ " clib/driver.o -o " ^ ename
val outf = TextIO.openOut cname
val s = TextIOPP.openOut {dst = outf, wid = 80}