/* * * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ #include #include #include #include #include #include #include #include #include #include #include #include #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/compression/algorithm_metadata.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/arena.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/validate_metadata.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/transport.h" /** The maximum number of concurrent batches possible. Based upon the maximum number of individually queueable ops in the batch api: - initial metadata send - message send - status/close send (depending on client/server) - initial metadata recv - message recv - status/close recv (depending on client/server) */ #define MAX_CONCURRENT_BATCHES 6 #define MAX_SEND_EXTRA_METADATA_COUNT 3 /* Status data for a request can come from several sources; this enumerates them all, and acts as a priority sorting for which status to return to the application - earlier entries override later ones */ typedef enum { /* Status came from the application layer overriding whatever the wire says */ STATUS_FROM_API_OVERRIDE = 0, /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, /* Status was created by some internal channel stack operation: must come via add_batch_error */ STATUS_FROM_CORE, /* Status was created by some surface error */ STATUS_FROM_SURFACE, /* Status came from the server sending status */ STATUS_FROM_SERVER_STATUS, STATUS_SOURCE_COUNT } status_source; typedef struct { bool is_set; grpc_error *error; } received_status; static gpr_atm pack_received_status(received_status r) { return r.is_set ? (1 | (gpr_atm)r.error) : 0; } static received_status unpack_received_status(gpr_atm atm) { return (atm & 1) == 0 ? (received_status){.is_set = false, .error = GRPC_ERROR_NONE} : (received_status){.is_set = true, .error = (grpc_error *)(atm & ~(gpr_atm)1)}; } #define MAX_ERRORS_PER_BATCH 4 typedef struct batch_control { grpc_call *call; grpc_cq_completion cq_completion; grpc_closure finish_batch; void *notify_tag; gpr_refcount steps_to_complete; grpc_error *errors[MAX_ERRORS_PER_BATCH]; gpr_atm num_errors; uint8_t send_initial_metadata; uint8_t send_message; uint8_t send_final_op; uint8_t recv_initial_metadata; uint8_t recv_message; uint8_t recv_final_op; uint8_t is_notify_tag_closure; /* TODO(ctiller): now that this is inlined, figure out how much of the above state can be eliminated */ grpc_transport_stream_op op; } batch_control; struct grpc_call { gpr_arena *arena; grpc_completion_queue *cq; grpc_polling_entity pollent; grpc_channel *channel; grpc_call *parent; grpc_call *first_child; gpr_timespec start_time; /* protects first_child, and child next/prev links */ gpr_mu child_list_mu; /* client or server call */ bool is_client; /** has grpc_call_destroy been called */ bool destroy_called; /** flag indicating that cancellation is inherited */ bool cancellation_is_inherited; /** which ops are in-flight */ bool sent_initial_metadata; bool sending_message; bool sent_final_op; bool received_initial_metadata; bool receiving_message; bool requested_final_op; gpr_atm any_ops_sent_atm; gpr_atm received_final_op_atm; /* have we received initial metadata */ bool has_initial_md_been_received; batch_control active_batches[MAX_CONCURRENT_BATCHES]; /* first idx: is_receiving, second idx: is_trailing */ grpc_metadata_batch metadata_batch[2][2]; /* Buffered read metadata waiting to be returned to the application. Element 0 is initial metadata, element 1 is trailing metadata. */ grpc_metadata_array *buffered_metadata[2]; /* Packed received call statuses from various sources */ gpr_atm status[STATUS_SOURCE_COUNT]; /* Call data useful used for reporting. Only valid after the call has * completed */ grpc_call_final_info final_info; /* Compression algorithm for *incoming* data */ grpc_compression_algorithm incoming_compression_algorithm; /* Supported encodings (compression algorithms), a bitset */ uint32_t encodings_accepted_by_peer; /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; /* for the client, extra metadata is initial metadata; for the server, it's trailing metadata */ grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT]; int send_extra_metadata_count; gpr_timespec send_deadline; /** siblings: children of the same parent form a list, and this list is protected under parent->mu */ grpc_call *sibling_next; grpc_call *sibling_prev; grpc_slice_buffer_stream sending_stream; grpc_byte_stream *receiving_stream; grpc_byte_buffer **receiving_buffer; grpc_slice receiving_slice; grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; grpc_closure receiving_initial_metadata_ready; uint32_t test_only_last_message_flags; grpc_closure release_call; union { struct { grpc_status_code *status; grpc_slice *status_details; } client; struct { int *cancelled; } server; } final_op; void *saved_receiving_stream_ready_bctlp; }; int grpc_call_error_trace = 0; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) #define CALL_FROM_TOP_ELEM(top_elem) \ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_transport_stream_op *op); static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_status_code status, const char *description); static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, grpc_error *error); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error); static void get_final_status(grpc_call *call, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details); static void set_status_value_directly(grpc_status_code status, void *dest); static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, status_source source, grpc_error *error); static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl); static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl); static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, grpc_error *error, bool has_cancelled); static void add_init_error(grpc_error **composite, grpc_error *new) { if (new == GRPC_ERROR_NONE) return; if (*composite == GRPC_ERROR_NONE) *composite = GRPC_ERROR_CREATE("Call creation failed"); *composite = grpc_error_add_child(*composite, new); } grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, const grpc_call_create_args *args, grpc_call **out_call) { size_t i, j; grpc_error *error = GRPC_ERROR_NONE; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(args->channel); grpc_call *call; GPR_TIMER_BEGIN("grpc_call_create", 0); gpr_arena *arena = gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel)); call = gpr_arena_alloc(arena, sizeof(grpc_call) + channel_stack->call_stack_size); call->arena = arena; *out_call = call; gpr_mu_init(&call->child_list_mu); call->channel = args->channel; call->cq = args->cq; call->parent = args->parent_call; call->start_time = gpr_now(GPR_CLOCK_MONOTONIC); /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = args->server_transport_data == NULL; grpc_slice path = grpc_empty_slice(); if (call->is_client) { GPR_ASSERT(args->add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); for (i = 0; i < args->add_initial_metadata_count; i++) { call->send_extra_metadata[i].md = args->add_initial_metadata[i]; if (grpc_slice_eq(GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) { path = grpc_slice_ref_internal( GRPC_MDVALUE(args->add_initial_metadata[i])); } } call->send_extra_metadata_count = (int)args->add_initial_metadata_count; } else { GPR_ASSERT(args->add_initial_metadata_count == 0); call->send_extra_metadata_count = 0; } for (i = 0; i < 2; i++) { for (j = 0; j < 2; j++) { call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); } } gpr_timespec send_deadline = gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC); if (args->parent_call != NULL) { GRPC_CALL_INTERNAL_REF(args->parent_call, "child"); GPR_ASSERT(call->is_client); GPR_ASSERT(!args->parent_call->is_client); gpr_mu_lock(&args->parent_call->child_list_mu); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( gpr_convert_clock_type(send_deadline, args->parent_call->send_deadline.clock_type), args->parent_call->send_deadline); } /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with * GRPC_PROPAGATE_STATS_CONTEXT */ /* TODO(ctiller): This should change to use the appropriate census start_op * call. */ if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) { if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) { add_init_error(&error, GRPC_ERROR_CREATE("Census tracing propagation requested " "without Census context propagation")); } grpc_call_context_set( call, GRPC_CONTEXT_TRACING, args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL); } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) { add_init_error(&error, GRPC_ERROR_CREATE("Census context propagation requested " "without Census tracing propagation")); } if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) { cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); } } if (args->parent_call->first_child == NULL) { args->parent_call->first_child = call; call->sibling_next = call->sibling_prev = call; } else { call->sibling_next = args->parent_call->first_child; call->sibling_prev = args->parent_call->first_child->sibling_prev; call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = call; } gpr_mu_unlock(&args->parent_call->child_list_mu); } call->send_deadline = send_deadline; GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); /* initial refcount dropped by grpc_call_destroy */ grpc_call_element_args call_args = { .call_stack = CALL_STACK_FROM_CALL(call), .server_transport_data = args->server_transport_data, .context = call->context, .path = path, .start_time = call->start_time, .deadline = send_deadline, .arena = call->arena}; add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1, destroy_call, call, &call_args)); if (error != GRPC_ERROR_NONE) { cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } if (args->cq != NULL) { GPR_ASSERT( args->pollset_set_alternative == NULL && "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL."); GRPC_CQ_INTERNAL_REF(args->cq, "bind"); call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq)); } if (args->pollset_set_alternative != NULL) { call->pollent = grpc_polling_entity_create_from_pollset_set( args->pollset_set_alternative); } if (!grpc_polling_entity_is_empty(&call->pollent)) { grpc_call_stack_set_pollset_or_pollset_set( exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); } grpc_slice_unref_internal(exec_ctx, path); GPR_TIMER_END("grpc_call_create", 0); return error; } void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_completion_queue *cq) { GPR_ASSERT(cq); if (grpc_polling_entity_pollset_set(&call->pollent) != NULL) { gpr_log(GPR_ERROR, "A pollset_set is already registered for this call."); abort(); } call->cq = cq; GRPC_CQ_INTERNAL_REF(cq, "bind"); call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)); grpc_call_stack_set_pollset_or_pollset_set( exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); } #ifdef GRPC_STREAM_REFCOUNT_DEBUG #define REF_REASON reason #define REF_ARG , const char *reason #else #define REF_REASON "" #define REF_ARG #endif void grpc_call_internal_ref(grpc_call *c REF_ARG) { GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON); } void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; grpc_channel *channel = c->channel; grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); } static void set_status_value_directly(grpc_status_code status, void *dest); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { size_t i; int ii; grpc_call *c = call; GPR_TIMER_BEGIN("destroy_call", 0); for (i = 0; i < 2; i++) { grpc_metadata_batch_destroy( exec_ctx, &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]); } if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } gpr_mu_destroy(&c->child_list_mu); for (ii = 0; ii < c->send_extra_metadata_count; ii++) { GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); } for (i = 0; i < GRPC_CONTEXT_COUNT; i++) { if (c->context[i].destroy) { c->context[i].destroy(c->context[i].value); } } if (c->cq) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } get_final_status(call, set_status_value_directly, &c->final_info.final_status, NULL); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { GRPC_ERROR_UNREF( unpack_received_status(gpr_atm_acq_load(&c->status[i])).error); } grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, grpc_closure_init(&c->release_call, release_call, c, grpc_schedule_on_exec_ctx)); GPR_TIMER_END("destroy_call", 0); } void grpc_call_destroy(grpc_call *c) { int cancel; grpc_call *parent = c->parent; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_call_destroy", 0); GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); if (parent) { gpr_mu_lock(&parent->child_list_mu); if (c == parent->first_child) { parent->first_child = c->sibling_next; if (c == parent->first_child) { parent->first_child = NULL; } c->sibling_prev->sibling_next = c->sibling_next; c->sibling_next->sibling_prev = c->sibling_prev; } gpr_mu_unlock(&parent->child_list_mu); GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); } GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && !gpr_atm_acq_load(&c->received_final_op_atm); if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); } GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_call_destroy", 0); } grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); GPR_ASSERT(!reserved); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; cancel_with_error(&exec_ctx, call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); grpc_exec_ctx_finish(&exec_ctx); return GRPC_CALL_OK; } static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_transport_stream_op *op) { grpc_call_element *elem; GPR_TIMER_BEGIN("execute_op", 0); elem = CALL_ELEM_FROM_CALL(call, 0); op->context = call->context; elem->filter->start_transport_stream_op(exec_ctx, elem, op); GPR_TIMER_END("execute_op", 0); } char *grpc_call_get_peer(grpc_call *call) { grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; char *result; GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); result = elem->filter->get_peer(&exec_ctx, elem); if (result == NULL) { result = grpc_channel_get_target(call->channel); } if (result == NULL) { result = gpr_strdup("unknown"); } grpc_exec_ctx_finish(&exec_ctx); return result; } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { return CALL_FROM_TOP_ELEM(elem); } /******************************************************************************* * CANCELLATION */ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, const char *description, void *reserved) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE( "grpc_call_cancel_with_status(" "c=%p, status=%d, description=%s, reserved=%p)", 4, (c, (int)status, description, reserved)); GPR_ASSERT(reserved == NULL); cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status, description); grpc_exec_ctx_finish(&exec_ctx); return GRPC_CALL_OK; } static void done_termination(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination"); } static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error) { GRPC_CALL_INTERNAL_REF(c, "termination"); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); grpc_transport_stream_op *op = grpc_make_transport_stream_op( grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx)); op->cancel_error = error; execute_op(exec_ctx, c, op); } static grpc_error *error_from_status(grpc_status_code status, const char *description) { return grpc_error_set_int( grpc_error_set_str(GRPC_ERROR_CREATE(description), GRPC_ERROR_STR_GRPC_MESSAGE, description), GRPC_ERROR_INT_GRPC_STATUS, status); } static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_status_code status, const char *description) { cancel_with_error(exec_ctx, c, source, error_from_status(status, description)); } /******************************************************************************* * FINAL STATUS CODE MANIPULATION */ static bool get_final_status_from( grpc_call *call, grpc_error *error, bool allow_ok_status, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details) { grpc_status_code code; const char *msg = NULL; grpc_error_get_status(error, call->send_deadline, &code, &msg, NULL); if (code == GRPC_STATUS_OK && !allow_ok_status) { return false; } set_value(code, set_value_user_data); if (details != NULL) { *details = msg == NULL ? grpc_empty_slice() : grpc_slice_from_copied_string(msg); } return true; } static void get_final_status(grpc_call *call, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details) { int i; received_status status[STATUS_SOURCE_COUNT]; for (i = 0; i < STATUS_SOURCE_COUNT; i++) { status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i])); } if (grpc_call_error_trace) { gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR"); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (status[i].is_set) { gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error)); } } } /* first search through ignoring "OK" statuses: if something went wrong, * ensure we report it */ for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) { /* search for the best status we can present: ideally the error we use has a clearly defined grpc-status, and we'll prefer that. */ for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (status[i].is_set && grpc_error_has_clear_grpc_status(status[i].error)) { if (get_final_status_from(call, status[i].error, allow_ok_status != 0, set_value, set_value_user_data, details)) { return; } } } /* If no clearly defined status exists, search for 'anything' */ for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (status[i].is_set) { if (get_final_status_from(call, status[i].error, allow_ok_status != 0, set_value, set_value_user_data, details)) { return; } } } } /* If nothing exists, set some default */ if (call->is_client) { set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); } else { set_value(GRPC_STATUS_OK, set_value_user_data); } } static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, status_source source, grpc_error *error) { if (!gpr_atm_rel_cas(&call->status[source], pack_received_status((received_status){ .is_set = false, .error = GRPC_ERROR_NONE}), pack_received_status((received_status){ .is_set = true, .error = error}))) { GRPC_ERROR_UNREF(error); } } /******************************************************************************* * COMPRESSION */ static void set_incoming_compression_algorithm( grpc_call *call, grpc_compression_algorithm algo) { GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT); call->incoming_compression_algorithm = algo; } grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( grpc_call *call) { grpc_compression_algorithm algorithm; algorithm = call->incoming_compression_algorithm; return algorithm; } static grpc_compression_algorithm compression_algorithm_for_level_locked( grpc_call *call, grpc_compression_level level) { return grpc_compression_algorithm_for_level(level, call->encodings_accepted_by_peer); } uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { uint32_t flags; flags = call->test_only_last_message_flags; return flags; } static void destroy_encodings_accepted_by_peer(void *p) { return; } static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_mdelem mdel) { size_t i; grpc_compression_algorithm algorithm; grpc_slice_buffer accept_encoding_parts; grpc_slice accept_encoding_slice; void *accepted_user_data; accepted_user_data = grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer); if (accepted_user_data != NULL) { call->encodings_accepted_by_peer = (uint32_t)(((uintptr_t)accepted_user_data) - 1); return; } accept_encoding_slice = GRPC_MDVALUE(mdel); grpc_slice_buffer_init(&accept_encoding_parts); grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already * zeroes the whole grpc_call */ /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); for (i = 0; i < accept_encoding_parts.count; i++) { grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i]; if (grpc_compression_algorithm_parse(accept_encoding_entry_slice, &algorithm)) { GPR_BITSET(&call->encodings_accepted_by_peer, algorithm); } else { char *accept_encoding_entry_str = grpc_slice_to_c_string(accept_encoding_entry_slice); gpr_log(GPR_ERROR, "Invalid entry in accept encoding metadata: '%s'. Ignoring.", accept_encoding_entry_str); gpr_free(accept_encoding_entry_str); } } grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts); grpc_mdelem_set_user_data( mdel, destroy_encodings_accepted_by_peer, (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1)); } uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { uint32_t encodings_accepted_by_peer; encodings_accepted_by_peer = call->encodings_accepted_by_peer; return encodings_accepted_by_peer; } static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) { return (grpc_linked_mdelem *)&md->internal_data; } static grpc_metadata *get_md_elem(grpc_metadata *metadata, grpc_metadata *additional_metadata, int i, int count) { grpc_metadata *res = i < count ? &metadata[i] : &additional_metadata[i - count]; GPR_ASSERT(res); return res; } static int prepare_application_metadata( grpc_exec_ctx *exec_ctx, grpc_call *call, int count, grpc_metadata *metadata, int is_trailing, int prepend_extra_metadata, grpc_metadata *additional_metadata, int additional_metadata_count) { int total_count = count + additional_metadata_count; int i; grpc_metadata_batch *batch = &call->metadata_batch[0 /* is_receiving */][is_trailing]; for (i = 0; i < total_count; i++) { const grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); if (!GRPC_LOG_IF_ERROR("validate_metadata", grpc_validate_header_key_is_legal(md->key))) { break; } else if (!grpc_is_binary_header(md->key) && !GRPC_LOG_IF_ERROR( "validate_metadata", grpc_validate_header_nonbin_value_is_legal(md->value))) { break; } l->md = grpc_mdelem_from_grpc_metadata(exec_ctx, (grpc_metadata *)md); } if (i != total_count) { for (int j = 0; j < i; j++) { const grpc_metadata *md = get_md_elem(metadata, additional_metadata, j, count); grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; GRPC_MDELEM_UNREF(exec_ctx, l->md); } return 0; } if (prepend_extra_metadata) { if (call->send_extra_metadata_count == 0) { prepend_extra_metadata = 0; } else { for (i = 0; i < call->send_extra_metadata_count; i++) { GRPC_LOG_IF_ERROR("prepare_application_metadata", grpc_metadata_batch_link_tail( exec_ctx, batch, &call->send_extra_metadata[i])); } } } for (i = 0; i < total_count; i++) { grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); GRPC_LOG_IF_ERROR( "prepare_application_metadata", grpc_metadata_batch_link_tail(exec_ctx, batch, linked_from_md(md))); } call->send_extra_metadata_count = 0; return 1; } /* we offset status by a small amount when storing it into transport metadata as metadata cannot store a 0 value (which is used as OK for grpc_status_codes */ #define STATUS_OFFSET 1 static void destroy_status(void *ignored) {} static uint32_t decode_status(grpc_mdelem md) { uint32_t status; void *user_data; if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) return 0; if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_1)) return 1; if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_2)) return 2; user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data != NULL) { status = ((uint32_t)(intptr_t)user_data) - STATUS_OFFSET; } else { if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(md), &status)) { status = GRPC_STATUS_UNKNOWN; /* could not parse status code */ } grpc_mdelem_set_user_data(md, destroy_status, (void *)(intptr_t)(status + STATUS_OFFSET)); } return status; } static grpc_compression_algorithm decode_compression(grpc_mdelem md) { grpc_compression_algorithm algorithm = grpc_compression_algorithm_from_slice(GRPC_MDVALUE(md)); if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) { char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, "Invalid incoming compression algorithm: '%s'. Interpreting " "incoming data as uncompressed.", md_c_str); gpr_free(md_c_str); return GRPC_COMPRESS_NONE; } return algorithm; } static void recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch *b) { if (b->idx.named.grpc_status != NULL) { uint32_t status_code = decode_status(b->idx.named.grpc_status->md); grpc_error *error = status_code == GRPC_STATUS_OK ? GRPC_ERROR_NONE : grpc_error_set_int(GRPC_ERROR_CREATE("Error received from peer"), GRPC_ERROR_INT_GRPC_STATUS, (intptr_t)status_code); if (b->idx.named.grpc_message != NULL) { char *msg = grpc_slice_to_c_string(GRPC_MDVALUE(b->idx.named.grpc_message->md)); error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, msg); gpr_free(msg); grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message); } else if (error != GRPC_ERROR_NONE) { error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, ""); } set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error); grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status); } } static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, int is_trailing) { if (b->list.count == 0) return; GPR_TIMER_BEGIN("publish_app_metadata", 0); grpc_metadata_array *dest; grpc_metadata *mdusr; dest = call->buffered_metadata[is_trailing]; if (dest->count + b->list.count > dest->capacity) { dest->capacity = GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2); dest->metadata = gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); } for (grpc_linked_mdelem *l = b->list.head; l != NULL; l = l->next) { mdusr = &dest->metadata[dest->count++]; /* we pass back borrowed slices that are valid whilst the call is valid */ mdusr->key = GRPC_MDKEY(l->md); mdusr->value = GRPC_MDVALUE(l->md); } GPR_TIMER_END("publish_app_metadata", 0); } static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch *b) { recv_common_filter(exec_ctx, call, b); if (b->idx.named.grpc_encoding != NULL) { GPR_TIMER_BEGIN("incoming_compression_algorithm", 0); set_incoming_compression_algorithm( call, decode_compression(b->idx.named.grpc_encoding->md)); GPR_TIMER_END("incoming_compression_algorithm", 0); grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding); } if (b->idx.named.grpc_accept_encoding != NULL) { GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); set_encodings_accepted_by_peer(exec_ctx, call, b->idx.named.grpc_accept_encoding->md); grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding); GPR_TIMER_END("encodings_accepted_by_peer", 0); } publish_app_metadata(call, b, false); } static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args, grpc_metadata_batch *b) { grpc_call *call = args; recv_common_filter(exec_ctx, call, b); publish_app_metadata(call, b, true); } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } /******************************************************************************* * BATCH API IMPLEMENTATION */ static void set_status_value_directly(grpc_status_code status, void *dest) { *(grpc_status_code *)dest = status; } static void set_cancelled_value(grpc_status_code status, void *dest) { *(int *)dest = (status != GRPC_STATUS_OK); } static bool are_write_flags_valid(uint32_t flags) { /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ const uint32_t allowed_write_positions = (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK); const uint32_t invalid_positions = ~allowed_write_positions; return !(flags & invalid_positions); } static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) { /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK; if (!is_client) { invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; } return !(flags & invalid_positions); } static int batch_slot_for_op(grpc_op_type type) { switch (type) { case GRPC_OP_SEND_INITIAL_METADATA: return 0; case GRPC_OP_SEND_MESSAGE: return 1; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: case GRPC_OP_SEND_STATUS_FROM_SERVER: return 2; case GRPC_OP_RECV_INITIAL_METADATA: return 3; case GRPC_OP_RECV_MESSAGE: return 4; case GRPC_OP_RECV_CLOSE_ON_SERVER: case GRPC_OP_RECV_STATUS_ON_CLIENT: return 5; } GPR_UNREACHABLE_CODE(return 123456789); } static batch_control *allocate_batch_control(grpc_call *call, const grpc_op *ops, size_t num_ops) { int slot = batch_slot_for_op(ops[0].op); for (size_t i = 1; i < num_ops; i++) { int op_slot = batch_slot_for_op(ops[i].op); slot = GPR_MIN(slot, op_slot); } batch_control *bctl = &call->active_batches[slot]; if (bctl->call != NULL) { return NULL; } memset(bctl, 0, sizeof(*bctl)); bctl->call = call; return bctl; } static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, grpc_cq_completion *storage) { batch_control *bctl = user_data; grpc_call *call = bctl->call; bctl->call = NULL; GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } static grpc_error *consolidate_batch_errors(batch_control *bctl) { size_t n = (size_t)gpr_atm_acq_load(&bctl->num_errors); if (n == 0) { return GRPC_ERROR_NONE; } else if (n == 1) { /* Skip creating a composite error in the case that only one error was logged */ grpc_error *e = bctl->errors[0]; bctl->errors[0] = NULL; return e; } else { grpc_error *error = GRPC_ERROR_CREATE_REFERENCING("Call batch failed", bctl->errors, n); for (size_t i = 0; i < n; i++) { GRPC_ERROR_UNREF(bctl->errors[i]); bctl->errors[i] = NULL; } return error; } } static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *child_call; grpc_call *next_child_call; grpc_call *call = bctl->call; grpc_error *error = consolidate_batch_errors(bctl); if (bctl->send_initial_metadata) { grpc_metadata_batch_destroy( exec_ctx, &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } if (bctl->send_message) { call->sending_message = false; } if (bctl->send_final_op) { grpc_metadata_batch_destroy( exec_ctx, &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } if (bctl->recv_final_op) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; recv_trailing_filter(exec_ctx, call, md); /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); gpr_mu_lock(&call->child_list_mu); child_call = call->first_child; if (child_call != NULL) { do { next_child_call = child_call->sibling_next; if (child_call->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); } child_call = next_child_call; } while (child_call != call->first_child); } gpr_mu_unlock(&call->child_list_mu); if (call->is_client) { get_final_status(call, set_status_value_directly, call->final_op.client.status, call->final_op.client.status_details); } else { get_final_status(call, set_cancelled_value, call->final_op.server.cancelled, NULL); } GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ bctl->call = NULL; grpc_closure_run(exec_ctx, bctl->notify_tag, error); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { /* unrefs bctl->error */ grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, error, finish_batch_completion, bctl, &bctl->cq_completion); } } static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) { if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); } } static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; for (;;) { size_t remaining = call->receiving_stream->length - (*call->receiving_buffer)->data.raw.slice_buffer.length; if (remaining == 0) { call->receiving_message = 0; grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); call->receiving_stream = NULL; finish_batch_step(exec_ctx, bctl); return; } if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, &call->receiving_slice, remaining, &call->receiving_slice_ready)) { grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, call->receiving_slice); } else { return; } } } static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, call->receiving_slice); continue_receiving_slices(exec_ctx, bctl); } else { if (grpc_trace_operation_failures) { GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); } grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); call->receiving_stream = NULL; grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = NULL; finish_batch_step(exec_ctx, bctl); } } static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; if (call->receiving_stream == NULL) { *call->receiving_buffer = NULL; call->receiving_message = 0; finish_batch_step(exec_ctx, bctl); } else { call->test_only_last_message_flags = call->receiving_stream->flags; if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && (call->incoming_compression_algorithm > GRPC_COMPRESS_NONE)) { *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( NULL, 0, call->incoming_compression_algorithm); } else { *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); } grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl, grpc_schedule_on_exec_ctx); continue_receiving_slices(exec_ctx, bctl); } } static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; if (error != GRPC_ERROR_NONE) { if (call->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); call->receiving_stream = NULL; } add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true); cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || call->receiving_stream == NULL) { process_data_after_md(exec_ctx, bctlp); } else { call->saved_receiving_stream_ready_bctlp = bctlp; } } static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; /* validate call->incoming_compression_algorithm */ if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) { const grpc_compression_algorithm algo = call->incoming_compression_algorithm; char *error_msg = NULL; const grpc_compression_options compression_options = grpc_channel_compression_options(call->channel); /* check if algorithm is known */ if (algo >= GRPC_COMPRESS_ALGORITHMS_COUNT) { gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", algo); gpr_log(GPR_ERROR, "%s", error_msg); cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, error_msg); } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, algo) == 0) { /* check if algorithm is supported by current channel config */ char *algo_name = NULL; grpc_compression_algorithm_name(algo, &algo_name); gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); gpr_log(GPR_ERROR, "%s", error_msg); cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, error_msg); } else { call->incoming_compression_algorithm = algo; } gpr_free(error_msg); } /* make sure the received grpc-encoding is amongst the ones listed in * grpc-accept-encoding */ GPR_ASSERT(call->encodings_accepted_by_peer != 0); if (!GPR_BITGET(call->encodings_accepted_by_peer, call->incoming_compression_algorithm)) { extern int grpc_compression_trace; if (grpc_compression_trace) { char *algo_name = NULL; grpc_compression_algorithm_name(call->incoming_compression_algorithm, &algo_name); gpr_log(GPR_ERROR, "Compression algorithm (grpc-encoding = '%s') not present in " "the bitset of accepted encodings (grpc-accept-encodings: " "'0x%x')", algo_name, call->encodings_accepted_by_peer); } } } static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, grpc_error *error, bool has_cancelled) { if (error == GRPC_ERROR_NONE) return; int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1); if (idx == 0 && !has_cancelled) { cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error)); } bctl->errors[idx] = error; } static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; recv_initial_filter(exec_ctx, call, md); /* TODO(ctiller): this could be moved into recv_initial_filter now */ GPR_TIMER_BEGIN("validate_filtered_metadata", 0); validate_filtered_metadata(exec_ctx, bctl); GPR_TIMER_END("validate_filtered_metadata", 0); if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != 0 && !call->is_client) { call->send_deadline = gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC); } } call->has_initial_md_been_received = true; if (call->saved_receiving_stream_ready_bctlp != NULL) { grpc_closure *saved_rsr_closure = grpc_closure_create( receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, grpc_schedule_on_exec_ctx); call->saved_receiving_stream_ready_bctlp = NULL; grpc_closure_run(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); } finish_batch_step(exec_ctx, bctl); } static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); finish_batch_step(exec_ctx, bctl); } static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p, grpc_cq_completion *completion) { gpr_free(completion); } static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, int is_notify_tag_closure) { size_t i; const grpc_op *op; batch_control *bctl; int num_completion_callbacks_needed = 1; grpc_call_error error = GRPC_CALL_OK; // sent_initial_metadata guards against variable reuse. grpc_metadata compression_md; GPR_TIMER_BEGIN("grpc_call_start_batch", 0); GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); if (nops == 0) { if (!is_notify_tag_closure) { grpc_cq_begin_op(call->cq, notify_tag); grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, free_no_op_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); } else { grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE); } error = GRPC_CALL_OK; goto done; } bctl = allocate_batch_control(call, ops, nops); if (bctl == NULL) { return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; } bctl->notify_tag = notify_tag; bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); grpc_transport_stream_op *stream_op = &bctl->op; memset(stream_op, 0, sizeof(*stream_op)); stream_op->covered_by_poller = true; /* rewrite batch ops into a transport op */ for (i = 0; i < nops; i++) { op = &ops[i]; if (op->reserved != NULL) { error = GRPC_CALL_ERROR; goto done_with_error; } switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: /* Flag validation: currently allow no flags */ if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (call->sent_initial_metadata) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } /* process compression level */ memset(&compression_md, 0, sizeof(compression_md)); size_t additional_metadata_count = 0; grpc_compression_level effective_compression_level; bool level_set = false; if (op->data.send_initial_metadata.maybe_compression_level.is_set) { effective_compression_level = op->data.send_initial_metadata.maybe_compression_level.level; level_set = true; } else { const grpc_compression_options copts = grpc_channel_compression_options(call->channel); level_set = copts.default_level.is_set; if (level_set) { effective_compression_level = copts.default_level.level; } } if (level_set && !call->is_client) { const grpc_compression_algorithm calgo = compression_algorithm_for_level_locked( call, effective_compression_level); // the following will be picked up by the compress filter and used as // the call's compression algorithm. compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; compression_md.value = grpc_compression_algorithm_slice(calgo); additional_metadata_count++; } if (op->data.send_initial_metadata.count + additional_metadata_count > INT_MAX) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } bctl->send_initial_metadata = 1; call->sent_initial_metadata = 1; if (!prepare_application_metadata( exec_ctx, call, (int)op->data.send_initial_metadata.count, op->data.send_initial_metadata.metadata, 0, call->is_client, &compression_md, (int)additional_metadata_count)) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } /* TODO(ctiller): just make these the same variable? */ call->metadata_batch[0][0].deadline = call->send_deadline; stream_op->send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; stream_op->send_initial_metadata_flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (op->data.send_message.send_message == NULL) { error = GRPC_CALL_ERROR_INVALID_MESSAGE; goto done_with_error; } if (call->sending_message) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } bctl->send_message = 1; call->sending_message = 1; grpc_slice_buffer_stream_init( &call->sending_stream, &op->data.send_message.send_message->data.raw.slice_buffer, op->flags); /* If the outgoing buffer is already compressed, mark it as so in the flags. These will be picked up by the compression filter and further (wasteful) attempts at compression skipped. */ if (op->data.send_message.send_message->data.raw.compression > GRPC_COMPRESS_NONE) { call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; } stream_op->send_message = &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (!call->is_client) { error = GRPC_CALL_ERROR_NOT_ON_SERVER; goto done_with_error; } if (call->sent_final_op) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } bctl->send_final_op = 1; call->sent_final_op = 1; stream_op->send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (call->is_client) { error = GRPC_CALL_ERROR_NOT_ON_CLIENT; goto done_with_error; } if (call->sent_final_op) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } if (op->data.send_status_from_server.trailing_metadata_count > INT_MAX) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } bctl->send_final_op = 1; call->sent_final_op = 1; GPR_ASSERT(call->send_extra_metadata_count == 0); call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( exec_ctx, call->channel, op->data.send_status_from_server.status); { grpc_error *override_error = GRPC_ERROR_NONE; if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { override_error = GRPC_ERROR_CREATE("Error from server send status"); } if (op->data.send_status_from_server.status_details != NULL) { call->send_extra_metadata[1].md = grpc_mdelem_from_slices( exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, grpc_slice_ref_internal( *op->data.send_status_from_server.status_details)); call->send_extra_metadata_count++; char *msg = grpc_slice_to_c_string( GRPC_MDVALUE(call->send_extra_metadata[1].md)); override_error = grpc_error_set_str( override_error, GRPC_ERROR_STR_GRPC_MESSAGE, msg); gpr_free(msg); } set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, override_error); } if (!prepare_application_metadata( exec_ctx, call, (int)op->data.send_status_from_server.trailing_metadata_count, op->data.send_status_from_server.trailing_metadata, 1, 1, NULL, 0)) { for (int n = 0; n < call->send_extra_metadata_count; n++) { GRPC_MDELEM_UNREF(exec_ctx, call->send_extra_metadata[n].md); } call->send_extra_metadata_count = 0; error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } stream_op->send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_RECV_INITIAL_METADATA: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (call->received_initial_metadata) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come from server.c. In that case, it's coming from accept_stream, and in that case we're not necessarily covered by a poller. */ stream_op->covered_by_poller = call->is_client; call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, receiving_initial_metadata_ready, bctl, grpc_schedule_on_exec_ctx); bctl->recv_initial_metadata = 1; stream_op->recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; stream_op->recv_initial_metadata_ready = &call->receiving_initial_metadata_ready; num_completion_callbacks_needed++; break; case GRPC_OP_RECV_MESSAGE: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (call->receiving_message) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } call->receiving_message = 1; bctl->recv_message = 1; call->receiving_buffer = op->data.recv_message.recv_message; stream_op->recv_message = &call->receiving_stream; grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, bctl, grpc_schedule_on_exec_ctx); stream_op->recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (!call->is_client) { error = GRPC_CALL_ERROR_NOT_ON_SERVER; goto done_with_error; } if (call->requested_final_op) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } call->requested_final_op = 1; call->buffered_metadata[1] = op->data.recv_status_on_client.trailing_metadata; call->final_op.client.status = op->data.recv_status_on_client.status; call->final_op.client.status_details = op->data.recv_status_on_client.status_details; bctl->recv_final_op = 1; stream_op->recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; stream_op->collect_stats = &call->final_info.stats.transport_stream_stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: /* Flag validation: currently allow no flags */ if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (call->is_client) { error = GRPC_CALL_ERROR_NOT_ON_CLIENT; goto done_with_error; } if (call->requested_final_op) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } call->requested_final_op = 1; call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; bctl->recv_final_op = 1; stream_op->recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; stream_op->collect_stats = &call->final_info.stats.transport_stream_stats; break; } } GRPC_CALL_INTERNAL_REF(call, "completion"); if (!is_notify_tag_closure) { grpc_cq_begin_op(call->cq, notify_tag); } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); stream_op->context = call->context; grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; gpr_atm_rel_store(&call->any_ops_sent_atm, 1); execute_op(exec_ctx, call, stream_op); done: GPR_TIMER_END("grpc_call_start_batch", 0); return error; done_with_error: /* reverse any mutations that occured */ if (bctl->send_initial_metadata) { call->sent_initial_metadata = 0; grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]); } if (bctl->send_message) { call->sending_message = 0; grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base); } if (bctl->send_final_op) { call->sent_final_op = 0; grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]); } if (bctl->recv_initial_metadata) { call->received_initial_metadata = 0; } if (bctl->recv_message) { call->receiving_message = 0; } if (bctl->recv_final_op) { call->requested_final_op = 0; } goto done; } grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_call_error err; GRPC_API_TRACE( "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, " "reserved=%p)", 5, (call, ops, (unsigned long)nops, tag, reserved)); if (reserved != NULL) { err = GRPC_CALL_ERROR; } else { err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0); } grpc_exec_ctx_finish(&exec_ctx); return err; } grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, grpc_closure *closure) { return call_start_batch(exec_ctx, call, ops, nops, closure, 1); } void grpc_call_context_set(grpc_call *call, grpc_context_index elem, void *value, void (*destroy)(void *value)) { if (call->context[elem].destroy) { call->context[elem].destroy(call->context[elem].value); } call->context[elem].value = value; call->context[elem].destroy = destroy; } void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) { return call->context[elem].value; } uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; } grpc_compression_algorithm grpc_call_compression_for_level( grpc_call *call, grpc_compression_level level) { grpc_compression_algorithm algo = compression_algorithm_for_level_locked(call, level); return algo; } const char *grpc_call_error_to_string(grpc_call_error error) { switch (error) { case GRPC_CALL_ERROR: return "GRPC_CALL_ERROR"; case GRPC_CALL_ERROR_ALREADY_ACCEPTED: return "GRPC_CALL_ERROR_ALREADY_ACCEPTED"; case GRPC_CALL_ERROR_ALREADY_FINISHED: return "GRPC_CALL_ERROR_ALREADY_FINISHED"; case GRPC_CALL_ERROR_ALREADY_INVOKED: return "GRPC_CALL_ERROR_ALREADY_INVOKED"; case GRPC_CALL_ERROR_BATCH_TOO_BIG: return "GRPC_CALL_ERROR_BATCH_TOO_BIG"; case GRPC_CALL_ERROR_INVALID_FLAGS: return "GRPC_CALL_ERROR_INVALID_FLAGS"; case GRPC_CALL_ERROR_INVALID_MESSAGE: return "GRPC_CALL_ERROR_INVALID_MESSAGE"; case GRPC_CALL_ERROR_INVALID_METADATA: return "GRPC_CALL_ERROR_INVALID_METADATA"; case GRPC_CALL_ERROR_NOT_INVOKED: return "GRPC_CALL_ERROR_NOT_INVOKED"; case GRPC_CALL_ERROR_NOT_ON_CLIENT: return "GRPC_CALL_ERROR_NOT_ON_CLIENT"; case GRPC_CALL_ERROR_NOT_ON_SERVER: return "GRPC_CALL_ERROR_NOT_ON_SERVER"; case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE: return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE"; case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH: return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH"; case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS"; case GRPC_CALL_OK: return "GRPC_CALL_OK"; } GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW"); }