aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-03-02 15:21:47 -0800
committerGravatar Craig Tiller <ctiller@google.com>2017-03-02 15:21:47 -0800
commit1df3a60c43a5f21c30c80703e4eff2ded8bb192c (patch)
treec5c44f7cb285ed0d265cd9f4677c0d2d0e4e8f2c /src/core/lib/channel
parent759965c324b295b1de7deaba61a68f118d93a5b5 (diff)
Continue conversion
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--src/core/lib/channel/http_client_filter.c132
1 files changed, 77 insertions, 55 deletions
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index c031533dd8..25691c316c 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -63,7 +63,7 @@ typedef struct call_data {
uint8_t *payload_bytes;
/* Vars to read data off of send_message */
- grpc_transport_stream_op send_op;
+ grpc_transport_stream_op *send_op;
uint32_t send_length;
uint32_t send_flags;
grpc_slice incoming_slice;
@@ -219,9 +219,9 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
uint8_t *wrptr = calld->payload_bytes;
- while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
- &calld->incoming_slice, ~(size_t)0,
- &calld->got_slice)) {
+ while (grpc_byte_stream_next(
+ exec_ctx, calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) {
memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
GRPC_SLICE_LENGTH(calld->incoming_slice));
wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
@@ -242,40 +242,47 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
/* Pass down the original send_message op that was blocked.*/
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
calld->send_flags);
- calld->send_op.send_message = &calld->replacement_stream.base;
- calld->post_send = calld->send_op.on_complete;
- calld->send_op.on_complete = &calld->send_done;
- grpc_call_next_op(exec_ctx, elem, &calld->send_op);
+ calld->send_op->payload->send_message.send_message =
+ &calld->replacement_stream.base;
+ calld->post_send = calld->send_op->on_complete;
+ calld->send_op->on_complete = &calld->send_done;
+ grpc_call_next_op(exec_ctx, elem, calld->send_op);
} else {
continue_send_message(exec_ctx, elem);
}
}
-static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+typedef struct hc_mutate_op_result {
+ grpc_error *error;
+ bool op_stalled;
+} hc_mutate_op_result;
+
+static hc_mutate_op_result hc_mutate_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- grpc_error *error;
+ hc_mutate_op_result result = {.error = GRPC_ERROR_NONE, .op_stalled = false};
- if (op->send_initial_metadata != NULL) {
+ if (op->send_initial_metadata) {
/* Decide which HTTP VERB to use. We use GET if the request is marked
cacheable, and the operation contains both initial metadata and send
message, and the payload is below the size threshold, and all the data
for this request is immediately available. */
grpc_mdelem method = GRPC_MDELEM_METHOD_POST;
- if ((op->send_initial_metadata_flags &
+ if (op->send_message &&
+ (op->payload->send_initial_metadata.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
- op->send_message != NULL &&
- op->send_message->length < channeld->max_payload_size_for_get) {
+ op->payload->send_message.send_message->length <
+ channeld->max_payload_size_for_get) {
method = GRPC_MDELEM_METHOD_GET;
/* The following write to calld->send_message_blocked isn't racy with
reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because
being here means ops->send_message is not NULL, which is primarily
guarding the read there. */
calld->send_message_blocked = true;
- } else if (op->send_initial_metadata_flags &
+ } else if (op->payload->send_initial_metadata.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
method = GRPC_MDELEM_METHOD_PUT;
}
@@ -283,25 +290,28 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
/* Attempt to read the data from send_message and create a header field. */
if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) {
/* allocate memory to hold the entire payload */
- calld->payload_bytes = gpr_malloc(op->send_message->length);
+ calld->payload_bytes =
+ gpr_malloc(op->payload->send_message.send_message->length);
/* read slices of send_message and copy into payload_bytes */
- calld->send_op = *op;
- calld->send_length = op->send_message->length;
- calld->send_flags = op->send_message->flags;
+ calld->send_op = op;
+ calld->send_length = op->payload->send_message.send_message->length;
+ calld->send_flags = op->payload->send_message.send_message->flags;
continue_send_message(exec_ctx, elem);
+ result.op_stalled = true;
if (calld->send_message_blocked == false) {
/* when all the send_message data is available, then create a MDELEM and
append to headers */
grpc_mdelem payload_bin = grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_GRPC_PAYLOAD_BIN,
- grpc_slice_from_copied_buffer((const char *)calld->payload_bytes,
- op->send_message->length));
- error =
- grpc_metadata_batch_add_tail(exec_ctx, op->send_initial_metadata,
- &calld->payload_bin, payload_bin);
- if (error != GRPC_ERROR_NONE) return error;
+ grpc_slice_from_copied_buffer(
+ (const char *)calld->payload_bytes,
+ op->payload->send_message.send_message->length));
+ result.error = grpc_metadata_batch_add_tail(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->payload_bin, payload_bin);
+ if (result.error != GRPC_ERROR_NONE) return result;
calld->on_complete = op->on_complete;
op->on_complete = &calld->hc_on_complete;
op->send_message = NULL;
@@ -314,42 +324,54 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
}
}
- remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_METHOD);
- remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_SCHEME);
- remove_if_present(exec_ctx, op->send_initial_metadata, GRPC_BATCH_TE);
- remove_if_present(exec_ctx, op->send_initial_metadata,
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_METHOD);
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_SCHEME);
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_TE);
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
GRPC_BATCH_CONTENT_TYPE);
- remove_if_present(exec_ctx, op->send_initial_metadata,
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
GRPC_BATCH_USER_AGENT);
/* Send : prefixed headers, which have to be before any application
layer headers. */
- error = grpc_metadata_batch_add_head(exec_ctx, op->send_initial_metadata,
- &calld->method, method);
- if (error != GRPC_ERROR_NONE) return error;
- error =
- grpc_metadata_batch_add_head(exec_ctx, op->send_initial_metadata,
- &calld->scheme, channeld->static_scheme);
- if (error != GRPC_ERROR_NONE) return error;
- error = grpc_metadata_batch_add_tail(exec_ctx, op->send_initial_metadata,
- &calld->te_trailers,
- GRPC_MDELEM_TE_TRAILERS);
- if (error != GRPC_ERROR_NONE) return error;
- error = grpc_metadata_batch_add_tail(
- exec_ctx, op->send_initial_metadata, &calld->content_type,
- GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
- if (error != GRPC_ERROR_NONE) return error;
- error = grpc_metadata_batch_add_tail(exec_ctx, op->send_initial_metadata,
- &calld->user_agent,
- GRPC_MDELEM_REF(channeld->user_agent));
- if (error != GRPC_ERROR_NONE) return error;
+ result.error = grpc_metadata_batch_add_head(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->method, method);
+ if (result.error != GRPC_ERROR_NONE) return result;
+ result.error = grpc_metadata_batch_add_head(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->scheme, channeld->static_scheme);
+ if (result.error != GRPC_ERROR_NONE) return result;
+ result.error = grpc_metadata_batch_add_tail(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS);
+ if (result.error != GRPC_ERROR_NONE) return result;
+ result.error = grpc_metadata_batch_add_tail(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
+ if (result.error != GRPC_ERROR_NONE) return result;
+ result.error = grpc_metadata_batch_add_tail(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent));
+ if (result.error != GRPC_ERROR_NONE) return result;
}
- if (op->recv_initial_metadata != NULL) {
+ if (op->recv_initial_metadata) {
/* substitute our callback for the higher callback */
- calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
- op->recv_initial_metadata_ready = &calld->hc_on_recv_initial_metadata;
+ calld->recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata;
+ calld->on_done_recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->hc_on_recv_initial_metadata;
}
if (op->recv_trailing_metadata != NULL) {