diff options
Diffstat (limited to 'src/core/httpcli')
-rw-r--r-- | src/core/httpcli/httpcli.c | 95 |
1 files changed, 52 insertions, 43 deletions
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 9012070e8e..1e38479eb1 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -61,6 +61,10 @@ typedef struct { grpc_httpcli_context *context; grpc_pollset *pollset; grpc_iomgr_object iomgr_obj; + gpr_slice_buffer incoming; + gpr_slice_buffer outgoing; + grpc_iomgr_closure on_read; + grpc_iomgr_closure done_write; } internal_request; static grpc_httpcli_get_override g_get_override = NULL; @@ -99,73 +103,70 @@ static void finish(internal_request *req, int success) { gpr_slice_unref(req->request_text); gpr_free(req->host); grpc_iomgr_unregister_object(&req->iomgr_obj); + gpr_slice_buffer_destroy(&req->incoming); + gpr_slice_buffer_destroy(&req->outgoing); gpr_free(req); } -static void on_read(void *user_data, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status status) { +static void on_read(void *user_data, int success); + +static void do_read(internal_request *req) { + switch (grpc_endpoint_read(req->ep, &req->incoming, &req->on_read)) { + case GRPC_ENDPOINT_DONE: + on_read(req, 1); + break; + case GRPC_ENDPOINT_PENDING: + break; + case GRPC_ENDPOINT_ERROR: + on_read(req, 0); + break; + } +} + +static void on_read(void *user_data, int success) { internal_request *req = user_data; size_t i; - for (i = 0; i < nslices; i++) { - if (GPR_SLICE_LENGTH(slices[i])) { + for (i = 0; i < req->incoming.count; i++) { + if (GPR_SLICE_LENGTH(req->incoming.slices[i])) { req->have_read_byte = 1; - if (!grpc_httpcli_parser_parse(&req->parser, slices[i])) { + if (!grpc_httpcli_parser_parse(&req->parser, req->incoming.slices[i])) { finish(req, 0); - goto done; + return; } } } - switch (status) { - case GRPC_ENDPOINT_CB_OK: - grpc_endpoint_notify_on_read(req->ep, on_read, req); - break; - case GRPC_ENDPOINT_CB_EOF: - case GRPC_ENDPOINT_CB_ERROR: - case GRPC_ENDPOINT_CB_SHUTDOWN: - if (!req->have_read_byte) { - next_address(req); - } else { - finish(req, grpc_httpcli_parser_eof(&req->parser)); - } - break; - } - -done: - for (i = 0; i < nslices; i++) { - gpr_slice_unref(slices[i]); + if (success) { + do_read(req); + } else if (!req->have_read_byte) { + next_address(req); + } else { + finish(req, grpc_httpcli_parser_eof(&req->parser)); } } -static void on_written(internal_request *req) { - grpc_endpoint_notify_on_read(req->ep, on_read, req); -} +static void on_written(internal_request *req) { do_read(req); } -static void done_write(void *arg, grpc_endpoint_cb_status status) { +static void done_write(void *arg, int success) { internal_request *req = arg; - switch (status) { - case GRPC_ENDPOINT_CB_OK: - on_written(req); - break; - case GRPC_ENDPOINT_CB_EOF: - case GRPC_ENDPOINT_CB_SHUTDOWN: - case GRPC_ENDPOINT_CB_ERROR: - next_address(req); - break; + if (success) { + on_written(req); + } else { + next_address(req); } } static void start_write(internal_request *req) { gpr_slice_ref(req->request_text); - switch ( - grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) { - case GRPC_ENDPOINT_WRITE_DONE: + gpr_slice_buffer_add(&req->outgoing, req->request_text); + switch (grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write)) { + case GRPC_ENDPOINT_DONE: on_written(req); break; - case GRPC_ENDPOINT_WRITE_PENDING: + case GRPC_ENDPOINT_PENDING: break; - case GRPC_ENDPOINT_WRITE_ERROR: + case GRPC_ENDPOINT_ERROR: finish(req, 0); break; } @@ -237,6 +238,10 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, request->handshaker ? request->handshaker : &grpc_httpcli_plaintext; req->context = context; req->pollset = pollset; + grpc_iomgr_closure_init(&req->on_read, on_read, req); + grpc_iomgr_closure_init(&req->done_write, done_write, req); + gpr_slice_buffer_init(&req->incoming); + gpr_slice_buffer_init(&req->outgoing); gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); grpc_iomgr_register_object(&req->iomgr_obj, name); gpr_free(name); @@ -270,7 +275,11 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, request->handshaker ? request->handshaker : &grpc_httpcli_plaintext; req->context = context; req->pollset = pollset; - gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); + grpc_iomgr_closure_init(&req->on_read, on_read, req); + grpc_iomgr_closure_init(&req->done_write, done_write, req); + gpr_slice_buffer_init(&req->incoming); + gpr_slice_buffer_init(&req->outgoing); + gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path); grpc_iomgr_register_object(&req->iomgr_obj, name); gpr_free(name); req->host = gpr_strdup(request->host); |