diff options
Diffstat (limited to 'src/core/ext/filters/http/client/http_client_filter.cc')
-rw-r--r-- | src/core/ext/filters/http/client/http_client_filter.cc | 75 |
1 files changed, 57 insertions, 18 deletions
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 1678051beb..bf9a01f659 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -37,10 +37,31 @@ #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 /* default maximum size of payload eligable for GET request */ -static const size_t kMaxPayloadSizeForGet = 2048; +static constexpr size_t kMaxPayloadSizeForGet = 2048; + +static void recv_initial_metadata_ready(void* user_data, grpc_error* error); +static void recv_trailing_metadata_ready(void* user_data, grpc_error* error); +static void on_send_message_next_done(void* arg, grpc_error* error); +static void send_message_on_complete(void* arg, grpc_error* error); namespace { struct call_data { + call_data(grpc_call_element* elem, const grpc_call_element_args& args) + : call_combiner(args.call_combiner) { + GRPC_CLOSURE_INIT(&recv_initial_metadata_ready, + ::recv_initial_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready, + ::recv_trailing_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_send_message_next_done, ::on_send_message_next_done, + elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&send_message_on_complete, ::send_message_on_complete, + elem, grpc_schedule_on_exec_ctx); + } + + ~call_data() { GRPC_ERROR_UNREF(recv_initial_metadata_error); } + grpc_call_combiner* call_combiner; // State for handling send_initial_metadata ops. grpc_linked_mdelem method; @@ -51,15 +72,18 @@ struct call_data { grpc_linked_mdelem user_agent; // State for handling recv_initial_metadata ops. grpc_metadata_batch* recv_initial_metadata; - grpc_closure* original_recv_initial_metadata_ready; + grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE; + grpc_closure* original_recv_initial_metadata_ready = nullptr; grpc_closure recv_initial_metadata_ready; // State for handling recv_trailing_metadata ops. grpc_metadata_batch* recv_trailing_metadata; grpc_closure* original_recv_trailing_metadata_ready; grpc_closure recv_trailing_metadata_ready; + grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE; + bool seen_recv_trailing_metadata_ready = false; // State for handling send_message ops. grpc_transport_stream_op_batch* send_message_batch; - size_t send_message_bytes_read; + size_t send_message_bytes_read = 0; grpc_core::ManualConstructor<grpc_core::ByteStreamCache> send_message_cache; grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream> send_message_caching_stream; @@ -78,7 +102,12 @@ struct channel_data { static grpc_error* client_filter_incoming_metadata(grpc_call_element* elem, grpc_metadata_batch* b) { if (b->idx.named.status != nullptr) { - if (grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) { + /* If both gRPC status and HTTP status are provided in the response, we + * should prefer the gRPC status code, as mentioned in + * https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. + */ + if (b->idx.named.grpc_status != nullptr || + grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) { grpc_metadata_batch_remove(b, b->idx.named.status); } else { char* val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.status->md), @@ -147,21 +176,39 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { call_data* calld = static_cast<call_data*>(elem->call_data); if (error == GRPC_ERROR_NONE) { error = client_filter_incoming_metadata(elem, calld->recv_initial_metadata); + calld->recv_initial_metadata_error = GRPC_ERROR_REF(error); } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error); + grpc_closure* closure = calld->original_recv_initial_metadata_ready; + calld->original_recv_initial_metadata_ready = nullptr; + if (calld->seen_recv_trailing_metadata_ready) { + GRPC_CALL_COMBINER_START( + calld->call_combiner, &calld->recv_trailing_metadata_ready, + calld->recv_trailing_metadata_error, "continue recv_trailing_metadata"); + } + GRPC_CLOSURE_RUN(closure, error); } static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); + if (calld->original_recv_initial_metadata_ready != nullptr) { + calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error); + calld->seen_recv_trailing_metadata_ready = true; + GRPC_CALL_COMBINER_STOP(calld->call_combiner, + "deferring recv_trailing_metadata_ready until " + "after recv_initial_metadata_ready"); + return; + } if (error == GRPC_ERROR_NONE) { error = client_filter_incoming_metadata(elem, calld->recv_trailing_metadata); } else { GRPC_ERROR_REF(error); } + error = grpc_error_add_child( + error, GRPC_ERROR_REF(calld->recv_initial_metadata_error)); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); } @@ -416,25 +463,17 @@ done: /* Constructor for call_data */ static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - call_data* calld = static_cast<call_data*>(elem->call_data); - calld->call_combiner = args->call_combiner; - GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, - recv_initial_metadata_ready, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, - recv_trailing_metadata_ready, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, - elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, - on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); + new (elem->call_data) call_data(elem, *args); return GRPC_ERROR_NONE; } /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, - grpc_closure* ignored) {} + grpc_closure* ignored) { + call_data* calld = static_cast<call_data*>(elem->call_data); + calld->~call_data(); +} static grpc_mdelem scheme_from_args(const grpc_channel_args* args) { unsigned i; |