diff options
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r-- | src/core/lib/surface/call.c | 95 |
1 files changed, 50 insertions, 45 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 6581bbd3d1..fa12b6ea61 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -142,22 +142,23 @@ struct grpc_call { gpr_mu mu; /* client or server call */ - uint8_t is_client; + bool is_client; /* is the alarm set */ - uint8_t have_alarm; + bool have_alarm; /** has grpc_call_destroy been called */ - uint8_t destroy_called; + bool destroy_called; /** flag indicating that cancellation is inherited */ - uint8_t cancellation_is_inherited; + bool cancellation_is_inherited; /** bitmask of live batches */ uint8_t used_batches; /** which ops are in-flight */ - uint8_t sent_initial_metadata; - uint8_t sending_message; - uint8_t sent_final_op; - uint8_t received_initial_metadata; - uint8_t receiving_message; - uint8_t received_final_op; + bool sent_initial_metadata; + bool sending_message; + bool sent_final_op; + bool received_initial_metadata; + bool receiving_message; + bool requested_final_op; + bool received_final_op; /* have we received initial metadata */ bool has_initial_md_been_received; @@ -220,10 +221,7 @@ struct grpc_call { } server; } final_op; - struct { - void *bctlp; - bool success; - } saved_receiving_stream_ready_ctx; + void *saved_receiving_stream_ready_bctlp; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -554,21 +552,6 @@ static int prepare_application_metadata(grpc_call *call, int count, int i; grpc_metadata_batch *batch = &call->metadata_batch[0 /* is_receiving */][is_trailing]; - if (prepend_extra_metadata) { - if (call->send_extra_metadata_count == 0) { - prepend_extra_metadata = 0; - } else { - for (i = 0; i < call->send_extra_metadata_count; i++) { - GRPC_MDELEM_REF(call->send_extra_metadata[i].md); - } - for (i = 1; i < call->send_extra_metadata_count; i++) { - call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1]; - } - for (i = 0; i < call->send_extra_metadata_count - 1; i++) { - call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1]; - } - } - } for (i = 0; i < count; i++) { grpc_metadata *md = &metadata[i]; grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; @@ -579,14 +562,37 @@ static int prepare_application_metadata(grpc_call *call, int count, GRPC_MDSTR_LENGTH(l->md->key))) { gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s", grpc_mdstr_as_c_string(l->md->key)); - return 0; + break; } else if (!grpc_is_binary_header(grpc_mdstr_as_c_string(l->md->key), GRPC_MDSTR_LENGTH(l->md->key)) && !grpc_header_nonbin_value_is_legal( grpc_mdstr_as_c_string(l->md->value), GRPC_MDSTR_LENGTH(l->md->value))) { gpr_log(GPR_ERROR, "attempt to send invalid metadata value"); - return 0; + break; + } + } + if (i != count) { + for (int j = 0; j <= i; j++) { + grpc_metadata *md = &metadata[j]; + grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + GRPC_MDELEM_UNREF(l->md); + } + return 0; + } + if (prepend_extra_metadata) { + if (call->send_extra_metadata_count == 0) { + prepend_extra_metadata = 0; + } else { + for (i = 0; i < call->send_extra_metadata_count; i++) { + GRPC_MDELEM_REF(call->send_extra_metadata[i].md); + } + for (i = 1; i < call->send_extra_metadata_count; i++) { + call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1]; + } + for (i = 0; i < call->send_extra_metadata_count - 1; i++) { + call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1]; + } } } for (i = 1; i < count; i++) { @@ -1057,12 +1063,12 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_call *call = bctl->call; gpr_mu_lock(&bctl->call->mu); - if (bctl->call->has_initial_md_been_received) { + if (bctl->call->has_initial_md_been_received || !success || + call->receiving_stream == NULL) { gpr_mu_unlock(&bctl->call->mu); process_data_after_md(exec_ctx, bctlp, success); } else { - call->saved_receiving_stream_ready_ctx.bctlp = bctlp; - call->saved_receiving_stream_ready_ctx.success = success; + call->saved_receiving_stream_ready_bctlp = bctlp; gpr_mu_unlock(&bctl->call->mu); } } @@ -1091,13 +1097,11 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, } call->has_initial_md_been_received = true; - if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) { + if (call->saved_receiving_stream_ready_bctlp != NULL) { grpc_closure *saved_rsr_closure = grpc_closure_create( - receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp); - grpc_exec_ctx_enqueue( - exec_ctx, saved_rsr_closure, - call->saved_receiving_stream_ready_ctx.success && success, NULL); - call->saved_receiving_stream_ready_ctx.bctlp = NULL; + receiving_stream_ready, call->saved_receiving_stream_ready_bctlp); + call->saved_receiving_stream_ready_bctlp = NULL; + grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, success, NULL); } gpr_mu_unlock(&call->mu); @@ -1133,6 +1137,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; grpc_metadata_batch_filter(md, recv_trailing_filter, call); + call->received_final_op = true; if (call->have_alarm) { grpc_timer_cancel(exec_ctx, &call->alarm); } @@ -1377,11 +1382,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_NOT_ON_SERVER; goto done_with_error; } - if (call->received_final_op) { + if (call->requested_final_op) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->received_final_op = 1; + call->requested_final_op = 1; call->buffered_metadata[1] = op->data.recv_status_on_client.trailing_metadata; call->final_op.client.status = op->data.recv_status_on_client.status; @@ -1404,11 +1409,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_NOT_ON_CLIENT; goto done_with_error; } - if (call->received_final_op) { + if (call->requested_final_op) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->received_final_op = 1; + call->requested_final_op = 1; call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; bctl->recv_final_op = 1; @@ -1457,7 +1462,7 @@ done_with_error: call->receiving_message = 0; } if (bctl->recv_final_op) { - call->received_final_op = 0; + call->requested_final_op = 0; } gpr_mu_unlock(&call->mu); goto done; |