From 72920cc08ac8bee1816422974468502ce43e9b7b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Mar 2017 10:20:17 -0800 Subject: Conversion progress --- src/core/ext/client_channel/client_channel.c | 2 +- src/core/lib/channel/compress_filter.c | 3 +- src/core/lib/channel/message_size_filter.c | 17 ++++++----- .../lib/security/transport/client_auth_filter.c | 26 ++++++++++------- .../lib/security/transport/server_auth_filter.c | 18 +++++++----- src/core/lib/surface/lame_client.c | 10 ++++--- src/core/lib/surface/server.c | 34 ++++++++++------------ src/core/lib/transport/transport_op_string.c | 26 ++++++++++------- test/core/end2end/tests/filter_causes_close.c | 7 +++-- 9 files changed, 80 insertions(+), 63 deletions(-) diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index bf64f84772..9d92bf41c5 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -449,7 +449,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { grpc_transport_op *op = arg; - grpc_channel_element *elem = op->transport_private.args[0]; + grpc_channel_element *elem = op->transport_private.extra_arg; channel_data *chand = elem->channel_data; if (op->on_connectivity_state_change != NULL) { diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 705480a0db..5321760e1a 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -210,7 +210,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, calld->send_flags); - calld->send_op->send_message = &calld->replacement_stream.base; + calld->send_op->payload->send_message.send_message = + &calld->replacement_stream.base; calld->post_send = calld->send_op->on_complete; calld->send_op->on_complete = &calld->send_done; diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index b424c0d2ac..e99b767bef 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -141,11 +141,13 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport_stream_op* op) { call_data* calld = elem->call_data; // Check max send message size. - if (op->send_message != NULL && calld->max_send_size >= 0 && - op->send_message->length > (size_t)calld->max_send_size) { + if (op->send_message && calld->max_send_size >= 0 && + op->payload->send_message.send_message->length > + (size_t)calld->max_send_size) { char* message_string; gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", - op->send_message->length, calld->max_send_size); + op->payload->send_message.send_message->length, + calld->max_send_size); grpc_transport_stream_op_finish_with_failure( exec_ctx, op, grpc_error_set_int(GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS, @@ -154,10 +156,11 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, return; } // Inject callback for receiving a message. - if (op->recv_message_ready != NULL) { - calld->next_recv_message_ready = op->recv_message_ready; - calld->recv_message = op->recv_message; - op->recv_message_ready = &calld->recv_message_ready; + if (op->payload->recv_message.recv_message_ready != NULL) { + calld->next_recv_message_ready = + op->payload->recv_message.recv_message_ready; + calld->recv_message = op->payload->recv_message.recv_message; + op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; } // Chain to the next filter. grpc_call_next_op(exec_ctx, elem, op); diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index a23082a866..98ef36518a 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -120,8 +120,8 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED); } else { GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); - GPR_ASSERT(op->send_initial_metadata != NULL); - mdb = op->send_initial_metadata; + GPR_ASSERT(op->send_initial_metadata); + mdb = op->payload->send_initial_metadata.send_initial_metadata; for (i = 0; i < num_md; i++) { add_error(&error, grpc_metadata_batch_add_tail( @@ -174,7 +174,9 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_client_security_context *ctx = - (grpc_client_security_context *)op->context[GRPC_CONTEXT_SECURITY].value; + (grpc_client_security_context *)op->payload + ->context[GRPC_CONTEXT_SECURITY] + .value; grpc_call_credentials *channel_call_creds = chand->security_connector->request_metadata_creds; int call_creds_has_md = (ctx != NULL) && (ctx->creds != NULL); @@ -248,23 +250,25 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_linked_mdelem *l; grpc_client_security_context *sec_ctx = NULL; - if (calld->security_context_set == 0 && op->cancel_error == GRPC_ERROR_NONE) { + if (calld->security_context_set == 0 && !op->cancel_stream) { calld->security_context_set = 1; - GPR_ASSERT(op->context); - if (op->context[GRPC_CONTEXT_SECURITY].value == NULL) { - op->context[GRPC_CONTEXT_SECURITY].value = + GPR_ASSERT(op->payload->context != NULL); + if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { + op->payload->context[GRPC_CONTEXT_SECURITY].value = grpc_client_security_context_create(); - op->context[GRPC_CONTEXT_SECURITY].destroy = + op->payload->context[GRPC_CONTEXT_SECURITY].destroy = grpc_client_security_context_destroy; } - sec_ctx = op->context[GRPC_CONTEXT_SECURITY].value; + sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value; GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); sec_ctx->auth_context = GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); } - if (op->send_initial_metadata != NULL) { - for (l = op->send_initial_metadata->list.head; l != NULL; l = l->next) { + if (op->send_initial_metadata) { + for (l = op->payload->send_initial_metadata.send_initial_metadata->list + .head; + l != NULL; l = l->next) { grpc_mdelem md = l->md; /* Pointer comparison is OK for md_elems created from the same context. */ diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 14619d97ca..afd07d6917 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -139,9 +139,10 @@ static void on_md_processing_done( ? error_details : "Authentication metadata processing failed."; calld->transport_op->send_initial_metadata = NULL; - if (calld->transport_op->send_message != NULL) { - grpc_byte_stream_destroy(&exec_ctx, calld->transport_op->send_message); - calld->transport_op->send_message = NULL; + if (calld->transport_op->send_message) { + grpc_byte_stream_destroy( + &exec_ctx, calld->transport_op->payload->send_message.send_message); + calld->transport_op->send_message = false; } calld->transport_op->send_trailing_metadata = NULL; grpc_closure_sched(&exec_ctx, calld->on_done_recv, @@ -173,11 +174,14 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; - if (op->recv_initial_metadata != NULL) { + if (op->recv_initial_metadata) { /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = &calld->auth_on_recv; + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + calld->on_done_recv = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->auth_on_recv; calld->transport_op = op; } } diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 49bc4c114b..4c2c3df14f 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -84,10 +84,12 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->recv_initial_metadata != NULL) { - fill_metadata(exec_ctx, elem, op->recv_initial_metadata); - } else if (op->recv_trailing_metadata != NULL) { - fill_metadata(exec_ctx, elem, op->recv_trailing_metadata); + if (op->recv_initial_metadata) { + fill_metadata(exec_ctx, elem, + op->payload->recv_initial_metadata.recv_initial_metadata); + } else if (op->recv_trailing_metadata) { + fill_metadata(exec_ctx, elem, + op->payload->recv_trailing_metadata.recv_trailing_metadata); } grpc_transport_stream_op_finish_with_failure( exec_ctx, op, GRPC_ERROR_CREATE("lame client channel")); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index b360579553..17b3f8cbe5 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -154,8 +154,7 @@ struct call_data { grpc_completion_queue *cq_new; grpc_metadata_batch *recv_initial_metadata; - bool recv_idempotent_request; - bool recv_cacheable_request; + uint32_t recv_initial_metadata_flags; grpc_metadata_array initial_metadata; request_matcher *request_matcher; @@ -498,13 +497,7 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, rc->data.batch.details->host = grpc_slice_ref_internal(calld->host); rc->data.batch.details->method = grpc_slice_ref_internal(calld->path); rc->data.batch.details->deadline = calld->deadline; - rc->data.batch.details->flags = - (calld->recv_idempotent_request - ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST - : 0) | - (calld->recv_cacheable_request - ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST - : 0); + rc->data.batch.details->flags = calld->recv_initial_metadata_flags; break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; @@ -632,7 +625,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (!grpc_slice_eq(rm->host, calld->host)) continue; if (!grpc_slice_eq(rm->method, calld->path)) continue; if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && - !calld->recv_idempotent_request) { + 0 == (calld->recv_initial_metadata_flags & + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } finish_start_new_rpc(exec_ctx, server, elem, @@ -649,7 +643,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (rm->has_host) continue; if (!grpc_slice_eq(rm->method, calld->path)) continue; if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && - !calld->recv_idempotent_request) { + 0 == (calld->recv_initial_metadata_flags & + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } finish_start_new_rpc(exec_ctx, server, elem, @@ -783,13 +778,16 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; - if (op->recv_initial_metadata != NULL) { - GPR_ASSERT(op->recv_idempotent_request == NULL); - calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata; - op->recv_idempotent_request = &calld->recv_idempotent_request; - op->recv_cacheable_request = &calld->recv_cacheable_request; + if (op->recv_initial_metadata) { + GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == NULL); + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + calld->on_done_recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->server_on_recv_initial_metadata; + op->payload->recv_initial_metadata.recv_flags = + &calld->recv_initial_metadata_flags; } } diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index 28360e3784..0ec6a6ea5c 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -81,45 +81,49 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { gpr_strvec_add( &b, gpr_strdup(op->covered_by_poller ? "[COVERED]" : "[UNCOVERED]")); - if (op->send_initial_metadata != NULL) { + if (op->send_initial_metadata) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA{")); - put_metadata_list(&b, *op->send_initial_metadata); + put_metadata_list( + &b, *op->payload->send_initial_metadata.send_initial_metadata); gpr_strvec_add(&b, gpr_strdup("}")); } - if (op->send_message != NULL) { + if (op->send_message) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d", - op->send_message->flags, op->send_message->length); + op->payload->send_message.send_message->flags, + op->payload->send_message.send_message->length); gpr_strvec_add(&b, tmp); } - if (op->send_trailing_metadata != NULL) { + if (op->send_trailing_metadata) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("SEND_TRAILING_METADATA{")); - put_metadata_list(&b, *op->send_trailing_metadata); + put_metadata_list( + &b, *op->payload->send_trailing_metadata.send_trailing_metadata); gpr_strvec_add(&b, gpr_strdup("}")); } - if (op->recv_initial_metadata != NULL) { + if (op->recv_initial_metadata) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("RECV_INITIAL_METADATA")); } - if (op->recv_message != NULL) { + if (op->recv_message) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE")); } - if (op->recv_trailing_metadata != NULL) { + if (op->recv_trailing_metadata) { gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, gpr_strdup("RECV_TRAILING_METADATA")); } - if (op->cancel_error != GRPC_ERROR_NONE) { + if (op->cancel_stream) { gpr_strvec_add(&b, gpr_strdup(" ")); - const char *msg = grpc_error_string(op->cancel_error); + const char *msg = + grpc_error_string(op->payload->cancel_stream.cancel_error); gpr_asprintf(&tmp, "CANCEL:%s", msg); gpr_strvec_add(&b, tmp); diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index 25e606556d..ebc6f0a52a 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -220,9 +220,10 @@ static void start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; - if (op->recv_initial_metadata != NULL) { - calld->recv_im_ready = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = + if (op->recv_initial_metadata) { + calld->recv_im_ready = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = grpc_closure_create(recv_im_ready, elem, grpc_schedule_on_exec_ctx); } grpc_call_next_op(exec_ctx, elem, op); -- cgit v1.2.3