diff options
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r-- | src/core/lib/surface/call.c | 115 |
1 files changed, 42 insertions, 73 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 201969cd45..c769866ceb 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1,33 +1,18 @@ /* * - * Copyright 2015, Google Inc. - * All rights reserved. + * Copyright 2015 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -472,7 +457,7 @@ void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); } -#ifdef GRPC_STREAM_REFCOUNT_DEBUG +#ifndef NDEBUG #define REF_REASON reason #define REF_ARG , const char *reason #else @@ -535,7 +520,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, } grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, - grpc_closure_init(&c->release_call, release_call, c, + GRPC_CLOSURE_INIT(&c->release_call, release_call, c, grpc_schedule_on_exec_ctx)); GPR_TIMER_END("destroy_call", 0); } @@ -649,7 +634,7 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, GRPC_CALL_INTERNAL_REF(c, "termination"); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op( - grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx)); + GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx)); op->cancel_stream = true; op->payload->cancel_stream.cancel_error = error; execute_op(exec_ctx, c, op); @@ -944,33 +929,6 @@ static grpc_compression_algorithm decode_compression(grpc_mdelem md) { return algorithm; } -static void recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_metadata_batch *b) { - if (b->idx.named.grpc_status != NULL) { - uint32_t status_code = decode_status(b->idx.named.grpc_status->md); - grpc_error *error = - status_code == GRPC_STATUS_OK - ? GRPC_ERROR_NONE - : grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Error received from peer"), - GRPC_ERROR_INT_GRPC_STATUS, - (intptr_t)status_code); - - if (b->idx.named.grpc_message != NULL) { - error = grpc_error_set_str( - error, GRPC_ERROR_STR_GRPC_MESSAGE, - grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md))); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message); - } else if (error != GRPC_ERROR_NONE) { - error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, - grpc_empty_slice()); - } - - set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status); - } -} - static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, int is_trailing) { if (b->list.count == 0) return; @@ -995,8 +953,6 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch *b) { - recv_common_filter(exec_ctx, call, b); - if (b->idx.named.grpc_encoding != NULL) { GPR_TIMER_BEGIN("incoming_compression_algorithm", 0); set_incoming_compression_algorithm( @@ -1004,7 +960,6 @@ static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, GPR_TIMER_END("incoming_compression_algorithm", 0); grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding); } - if (b->idx.named.grpc_accept_encoding != NULL) { GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); set_encodings_accepted_by_peer(exec_ctx, call, @@ -1012,14 +967,33 @@ static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding); GPR_TIMER_END("encodings_accepted_by_peer", 0); } - publish_app_metadata(call, b, false); } static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args, grpc_metadata_batch *b) { grpc_call *call = args; - recv_common_filter(exec_ctx, call, b); + if (b->idx.named.grpc_status != NULL) { + uint32_t status_code = decode_status(b->idx.named.grpc_status->md); + grpc_error *error = + status_code == GRPC_STATUS_OK + ? GRPC_ERROR_NONE + : grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Error received from peer"), + GRPC_ERROR_INT_GRPC_STATUS, + (intptr_t)status_code); + if (b->idx.named.grpc_message != NULL) { + error = grpc_error_set_str( + error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md))); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message); + } else if (error != GRPC_ERROR_NONE) { + error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, + grpc_empty_slice()); + } + set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error); + grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status); + } publish_app_metadata(call, b, true); } @@ -1185,7 +1159,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = NULL; - grpc_closure_run(exec_ctx, bctl->completion_data.notify_tag.tag, error); + GRPC_CLOSURE_RUN(exec_ctx, bctl->completion_data.notify_tag.tag, error); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { /* unrefs bctl->error */ @@ -1290,7 +1264,7 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, } else { *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); } - grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl, + GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl, grpc_schedule_on_exec_ctx); continue_receiving_slices(exec_ctx, bctl); } @@ -1405,11 +1379,11 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, call->has_initial_md_been_received = true; if (call->saved_receiving_stream_ready_bctlp != NULL) { - grpc_closure *saved_rsr_closure = grpc_closure_create( + grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE( receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, grpc_schedule_on_exec_ctx); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_closure_run(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); } finish_batch_step(exec_ctx, bctl); @@ -1451,7 +1425,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, free_no_op_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); } else { - grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, notify_tag, GRPC_ERROR_NONE); } error = GRPC_CALL_OK; goto done; @@ -1468,7 +1442,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *stream_op = &bctl->op; grpc_transport_stream_op_batch_payload *stream_op_payload = &call->stream_op_payload; - stream_op->covered_by_poller = true; /* rewrite batch ops into a transport op */ for (i = 0; i < nops; i++) { @@ -1657,14 +1630,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come - from server.c. In that case, it's coming from accept_stream, and in - that case we're not necessarily covered by a poller. */ - stream_op->covered_by_poller = call->is_client; call->received_initial_metadata = true; call->buffered_metadata[0] = op->data.recv_initial_metadata.recv_initial_metadata; - grpc_closure_init(&call->receiving_initial_metadata_ready, + GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready, receiving_initial_metadata_ready, bctl, grpc_schedule_on_exec_ctx); stream_op->recv_initial_metadata = true; @@ -1688,7 +1657,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->recv_message = true; call->receiving_buffer = op->data.recv_message.recv_message; stream_op_payload->recv_message.recv_message = &call->receiving_stream; - grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, + GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready, bctl, grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &call->receiving_stream_ready; @@ -1754,7 +1723,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); - grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, + GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; gpr_atm_rel_store(&call->any_ops_sent_atm, 1); |