diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/channel/http_client_filter.c | 132 |
1 files changed, 77 insertions, 55 deletions
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index c031533dd8..25691c316c 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -63,7 +63,7 @@ typedef struct call_data { uint8_t *payload_bytes; /* Vars to read data off of send_message */ - grpc_transport_stream_op send_op; + grpc_transport_stream_op *send_op; uint32_t send_length; uint32_t send_flags; grpc_slice incoming_slice; @@ -219,9 +219,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; uint8_t *wrptr = calld->payload_bytes; - while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message, - &calld->incoming_slice, ~(size_t)0, - &calld->got_slice)) { + while (grpc_byte_stream_next( + exec_ctx, calld->send_op->payload->send_message.send_message, + &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), GRPC_SLICE_LENGTH(calld->incoming_slice)); wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); @@ -242,40 +242,47 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { /* Pass down the original send_message op that was blocked.*/ grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, calld->send_flags); - calld->send_op.send_message = &calld->replacement_stream.base; - calld->post_send = calld->send_op.on_complete; - calld->send_op.on_complete = &calld->send_done; - grpc_call_next_op(exec_ctx, elem, &calld->send_op); + calld->send_op->payload->send_message.send_message = + &calld->replacement_stream.base; + calld->post_send = calld->send_op->on_complete; + calld->send_op->on_complete = &calld->send_done; + grpc_call_next_op(exec_ctx, elem, calld->send_op); } else { continue_send_message(exec_ctx, elem); } } -static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +typedef struct hc_mutate_op_result { + grpc_error *error; + bool op_stalled; +} hc_mutate_op_result; + +static hc_mutate_op_result hc_mutate_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; - grpc_error *error; + hc_mutate_op_result result = {.error = GRPC_ERROR_NONE, .op_stalled = false}; - if (op->send_initial_metadata != NULL) { + if (op->send_initial_metadata) { /* Decide which HTTP VERB to use. We use GET if the request is marked cacheable, and the operation contains both initial metadata and send message, and the payload is below the size threshold, and all the data for this request is immediately available. */ grpc_mdelem method = GRPC_MDELEM_METHOD_POST; - if ((op->send_initial_metadata_flags & + if (op->send_message && + (op->payload->send_initial_metadata.send_initial_metadata_flags & GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && - op->send_message != NULL && - op->send_message->length < channeld->max_payload_size_for_get) { + op->payload->send_message.send_message->length < + channeld->max_payload_size_for_get) { method = GRPC_MDELEM_METHOD_GET; /* The following write to calld->send_message_blocked isn't racy with reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because being here means ops->send_message is not NULL, which is primarily guarding the read there. */ calld->send_message_blocked = true; - } else if (op->send_initial_metadata_flags & + } else if (op->payload->send_initial_metadata.send_initial_metadata_flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { method = GRPC_MDELEM_METHOD_PUT; } @@ -283,25 +290,28 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, /* Attempt to read the data from send_message and create a header field. */ if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) { /* allocate memory to hold the entire payload */ - calld->payload_bytes = gpr_malloc(op->send_message->length); + calld->payload_bytes = + gpr_malloc(op->payload->send_message.send_message->length); /* read slices of send_message and copy into payload_bytes */ - calld->send_op = *op; - calld->send_length = op->send_message->length; - calld->send_flags = op->send_message->flags; + calld->send_op = op; + calld->send_length = op->payload->send_message.send_message->length; + calld->send_flags = op->payload->send_message.send_message->flags; continue_send_message(exec_ctx, elem); + result.op_stalled = true; if (calld->send_message_blocked == false) { /* when all the send_message data is available, then create a MDELEM and append to headers */ grpc_mdelem payload_bin = grpc_mdelem_from_slices( exec_ctx, GRPC_MDSTR_GRPC_PAYLOAD_BIN, - grpc_slice_from_copied_buffer((const char *)calld->payload_bytes, - op->send_message->length)); - error = - grpc_metadata_batch_add_tail(exec_ctx, op->send_initial_metadata, - &calld->payload_bin, payload_bin); - if (error != GRPC_ERROR_NONE) return error; + grpc_slice_from_copied_buffer( + (const char *)calld->payload_bytes, + op->payload->send_message.send_message->length)); + result.error = grpc_metadata_batch_add_tail( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->payload_bin, payload_bin); + if (result.error != GRPC_ERROR_NONE) return result; calld->on_complete = op->on_complete; op->on_complete = &calld->hc_on_complete; op->send_message = NULL; @@ -314,42 +324,54 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, } } - remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_METHOD); - remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_SCHEME); - remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_TE); - remove_if_present(exec_ctx, op->send_initial_metadata, + remove_if_present(exec_ctx, + op->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_METHOD); + remove_if_present(exec_ctx, + op->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_SCHEME); + remove_if_present(exec_ctx, + op->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_TE); + remove_if_present(exec_ctx, + op->payload->send_initial_metadata.send_initial_metadata, GRPC_BATCH_CONTENT_TYPE); - remove_if_present(exec_ctx, op->send_initial_metadata, + remove_if_present(exec_ctx, + op->payload->send_initial_metadata.send_initial_metadata, GRPC_BATCH_USER_AGENT); /* Send : prefixed headers, which have to be before any application layer headers. */ - error = grpc_metadata_batch_add_head(exec_ctx, op->send_initial_metadata, - &calld->method, method); - if (error != GRPC_ERROR_NONE) return error; - error = - grpc_metadata_batch_add_head(exec_ctx, op->send_initial_metadata, - &calld->scheme, channeld->static_scheme); - if (error != GRPC_ERROR_NONE) return error; - error = grpc_metadata_batch_add_tail(exec_ctx, op->send_initial_metadata, - &calld->te_trailers, - GRPC_MDELEM_TE_TRAILERS); - if (error != GRPC_ERROR_NONE) return error; - error = grpc_metadata_batch_add_tail( - exec_ctx, op->send_initial_metadata, &calld->content_type, - GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); - if (error != GRPC_ERROR_NONE) return error; - error = grpc_metadata_batch_add_tail(exec_ctx, op->send_initial_metadata, - &calld->user_agent, - GRPC_MDELEM_REF(channeld->user_agent)); - if (error != GRPC_ERROR_NONE) return error; + result.error = grpc_metadata_batch_add_head( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->method, method); + if (result.error != GRPC_ERROR_NONE) return result; + result.error = grpc_metadata_batch_add_head( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->scheme, channeld->static_scheme); + if (result.error != GRPC_ERROR_NONE) return result; + result.error = grpc_metadata_batch_add_tail( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS); + if (result.error != GRPC_ERROR_NONE) return result; + result.error = grpc_metadata_batch_add_tail( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); + if (result.error != GRPC_ERROR_NONE) return result; + result.error = grpc_metadata_batch_add_tail( + exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent)); + if (result.error != GRPC_ERROR_NONE) return result; } - if (op->recv_initial_metadata != NULL) { + if (op->recv_initial_metadata) { /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = &calld->hc_on_recv_initial_metadata; + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + calld->on_done_recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->hc_on_recv_initial_metadata; } if (op->recv_trailing_metadata != NULL) { |