diff options
Diffstat (limited to 'src')
68 files changed, 967 insertions, 643 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index e19726efb3..30660cb83d 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -20,7 +20,7 @@ #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H #include "src/core/ext/filters/client_channel/subchannel.h" -#include "src/core/lib/gpr++/ref_counted_ptr.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/transport/connectivity_state.h" diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc index 1708d81e61..4596f90745 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc @@ -69,11 +69,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem, call_data* calld = (call_data*)elem->call_data; // Get stats object from context and take a ref. GPR_ASSERT(args->context != nullptr); - GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr); - calld->client_stats = grpc_grpclb_client_stats_ref( - (grpc_grpclb_client_stats*)args->context[GRPC_GRPCLB_CLIENT_STATS].value); - // Record call started. - grpc_grpclb_client_stats_add_call_started(calld->client_stats); + if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) { + calld->client_stats = grpc_grpclb_client_stats_ref( + (grpc_grpclb_client_stats*)args->context[GRPC_GRPCLB_CLIENT_STATS] + .value); + // Record call started. + grpc_grpclb_client_stats_add_call_started(calld->client_stats); + } return GRPC_ERROR_NONE; } @@ -81,36 +83,40 @@ static void destroy_call_elem(grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = (call_data*)elem->call_data; - // Record call finished, optionally setting client_failed_to_send and - // received. - grpc_grpclb_client_stats_add_call_finished( - !calld->send_initial_metadata_succeeded /* client_failed_to_send */, - calld->recv_initial_metadata_succeeded /* known_received */, - calld->client_stats); - // All done, so unref the stats object. - grpc_grpclb_client_stats_unref(calld->client_stats); + if (calld->client_stats != nullptr) { + // Record call finished, optionally setting client_failed_to_send and + // received. + grpc_grpclb_client_stats_add_call_finished( + !calld->send_initial_metadata_succeeded /* client_failed_to_send */, + calld->recv_initial_metadata_succeeded /* known_received */, + calld->client_stats); + // All done, so unref the stats object. + grpc_grpclb_client_stats_unref(calld->client_stats); + } } static void start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0); - // Intercept send_initial_metadata. - if (batch->send_initial_metadata) { - calld->original_on_complete_for_send = batch->on_complete; - GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, calld, - grpc_schedule_on_exec_ctx); - batch->on_complete = &calld->on_complete_for_send; - } - // Intercept recv_initial_metadata. - if (batch->recv_initial_metadata) { - calld->original_recv_initial_metadata_ready = - batch->payload->recv_initial_metadata.recv_initial_metadata_ready; - GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, - recv_initial_metadata_ready, calld, - grpc_schedule_on_exec_ctx); - batch->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->recv_initial_metadata_ready; + if (calld->client_stats != nullptr) { + // Intercept send_initial_metadata. + if (batch->send_initial_metadata) { + calld->original_on_complete_for_send = batch->on_complete; + GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, + calld, grpc_schedule_on_exec_ctx); + batch->on_complete = &calld->on_complete_for_send; + } + // Intercept recv_initial_metadata. + if (batch->recv_initial_metadata) { + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, calld, + grpc_schedule_on_exec_ctx); + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; + } } // Chain to next filter. grpc_call_next_op(elem, batch); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 6c29cd8218..1709e5622e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -106,8 +106,8 @@ #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/gpr++/manual_constructor.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -169,25 +169,78 @@ struct pending_ping { } // namespace -struct glb_lb_policy { - /** base policy: must be first */ +typedef struct glb_lb_call_data { + struct glb_lb_policy* glb_policy; + // TODO(juanlishen): c++ize this struct. + gpr_refcount refs; + + /** The streaming call to the LB server. Always non-NULL. */ + grpc_call* lb_call; + + /** The initial metadata received from the LB server. */ + grpc_metadata_array lb_initial_metadata_recv; + + /** The message sent to the LB server. It's used to query for backends (the + * value may vary if the LB server indicates a redirect) or send client load + * report. */ + grpc_byte_buffer* send_message_payload; + /** The callback after the initial request is sent. */ + grpc_closure lb_on_sent_initial_request; + + /** The response received from the LB server, if any. */ + grpc_byte_buffer* recv_message_payload; + /** The callback to process the response received from the LB server. */ + grpc_closure lb_on_response_received; + bool seen_initial_response; + + /** The callback to process the status received from the LB server, which + * signals the end of the LB call. */ + grpc_closure lb_on_server_status_received; + /** The trailing metadata from the LB server. */ + grpc_metadata_array lb_trailing_metadata_recv; + /** The call status code and details. */ + grpc_status_code lb_call_status; + grpc_slice lb_call_status_details; + + /** The stats for client-side load reporting associated with this LB call. + * Created after the first serverlist is received. */ + grpc_grpclb_client_stats* client_stats; + /** The interval and timer for next client load report. */ + grpc_millis client_stats_report_interval; + grpc_timer client_load_report_timer; + bool client_load_report_timer_callback_pending; + bool last_client_load_report_counters_were_zero; + bool client_load_report_is_due; + /** The closure used for either the load report timer or the callback for + * completion of sending the load report. */ + grpc_closure client_load_report_closure; +} glb_lb_call_data; + +typedef struct glb_lb_policy { + /** Base policy: must be first. */ grpc_lb_policy base; - /** who the client is trying to communicate with */ + /** Who the client is trying to communicate with. */ const char* server_name; + + /** Channel related data that will be propagated to the internal RR policy. */ grpc_client_channel_factory* cc_factory; grpc_channel_args* args; - /** timeout in milliseconds for the LB call. 0 means no deadline. */ - int lb_call_timeout_ms; - - /** timeout in milliseconds for before using fallback backend addresses. + /** Timeout in milliseconds for before using fallback backend addresses. * 0 means not using fallback. */ int lb_fallback_timeout_ms; - /** for communicating with the LB server */ + /** The channel for communicating with the LB server. */ grpc_channel* lb_channel; + /** The data associated with the current LB call. It holds a ref to this LB + * policy. It's initialized every time we query for backends. It's reset to + * NULL whenever the current LB call is no longer needed (e.g., the LB policy + * is shutting down, or the LB call has ended). A non-NULL lb_calld always + * contains a non-NULL lb_call. */ + glb_lb_call_data* lb_calld; + /** response generator to inject address updates into \a lb_channel */ grpc_fake_resolver_response_generator* response_generator; @@ -225,9 +278,6 @@ struct glb_lb_policy { bool shutting_down; - /** are we currently updating lb_call? */ - bool updating_lb_call; - /** are we already watching the LB channel's connectivity? */ bool watching_lb_channel; @@ -243,65 +293,70 @@ struct glb_lb_policy { /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ - /* Finished sending initial request. */ - grpc_closure lb_on_sent_initial_request; - - /* Status from the LB server has been received. This signals the end of the LB - * call. */ - grpc_closure lb_on_server_status_received; - - /* A response from the LB server has been received. Process it */ - grpc_closure lb_on_response_received; - - /* LB call retry timer callback. */ - grpc_closure lb_on_call_retry; - - /* LB fallback timer callback. */ - grpc_closure lb_on_fallback; - - grpc_call* lb_call; /* streaming call to the LB server, */ - - grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */ - grpc_metadata_array - lb_trailing_metadata_recv; /* trailing MD from LB server */ - - /* what's being sent to the LB server. Note that its value may vary if the LB - * server indicates a redirect. */ - grpc_byte_buffer* lb_request_payload; - - /* response the LB server, if any. Processed in lb_on_response_received() */ - grpc_byte_buffer* lb_response_payload; - - /* call status code and details, set in lb_on_server_status_received() */ - grpc_status_code lb_call_status; - grpc_slice lb_call_status_details; /** LB call retry backoff state */ grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff; + /** timeout in milliseconds for the LB call. 0 means no deadline. */ + int lb_call_timeout_ms; + /** LB call retry timer */ grpc_timer lb_call_retry_timer; + /** LB call retry timer callback */ + grpc_closure lb_on_call_retry; /** LB fallback timer */ grpc_timer lb_fallback_timer; + /** LB fallback timer callback */ + grpc_closure lb_on_fallback; +} glb_lb_policy; - bool initial_request_sent; - bool seen_initial_response; +static void glb_lb_call_data_ref(glb_lb_call_data* lb_calld, + const char* reason) { + gpr_ref_non_zero(&lb_calld->refs); + if (grpc_lb_glb_trace.enabled()) { + const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count); + gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p REF %lu->%lu (%s)", + grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld, + (unsigned long)(count - 1), (unsigned long)count, reason); + } +} - /* Stats for client-side load reporting. Should be unreffed and - * recreated whenever lb_call is replaced. */ - grpc_grpclb_client_stats* client_stats; - /* Interval and timer for next client load report. */ - grpc_millis client_stats_report_interval; - grpc_timer client_load_report_timer; - bool client_load_report_timer_callback_pending; - bool last_client_load_report_counters_were_zero; - /* Closure used for either the load report timer or the callback for - * completion of sending the load report. */ - grpc_closure client_load_report_closure; - /* Client load report message payload. */ - grpc_byte_buffer* client_load_report_payload; -}; +static void glb_lb_call_data_unref(glb_lb_call_data* lb_calld, + const char* reason) { + const bool done = gpr_unref(&lb_calld->refs); + if (grpc_lb_glb_trace.enabled()) { + const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count); + gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p UNREF %lu->%lu (%s)", + grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld, + (unsigned long)(count + 1), (unsigned long)count, reason); + } + if (done) { + GPR_ASSERT(lb_calld->lb_call != nullptr); + grpc_call_unref(lb_calld->lb_call); + grpc_metadata_array_destroy(&lb_calld->lb_initial_metadata_recv); + grpc_metadata_array_destroy(&lb_calld->lb_trailing_metadata_recv); + grpc_byte_buffer_destroy(lb_calld->send_message_payload); + grpc_byte_buffer_destroy(lb_calld->recv_message_payload); + grpc_slice_unref_internal(lb_calld->lb_call_status_details); + if (lb_calld->client_stats != nullptr) { + grpc_grpclb_client_stats_unref(lb_calld->client_stats); + } + GRPC_LB_POLICY_UNREF(&lb_calld->glb_policy->base, "lb_calld"); + gpr_free(lb_calld); + } +} + +static void lb_call_data_shutdown(glb_lb_policy* glb_policy) { + GPR_ASSERT(glb_policy->lb_calld != nullptr); + GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr); + // lb_on_server_status_received will complete the cancellation and clean up. + grpc_call_cancel(glb_policy->lb_calld->lb_call, nullptr); + if (glb_policy->lb_calld->client_load_report_timer_callback_pending) { + grpc_timer_cancel(&glb_policy->lb_calld->client_load_report_timer); + } + glb_policy->lb_calld = nullptr; +} /* add lb_token of selected subchannel (address) to the call's initial * metadata */ @@ -334,11 +389,12 @@ static void pending_pick_set_metadata_and_context(pending_pick* pp) { abort(); } // Pass on client stats via context. Passes ownership of the reference. - GPR_ASSERT(pp->client_stats != nullptr); - pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = - pp->client_stats; - pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy = - destroy_client_stats; + if (pp->client_stats != nullptr) { + pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = + pp->client_stats; + pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy = + destroy_client_stats; + } } else { if (pp->client_stats != nullptr) { grpc_grpclb_client_stats_unref(pp->client_stats); @@ -605,9 +661,11 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy, // the client_load_reporting filter, because we do not create a // subchannel call (and therefore no client_load_reporting filter) // for dropped calls. - GPR_ASSERT(glb_policy->client_stats != nullptr); - grpc_grpclb_client_stats_add_call_dropped_locked( - server->load_balance_token, glb_policy->client_stats); + if (glb_policy->lb_calld != nullptr && + glb_policy->lb_calld->client_stats != nullptr) { + grpc_grpclb_client_stats_add_call_dropped_locked( + server->load_balance_token, glb_policy->lb_calld->client_stats); + } if (force_async) { GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); gpr_free(pp); @@ -618,7 +676,11 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy, } } // Set client_stats and user_data. - pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats); + if (glb_policy->lb_calld != nullptr && + glb_policy->lb_calld->client_stats != nullptr) { + pp->client_stats = + grpc_grpclb_client_stats_ref(glb_policy->lb_calld->client_stats); + } GPR_ASSERT(pp->pick->user_data == nullptr); pp->pick->user_data = (void**)&pp->lb_token; // Pick via the RR policy. @@ -872,9 +934,6 @@ static void glb_destroy(grpc_lb_policy* pol) { GPR_ASSERT(glb_policy->pending_pings == nullptr); gpr_free((void*)glb_policy->server_name); grpc_channel_args_destroy(glb_policy->args); - if (glb_policy->client_stats != nullptr) { - grpc_grpclb_client_stats_unref(glb_policy->client_stats); - } grpc_connectivity_state_destroy(&glb_policy->state_tracker); if (glb_policy->serverlist != nullptr) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); @@ -892,13 +951,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, glb_lb_policy* glb_policy = (glb_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); glb_policy->shutting_down = true; - /* glb_policy->lb_call and this local lb_call must be consistent at this point - * because glb_policy->lb_call is only assigned in lb_call_init_locked as part - * of query_for_backends_locked, which can only be invoked while - * glb_policy->shutting_down is false. */ - if (glb_policy->lb_call != nullptr) { - grpc_call_cancel(glb_policy->lb_call, nullptr); - /* lb_on_server_status_received will pick up the cancel and clean up */ + if (glb_policy->lb_calld != nullptr) { + lb_call_data_shutdown(glb_policy); } if (glb_policy->retry_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_call_retry_timer); @@ -1048,7 +1102,6 @@ static void start_picking_locked(glb_lb_policy* glb_policy) { grpc_timer_init(&glb_policy->lb_fallback_timer, deadline, &glb_policy->lb_on_fallback); } - glb_policy->started_picking = true; glb_policy->lb_call_backoff->Reset(); query_for_backends_locked(glb_policy); @@ -1089,7 +1142,6 @@ static int glb_pick_locked(grpc_lb_policy* pol, gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy, glb_policy->rr_policy); } - GPR_ASSERT(glb_policy->client_stats != nullptr); pick_done = pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp); } @@ -1139,8 +1191,8 @@ static void glb_notify_on_state_change_locked(grpc_lb_policy* pol, static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; glb_policy->retry_timer_callback_pending = false; - if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr && - error == GRPC_ERROR_NONE) { + if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE && + glb_policy->lb_calld == nullptr) { if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy); } @@ -1149,84 +1201,55 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) { GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer"); } -static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { - if (glb_policy->started_picking && glb_policy->updating_lb_call) { - if (glb_policy->retry_timer_callback_pending) { - grpc_timer_cancel(&glb_policy->lb_call_retry_timer); - } - if (!glb_policy->shutting_down) start_picking_locked(glb_policy); - glb_policy->updating_lb_call = false; - } else if (!glb_policy->shutting_down) { - /* if we aren't shutting down, restart the LB client call after some time */ - grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime(); - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", +static void start_lb_call_retry_timer_locked(glb_lb_policy* glb_policy) { + grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime(); + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", + glb_policy); + grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); + if (timeout > 0) { + gpr_log(GPR_DEBUG, + "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", + glb_policy, timeout); + } else { + gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.", glb_policy); - grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); - if (timeout > 0) { - gpr_log(GPR_DEBUG, - "[grpclb %p] ... retry LB call after %" PRIuPTR "ms.", - glb_policy, timeout); - } else { - gpr_log(GPR_DEBUG, "[grpclb %p] ... retry LB call immediately.", - glb_policy); - } } - GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer"); - GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, - lb_call_on_retry_timer_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); - glb_policy->retry_timer_callback_pending = true; - grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, - &glb_policy->lb_on_call_retry); } - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "lb_on_server_status_received_locked"); + GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer"); + GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, + lb_call_on_retry_timer_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); + glb_policy->retry_timer_callback_pending = true; + grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, + &glb_policy->lb_on_call_retry); } -static void send_client_load_report_locked(void* arg, grpc_error* error); +static void maybe_send_client_load_report_locked(void* arg, grpc_error* error); -static void schedule_next_client_load_report(glb_lb_policy* glb_policy) { +static void schedule_next_client_load_report(glb_lb_call_data* lb_calld) { const grpc_millis next_client_load_report_time = - grpc_core::ExecCtx::Get()->Now() + - glb_policy->client_stats_report_interval; - GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, - send_client_load_report_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_timer_init(&glb_policy->client_load_report_timer, + grpc_core::ExecCtx::Get()->Now() + lb_calld->client_stats_report_interval; + GRPC_CLOSURE_INIT( + &lb_calld->client_load_report_closure, + maybe_send_client_load_report_locked, lb_calld, + grpc_combiner_scheduler(lb_calld->glb_policy->base.combiner)); + grpc_timer_init(&lb_calld->client_load_report_timer, next_client_load_report_time, - &glb_policy->client_load_report_closure); + &lb_calld->client_load_report_closure); + lb_calld->client_load_report_timer_callback_pending = true; } static void client_load_report_done_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - grpc_byte_buffer_destroy(glb_policy->client_load_report_payload); - glb_policy->client_load_report_payload = nullptr; - if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) { - glb_policy->client_load_report_timer_callback_pending = false; - GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report"); - if (glb_policy->lb_call == nullptr) { - maybe_restart_lb_call(glb_policy); - } + glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg; + glb_lb_policy* glb_policy = lb_calld->glb_policy; + grpc_byte_buffer_destroy(lb_calld->send_message_payload); + lb_calld->send_message_payload = nullptr; + if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) { + glb_lb_call_data_unref(lb_calld, "client_load_report"); return; } - schedule_next_client_load_report(glb_policy); -} - -static void do_send_client_load_report_locked(glb_lb_policy* glb_policy) { - grpc_op op; - memset(&op, 0, sizeof(op)); - op.op = GRPC_OP_SEND_MESSAGE; - op.data.send_message.send_message = glb_policy->client_load_report_payload; - GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, - client_load_report_done_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_call_error call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); - if (call_error != GRPC_CALL_OK) { - gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error); - GPR_ASSERT(GRPC_CALL_OK == call_error); - } + schedule_next_client_load_report(lb_calld); } static bool load_report_counters_are_zero(grpc_grpclb_request* request) { @@ -1241,341 +1264,377 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) { (drop_entries == nullptr || drop_entries->num_entries == 0); } -static void send_client_load_report_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) { - glb_policy->client_load_report_timer_callback_pending = false; - GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report"); - if (glb_policy->lb_call == nullptr) { - maybe_restart_lb_call(glb_policy); - } - return; - } +static void send_client_load_report_locked(glb_lb_call_data* lb_calld) { + glb_lb_policy* glb_policy = lb_calld->glb_policy; // Construct message payload. - GPR_ASSERT(glb_policy->client_load_report_payload == nullptr); + GPR_ASSERT(lb_calld->send_message_payload == nullptr); grpc_grpclb_request* request = - grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats); + grpc_grpclb_load_report_request_create_locked(lb_calld->client_stats); // Skip client load report if the counters were all zero in the last // report and they are still zero in this one. if (load_report_counters_are_zero(request)) { - if (glb_policy->last_client_load_report_counters_were_zero) { + if (lb_calld->last_client_load_report_counters_were_zero) { grpc_grpclb_request_destroy(request); - schedule_next_client_load_report(glb_policy); + schedule_next_client_load_report(lb_calld); return; } - glb_policy->last_client_load_report_counters_were_zero = true; + lb_calld->last_client_load_report_counters_were_zero = true; } else { - glb_policy->last_client_load_report_counters_were_zero = false; + lb_calld->last_client_load_report_counters_were_zero = false; } grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); - glb_policy->client_load_report_payload = + lb_calld->send_message_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_slice_unref_internal(request_payload_slice); grpc_grpclb_request_destroy(request); + // Send the report. + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = lb_calld->send_message_payload; + GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure, + client_load_report_done_locked, lb_calld, + grpc_combiner_scheduler(glb_policy->base.combiner)); + grpc_call_error call_error = grpc_call_start_batch_and_execute( + lb_calld->lb_call, &op, 1, &lb_calld->client_load_report_closure); + if (call_error != GRPC_CALL_OK) { + gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } +} + +static void maybe_send_client_load_report_locked(void* arg, grpc_error* error) { + glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg; + glb_lb_policy* glb_policy = lb_calld->glb_policy; + lb_calld->client_load_report_timer_callback_pending = false; + if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) { + glb_lb_call_data_unref(lb_calld, "client_load_report"); + return; + } // If we've already sent the initial request, then we can go ahead and send // the load report. Otherwise, we need to wait until the initial request has - // been sent to send this (see lb_on_sent_initial_request_locked() below). - if (glb_policy->initial_request_sent) { - do_send_client_load_report_locked(glb_policy); + // been sent to send this (see lb_on_sent_initial_request_locked()). + if (lb_calld->send_message_payload == nullptr) { + send_client_load_report_locked(lb_calld); + } else { + lb_calld->client_load_report_is_due = true; } } static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error); static void lb_on_server_status_received_locked(void* arg, grpc_error* error); static void lb_on_response_received_locked(void* arg, grpc_error* error); -static void lb_call_init_locked(glb_lb_policy* glb_policy) { +static glb_lb_call_data* lb_call_data_create_locked(glb_lb_policy* glb_policy) { + GPR_ASSERT(!glb_policy->shutting_down); + // Init the LB call. Note that the LB call will progress every time there's + // activity in glb_policy->base.interested_parties, which is comprised of the + // polling entities from client_channel. GPR_ASSERT(glb_policy->server_name != nullptr); GPR_ASSERT(glb_policy->server_name[0] != '\0'); - GPR_ASSERT(glb_policy->lb_call == nullptr); - GPR_ASSERT(!glb_policy->shutting_down); - - /* Note the following LB call progresses every time there's activity in \a - * glb_policy->base.interested_parties, which is comprised of the polling - * entities from \a client_channel. */ grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name); grpc_millis deadline = glb_policy->lb_call_timeout_ms == 0 ? GRPC_MILLIS_INF_FUTURE : grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms; - glb_policy->lb_call = grpc_channel_create_pollset_set_call( + glb_lb_call_data* lb_calld = (glb_lb_call_data*)gpr_zalloc(sizeof(*lb_calld)); + lb_calld->lb_call = grpc_channel_create_pollset_set_call( glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, &host, deadline, nullptr); grpc_slice_unref_internal(host); - - if (glb_policy->client_stats != nullptr) { - grpc_grpclb_client_stats_unref(glb_policy->client_stats); - } - glb_policy->client_stats = grpc_grpclb_client_stats_create(); - - grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv); - grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv); - + // Init the LB call request payload. grpc_grpclb_request* request = grpc_grpclb_request_create(glb_policy->server_name); grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); - glb_policy->lb_request_payload = + lb_calld->send_message_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_slice_unref_internal(request_payload_slice); grpc_grpclb_request_destroy(request); - - GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request, - lb_on_sent_initial_request_locked, glb_policy, + // Init other data associated with the LB call. + lb_calld->glb_policy = glb_policy; + gpr_ref_init(&lb_calld->refs, 1); + grpc_metadata_array_init(&lb_calld->lb_initial_metadata_recv); + grpc_metadata_array_init(&lb_calld->lb_trailing_metadata_recv); + GRPC_CLOSURE_INIT(&lb_calld->lb_on_sent_initial_request, + lb_on_sent_initial_request_locked, lb_calld, grpc_combiner_scheduler(glb_policy->base.combiner)); - GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received, - lb_on_server_status_received_locked, glb_policy, + GRPC_CLOSURE_INIT(&lb_calld->lb_on_response_received, + lb_on_response_received_locked, lb_calld, grpc_combiner_scheduler(glb_policy->base.combiner)); - GRPC_CLOSURE_INIT(&glb_policy->lb_on_response_received, - lb_on_response_received_locked, glb_policy, + GRPC_CLOSURE_INIT(&lb_calld->lb_on_server_status_received, + lb_on_server_status_received_locked, lb_calld, grpc_combiner_scheduler(glb_policy->base.combiner)); - - grpc_core::BackOff::Options backoff_options; - backoff_options - .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) - .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) - .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) - .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); - - glb_policy->lb_call_backoff.Init(backoff_options); - - glb_policy->initial_request_sent = false; - glb_policy->seen_initial_response = false; - glb_policy->last_client_load_report_counters_were_zero = false; -} - -static void lb_call_destroy_locked(glb_lb_policy* glb_policy) { - GPR_ASSERT(glb_policy->lb_call != nullptr); - grpc_call_unref(glb_policy->lb_call); - glb_policy->lb_call = nullptr; - - grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv); - grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv); - - grpc_byte_buffer_destroy(glb_policy->lb_request_payload); - grpc_slice_unref_internal(glb_policy->lb_call_status_details); - - if (glb_policy->client_load_report_timer_callback_pending) { - grpc_timer_cancel(&glb_policy->client_load_report_timer); - } + // Hold a ref to the glb_policy. + GRPC_LB_POLICY_REF(&glb_policy->base, "lb_calld"); + return lb_calld; } /* * Auxiliary functions and LB client callbacks. */ + static void query_for_backends_locked(glb_lb_policy* glb_policy) { GPR_ASSERT(glb_policy->lb_channel != nullptr); if (glb_policy->shutting_down) return; - - lb_call_init_locked(glb_policy); - + // Init the LB call data. + GPR_ASSERT(glb_policy->lb_calld == nullptr); + glb_policy->lb_calld = lb_call_data_create_locked(glb_policy); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Query for backends (lb_channel: %p, lb_call: %p)", - glb_policy, glb_policy->lb_channel, glb_policy->lb_call); + "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p, " + "lb_call: %p)", + glb_policy, glb_policy->lb_channel, glb_policy->lb_calld, + glb_policy->lb_calld->lb_call); } - GPR_ASSERT(glb_policy->lb_call != nullptr); - + GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr); + // Create the ops. grpc_call_error call_error; grpc_op ops[3]; memset(ops, 0, sizeof(ops)); - + // Op: send initial metadata. grpc_op* op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; op->flags = 0; op->reserved = nullptr; op++; + // Op: send request message. + GPR_ASSERT(glb_policy->lb_calld->send_message_payload != nullptr); + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = + glb_policy->lb_calld->send_message_payload; + op->flags = 0; + op->reserved = nullptr; + op++; + glb_lb_call_data_ref(glb_policy->lb_calld, + "lb_on_sent_initial_request_locked"); + call_error = grpc_call_start_batch_and_execute( + glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_calld->lb_on_sent_initial_request); + GPR_ASSERT(GRPC_CALL_OK == call_error); + // Op: recv initial metadata. + op = ops; op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata.recv_initial_metadata = - &glb_policy->lb_initial_metadata_recv; + &glb_policy->lb_calld->lb_initial_metadata_recv; op->flags = 0; op->reserved = nullptr; op++; - GPR_ASSERT(glb_policy->lb_request_payload != nullptr); - op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message.send_message = glb_policy->lb_request_payload; + // Op: recv response. + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = + &glb_policy->lb_calld->recv_message_payload; op->flags = 0; op->reserved = nullptr; op++; - /* take a ref to be released in lb_on_sent_initial_request_locked() */ - GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked"); + glb_lb_call_data_ref(glb_policy->lb_calld, "lb_on_response_received_locked"); call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_on_sent_initial_request); + glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_calld->lb_on_response_received); GPR_ASSERT(GRPC_CALL_OK == call_error); - + // Op: recv server status. op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = - &glb_policy->lb_trailing_metadata_recv; - op->data.recv_status_on_client.status = &glb_policy->lb_call_status; + &glb_policy->lb_calld->lb_trailing_metadata_recv; + op->data.recv_status_on_client.status = &glb_policy->lb_calld->lb_call_status; op->data.recv_status_on_client.status_details = - &glb_policy->lb_call_status_details; + &glb_policy->lb_calld->lb_call_status_details; op->flags = 0; op->reserved = nullptr; op++; - /* take a ref to be released in lb_on_server_status_received_locked() */ - GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked"); + // This callback signals the end of the LB call, so it relies on the initial + // ref instead of a new ref. When it's invoked, it's the initial ref that is + // unreffed. call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_on_server_status_received); - GPR_ASSERT(GRPC_CALL_OK == call_error); - - op = ops; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &glb_policy->lb_response_payload; - op->flags = 0; - op->reserved = nullptr; - op++; - /* take a ref to be unref'd/reused in lb_on_response_received_locked() */ - GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked"); - call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_on_response_received); + glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_calld->lb_on_server_status_received); GPR_ASSERT(GRPC_CALL_OK == call_error); } static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - glb_policy->initial_request_sent = true; + glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg; + grpc_byte_buffer_destroy(lb_calld->send_message_payload); + lb_calld->send_message_payload = nullptr; // If we attempted to send a client load report before the initial request was - // sent, send the load report now. - if (glb_policy->client_load_report_payload != nullptr) { - do_send_client_load_report_locked(glb_policy); + // sent (and this lb_calld is still in use), send the load report now. + if (lb_calld->client_load_report_is_due && + lb_calld == lb_calld->glb_policy->lb_calld) { + send_client_load_report_locked(lb_calld); + lb_calld->client_load_report_is_due = false; } - GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked"); + glb_lb_call_data_unref(lb_calld, "lb_on_sent_initial_request_locked"); } static void lb_on_response_received_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = (glb_lb_policy*)arg; + glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg; + glb_lb_policy* glb_policy = lb_calld->glb_policy; + // Empty payload means the LB call was cancelled. + if (lb_calld != glb_policy->lb_calld || + lb_calld->recv_message_payload == nullptr) { + glb_lb_call_data_unref(lb_calld, "lb_on_response_received_locked"); + return; + } grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op* op = ops; - if (glb_policy->lb_response_payload != nullptr) { - glb_policy->lb_call_backoff->Reset(); - /* Received data from the LB server. Look inside - * glb_policy->lb_response_payload, for a serverlist. */ - grpc_byte_buffer_reader bbr; - grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload); - grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); - grpc_byte_buffer_reader_destroy(&bbr); - grpc_byte_buffer_destroy(glb_policy->lb_response_payload); - - grpc_grpclb_initial_response* response = nullptr; - if (!glb_policy->seen_initial_response && - (response = grpc_grpclb_initial_response_parse(response_slice)) != - nullptr) { - if (response->has_client_stats_report_interval) { - glb_policy->client_stats_report_interval = GPR_MAX( - GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis( - &response->client_stats_report_interval)); - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Received initial LB response message; " - "client load reporting interval = %" PRIdPTR " milliseconds", - glb_policy, glb_policy->client_stats_report_interval); - } - /* take a ref to be unref'd in send_client_load_report_locked() */ - glb_policy->client_load_report_timer_callback_pending = true; - GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report"); - schedule_next_client_load_report(glb_policy); - } else if (grpc_lb_glb_trace.enabled()) { + glb_policy->lb_call_backoff->Reset(); + grpc_byte_buffer_reader bbr; + grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload); + grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); + grpc_byte_buffer_reader_destroy(&bbr); + grpc_byte_buffer_destroy(lb_calld->recv_message_payload); + lb_calld->recv_message_payload = nullptr; + grpc_grpclb_initial_response* initial_response; + grpc_grpclb_serverlist* serverlist; + if (!lb_calld->seen_initial_response && + (initial_response = grpc_grpclb_initial_response_parse(response_slice)) != + nullptr) { + // Have NOT seen initial response, look for initial response. + if (initial_response->has_client_stats_report_interval) { + lb_calld->client_stats_report_interval = GPR_MAX( + GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis( + &initial_response->client_stats_report_interval)); + if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Received initial LB response message; client load " - "reporting NOT enabled", - glb_policy); + "[grpclb %p] Received initial LB response message; " + "client load reporting interval = %" PRIdPTR " milliseconds", + glb_policy, lb_calld->client_stats_report_interval); } - grpc_grpclb_initial_response_destroy(response); - glb_policy->seen_initial_response = true; - } else { - grpc_grpclb_serverlist* serverlist = - grpc_grpclb_response_parse_serverlist(response_slice); - if (serverlist != nullptr) { - GPR_ASSERT(glb_policy->lb_call != nullptr); + } else if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, + "[grpclb %p] Received initial LB response message; client load " + "reporting NOT enabled", + glb_policy); + } + grpc_grpclb_initial_response_destroy(initial_response); + lb_calld->seen_initial_response = true; + } else if ((serverlist = grpc_grpclb_response_parse_serverlist( + response_slice)) != nullptr) { + // Have seen initial response, look for serverlist. + GPR_ASSERT(lb_calld->lb_call != nullptr); + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, + "[grpclb %p] Serverlist with %" PRIuPTR " servers received", + glb_policy, serverlist->num_servers); + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_resolved_address addr; + parse_server(serverlist->servers[i], &addr); + char* ipport; + grpc_sockaddr_to_string(&ipport, &addr, false); + gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s", + glb_policy, i, ipport); + gpr_free(ipport); + } + } + /* update serverlist */ + if (serverlist->num_servers > 0) { + // Start sending client load report only after we start using the + // serverlist returned from the current LB call. + if (lb_calld->client_stats_report_interval > 0 && + lb_calld->client_stats == nullptr) { + lb_calld->client_stats = grpc_grpclb_client_stats_create(); + glb_lb_call_data_ref(lb_calld, "client_load_report"); + schedule_next_client_load_report(lb_calld); + } + if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Serverlist with %" PRIuPTR " servers received", - glb_policy, serverlist->num_servers); - for (size_t i = 0; i < serverlist->num_servers; ++i) { - grpc_resolved_address addr; - parse_server(serverlist->servers[i], &addr); - char* ipport; - grpc_sockaddr_to_string(&ipport, &addr, false); - gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s", - glb_policy, i, ipport); - gpr_free(ipport); - } + "[grpclb %p] Incoming server list identical to current, " + "ignoring.", + glb_policy); } - /* update serverlist */ - if (serverlist->num_servers > 0) { - if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, - serverlist)) { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Incoming server list identical to current, " - "ignoring.", - glb_policy); - } - grpc_grpclb_destroy_serverlist(serverlist); - } else { /* new serverlist */ - if (glb_policy->serverlist != nullptr) { - /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(glb_policy->serverlist); - } else { - /* or dispose of the fallback */ - grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); - glb_policy->fallback_backend_addresses = nullptr; - if (glb_policy->fallback_timer_callback_pending) { - grpc_timer_cancel(&glb_policy->lb_fallback_timer); - } - } - /* and update the copy in the glb_lb_policy instance. This - * serverlist instance will be destroyed either upon the next - * update or in glb_destroy() */ - glb_policy->serverlist = serverlist; - glb_policy->serverlist_index = 0; - rr_handover_locked(glb_policy); - } + grpc_grpclb_destroy_serverlist(serverlist); + } else { /* new serverlist */ + if (glb_policy->serverlist != nullptr) { + /* dispose of the old serverlist */ + grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } else { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Received empty server list, ignoring.", - glb_policy); + /* or dispose of the fallback */ + grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); + glb_policy->fallback_backend_addresses = nullptr; + if (glb_policy->fallback_timer_callback_pending) { + grpc_timer_cancel(&glb_policy->lb_fallback_timer); + glb_policy->fallback_timer_callback_pending = false; } - grpc_grpclb_destroy_serverlist(serverlist); } - } else { /* serverlist == nullptr */ - gpr_log(GPR_ERROR, - "[grpclb %p] Invalid LB response received: '%s'. Ignoring.", - glb_policy, - grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); + /* and update the copy in the glb_lb_policy instance. This + * serverlist instance will be destroyed either upon the next + * update or in glb_destroy() */ + glb_policy->serverlist = serverlist; + glb_policy->serverlist_index = 0; + rr_handover_locked(glb_policy); + } + } else { + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.", + glb_policy); } + grpc_grpclb_destroy_serverlist(serverlist); } - grpc_slice_unref_internal(response_slice); - if (!glb_policy->shutting_down) { - /* keep listening for serverlist updates */ - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &glb_policy->lb_response_payload; - op->flags = 0; - op->reserved = nullptr; - op++; - /* reuse the "lb_on_response_received_locked" ref taken in - * query_for_backends_locked() */ - const grpc_call_error call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_on_response_received); /* loop */ - GPR_ASSERT(GRPC_CALL_OK == call_error); + } else { + // No valid initial response or serverlist found. + gpr_log(GPR_ERROR, + "[grpclb %p] Invalid LB response received: '%s'. Ignoring.", + glb_policy, + grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); + } + grpc_slice_unref_internal(response_slice); + if (!glb_policy->shutting_down) { + // Keep listening for serverlist updates. + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &lb_calld->recv_message_payload; + op->flags = 0; + op->reserved = nullptr; + op++; + // Reuse the "lb_on_response_received_locked" ref taken in + // query_for_backends_locked(). + const grpc_call_error call_error = grpc_call_start_batch_and_execute( + lb_calld->lb_call, ops, (size_t)(op - ops), + &lb_calld->lb_on_response_received); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } else { + glb_lb_call_data_unref(lb_calld, + "lb_on_response_received_locked+glb_shutdown"); + } +} + +static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { + glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg; + glb_lb_policy* glb_policy = lb_calld->glb_policy; + GPR_ASSERT(lb_calld->lb_call != nullptr); + if (grpc_lb_glb_trace.enabled()) { + char* status_details = + grpc_slice_to_c_string(lb_calld->lb_call_status_details); + gpr_log(GPR_INFO, + "[grpclb %p] Status from LB server received. Status = %d, details " + "= '%s', (lb_calld: %p, lb_call: %p), error '%s'", + lb_calld->glb_policy, lb_calld->lb_call_status, status_details, + lb_calld, lb_calld->lb_call, grpc_error_string(error)); + gpr_free(status_details); + } + // If this lb_calld is still in use, this call ended because of a failure so + // we want to retry connecting. Otherwise, we have deliberately ended this + // call and no further action is required. + if (lb_calld == glb_policy->lb_calld) { + glb_policy->lb_calld = nullptr; + if (lb_calld->client_load_report_timer_callback_pending) { + grpc_timer_cancel(&lb_calld->client_load_report_timer); + } + GPR_ASSERT(!glb_policy->shutting_down); + if (lb_calld->seen_initial_response) { + // If we lose connection to the LB server, reset the backoff and restart + // the LB call immediately. + glb_policy->lb_call_backoff->Reset(); + query_for_backends_locked(glb_policy); } else { - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "lb_on_response_received_locked_shutdown"); + // If this LB call fails establishing any connection to the LB server, + // retry later. + start_lb_call_retry_timer_locked(glb_policy); } - } else { /* empty payload: call cancelled. */ - /* dispose of the "lb_on_response_received_locked" ref taken in - * query_for_backends_locked() and reused in every reception loop */ - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "lb_on_response_received_locked_empty_payload"); } + glb_lb_call_data_unref(lb_calld, "lb_call_ended"); } static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { @@ -1597,29 +1656,6 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer"); } -static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - GPR_ASSERT(glb_policy->lb_call != nullptr); - if (grpc_lb_glb_trace.enabled()) { - char* status_details = - grpc_slice_to_c_string(glb_policy->lb_call_status_details); - gpr_log(GPR_INFO, - "[grpclb %p] Status from LB server received. Status = %d, Details " - "= '%s', (call: %p), error '%s'", - glb_policy, glb_policy->lb_call_status, status_details, - glb_policy->lb_call, grpc_error_string(error)); - gpr_free(status_details); - } - /* We need to perform cleanups no matter what. */ - lb_call_destroy_locked(glb_policy); - // If the load report timer is still pending, we wait for it to be - // called before restarting the call. Otherwise, we restart the call - // here. - if (!glb_policy->client_load_report_timer_callback_pending) { - maybe_restart_lb_call(glb_policy); - } -} - static void fallback_update_locked(glb_lb_policy* glb_policy, const grpc_lb_addresses* addresses) { GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr); @@ -1701,7 +1737,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, switch (glb_policy->lb_channel_connectivity) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_TRANSIENT_FAILURE: { - /* resub. */ + // Keep watching the LB channel. grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( grpc_channel_get_channel_stack(glb_policy->lb_channel)); @@ -1714,29 +1750,26 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, &glb_policy->lb_channel_on_connectivity_changed, nullptr); break; } + // The LB channel may be IDLE because it's shut down before the update. + // Restart the LB call to kick the LB channel into gear. case GRPC_CHANNEL_IDLE: - // lb channel inactive (probably shutdown prior to update). Restart lb - // call to kick the lb channel into gear. - /* fallthrough */ case GRPC_CHANNEL_READY: - if (glb_policy->lb_call != nullptr) { - glb_policy->updating_lb_call = true; - grpc_call_cancel(glb_policy->lb_call, nullptr); - // lb_on_server_status_received() will pick up the cancel and reinit - // lb_call. - } else if (glb_policy->started_picking) { + if (glb_policy->lb_calld != nullptr) { + lb_call_data_shutdown(glb_policy); + } + if (glb_policy->started_picking) { if (glb_policy->retry_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_call_retry_timer); } - start_picking_locked(glb_policy); + glb_policy->lb_call_backoff->Reset(); + query_for_backends_locked(glb_policy); } - /* fallthrough */ + // Fall through. case GRPC_CHANNEL_SHUTDOWN: done: glb_policy->watching_lb_channel = false; GRPC_LB_POLICY_UNREF(&glb_policy->base, "watch_lb_channel_connectivity_cb_shutdown"); - break; } } @@ -1851,6 +1884,14 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory, grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); + // Init LB call backoff option. + grpc_core::BackOff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) + .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + glb_policy->lb_call_backoff.Init(backoff_options); return &glb_policy->base; } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index e217a0b0c0..24c381a46d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -34,7 +34,7 @@ #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gpr++/ref_counted_ptr.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index f4e345def6..3377605263 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -22,7 +22,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gpr++/ref_counted_ptr.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/transport/connectivity_state.h" // TODO(roth): This code is intended to be shared between pick_first and diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 1efdc26d56..6ba5f932f0 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -34,9 +34,9 @@ #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/gpr++/manual_constructor.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/gethostname.h" #include "src/core/lib/iomgr/resolve_address.h" diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 66a03c5a85..62f03d52c0 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -29,9 +29,9 @@ #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/gpr++/manual_constructor.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index bb43651d0c..cad8578511 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -37,8 +37,8 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/debug/stats.h" -#include "src/core/lib/gpr++/debug_location.h" -#include "src/core/lib/gpr++/manual_constructor.h" +#include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index f2a5c1e273..b7593ec911 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -21,9 +21,9 @@ #include "src/core/ext/filters/client_channel/connector.h" #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/gpr++/ref_counted.h" -#include "src/core/lib/gpr++/ref_counted_ptr.h" #include "src/core/lib/gpr/arena.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index e067b696a1..530ab17bc7 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1886,7 +1886,7 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t, grpc_chttp2_maybe_complete_recv_message(t, s); if (s->recv_trailing_metadata_finished != nullptr && s->read_closed && s->write_closed) { - if (s->seen_error) { + if (s->seen_error || !t->is_client) { grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); if (!s->pending_byte_stream) { grpc_slice_buffer_reset_and_unref_internal( diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h index 7e83ea05cd..2ee1345260 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.h +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -24,8 +24,8 @@ #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/http2_settings.h" -#include "src/core/lib/gpr++/abstract.h" -#include "src/core/lib/gpr++/manual_constructor.h" +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/pid_controller.h" diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index de901f0ce8..6b6c0b28e2 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -35,7 +35,7 @@ #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" #include "src/core/ext/transport/chttp2/transport/stream_map.h" #include "src/core/lib/compression/stream_compression.h" -#include "src/core/lib/gpr++/manual_constructor.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/timer.h" diff --git a/src/core/lib/gpr/alloc.cc b/src/core/lib/gpr/alloc.cc index 518bdb99f7..000b7dcb25 100644 --- a/src/core/lib/gpr/alloc.cc +++ b/src/core/lib/gpr/alloc.cc @@ -90,8 +90,8 @@ void* gpr_realloc(void* p, size_t size) { return p; } -void* gpr_malloc_aligned(size_t size, size_t alignment_log) { - size_t alignment = ((size_t)1) << alignment_log; +void* gpr_malloc_aligned(size_t size, size_t alignment) { + GPR_ASSERT(((alignment - 1) & alignment) == 0); // Must be power of 2. size_t extra = alignment - 1 + sizeof(void*); void* p = gpr_malloc(size + extra); void** ret = (void**)(((uintptr_t)p + extra) & ~(alignment - 1)); diff --git a/src/core/lib/gpr/arena.cc b/src/core/lib/gpr/arena.cc index 177c176732..687592a140 100644 --- a/src/core/lib/gpr/arena.cc +++ b/src/core/lib/gpr/arena.cc @@ -17,11 +17,19 @@ */ #include "src/core/lib/gpr/arena.h" + +#include <string.h> + #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> +// TODO(roth): We currently assume that all callers need alignment of 16 +// bytes, which may be wrong in some cases. As part of converting the +// arena API to C++, we should consider replacing gpr_arena_alloc() with a +// template that takes the type of the value being allocated, which +// would allow us to use the alignment actually needed by the caller. #define ROUND_UP_TO_ALIGNMENT_SIZE(x) \ (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u)) @@ -36,9 +44,16 @@ struct gpr_arena { zone initial_zone; }; +static void* zalloc_aligned(size_t size) { + void* ptr = gpr_malloc_aligned(size, GPR_MAX_ALIGNMENT); + memset(ptr, 0, size); + return ptr; +} + gpr_arena* gpr_arena_create(size_t initial_size) { initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size); - gpr_arena* a = (gpr_arena*)gpr_zalloc(sizeof(gpr_arena) + initial_size); + gpr_arena* a = (gpr_arena*)zalloc_aligned( + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) + initial_size); a->initial_zone.size_end = initial_size; return a; } @@ -46,10 +61,10 @@ gpr_arena* gpr_arena_create(size_t initial_size) { size_t gpr_arena_destroy(gpr_arena* arena) { gpr_atm size = gpr_atm_no_barrier_load(&arena->size_so_far); zone* z = (zone*)gpr_atm_no_barrier_load(&arena->initial_zone.next_atm); - gpr_free(arena); + gpr_free_aligned(arena); while (z) { zone* next_z = (zone*)gpr_atm_no_barrier_load(&z->next_atm); - gpr_free(z); + gpr_free_aligned(z); z = next_z; } return (size_t)size; @@ -64,11 +79,12 @@ void* gpr_arena_alloc(gpr_arena* arena, size_t size) { zone* next_z = (zone*)gpr_atm_acq_load(&z->next_atm); if (next_z == nullptr) { size_t next_z_size = (size_t)gpr_atm_no_barrier_load(&arena->size_so_far); - next_z = (zone*)gpr_zalloc(sizeof(zone) + next_z_size); + next_z = (zone*)zalloc_aligned(ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)) + + next_z_size); next_z->size_begin = z->size_end; next_z->size_end = z->size_end + next_z_size; if (!gpr_atm_rel_cas(&z->next_atm, (gpr_atm)NULL, (gpr_atm)next_z)) { - gpr_free(next_z); + gpr_free_aligned(next_z); next_z = (zone*)gpr_atm_acq_load(&z->next_atm); } } @@ -79,5 +95,8 @@ void* gpr_arena_alloc(gpr_arena* arena, size_t size) { } GPR_ASSERT(start >= z->size_begin); GPR_ASSERT(start + size <= z->size_end); - return ((char*)(z + 1)) + start - z->size_begin; + char* ptr = (z == &arena->initial_zone) + ? (char*)arena + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) + : (char*)z + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)); + return ptr + start - z->size_begin; } diff --git a/src/core/lib/gpr/fork.cc b/src/core/lib/gpr/fork.cc index c47e686378..92023f4350 100644 --- a/src/core/lib/gpr/fork.cc +++ b/src/core/lib/gpr/fork.cc @@ -38,18 +38,32 @@ void grpc_fork_support_init() { fork_support_enabled = 1; #else fork_support_enabled = 0; +#endif + bool env_var_set = false; char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT"); if (env != nullptr) { static const char* truthy[] = {"yes", "Yes", "YES", "true", "True", "TRUE", "1"}; + static const char* falsey[] = {"no", "No", "NO", "false", + "False", "FALSE", "0"}; for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { if (0 == strcmp(env, truthy[i])) { fork_support_enabled = 1; + env_var_set = true; + break; + } + } + if (!env_var_set) { + for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) { + if (0 == strcmp(env, falsey[i])) { + fork_support_enabled = 0; + env_var_set = true; + break; + } } } gpr_free(env); } -#endif if (override_fork_support_enabled != -1) { fork_support_enabled = override_fork_support_enabled; } diff --git a/src/core/lib/gpr++/README.md b/src/core/lib/gprpp/README.md index eab018bb31..eab018bb31 100644 --- a/src/core/lib/gpr++/README.md +++ b/src/core/lib/gprpp/README.md diff --git a/src/core/lib/gpr++/abstract.h b/src/core/lib/gprpp/abstract.h index 51d7572302..cc96edc49b 100644 --- a/src/core/lib/gpr++/abstract.h +++ b/src/core/lib/gprpp/abstract.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_LIB_GPRXX_ABSTRACT_H -#define GRPC_CORE_LIB_GPRXX_ABSTRACT_H +#ifndef GRPC_CORE_LIB_GPRPP_ABSTRACT_H +#define GRPC_CORE_LIB_GPRPP_ABSTRACT_H // This is needed to support abstract base classes in the c core. Since gRPC // doesn't have a c++ runtime, it will hit a linker error on delete unless @@ -31,4 +31,4 @@ #define GRPC_ABSTRACT \ { GPR_ASSERT(false); } -#endif /* GRPC_CORE_LIB_GPRXX_ABSTRACT_H */ +#endif /* GRPC_CORE_LIB_GPRPP_ABSTRACT_H */ diff --git a/src/core/lib/gpr++/atomic.h b/src/core/lib/gprpp/atomic.h index d68ccea864..8b08fc4e9c 100644 --- a/src/core/lib/gpr++/atomic.h +++ b/src/core/lib/gprpp/atomic.h @@ -16,15 +16,15 @@ * */ -#ifndef GRPC_CORE_LIB_GPRXX_ATOMIC_H -#define GRPC_CORE_LIB_GPRXX_ATOMIC_H +#ifndef GRPC_CORE_LIB_GPRPP_ATOMIC_H +#define GRPC_CORE_LIB_GPRPP_ATOMIC_H #include <grpc/support/port_platform.h> #ifdef GPR_HAS_CXX11_ATOMIC -#include "src/core/lib/gpr++/atomic_with_std.h" +#include "src/core/lib/gprpp/atomic_with_std.h" #else -#include "src/core/lib/gpr++/atomic_with_atm.h" +#include "src/core/lib/gprpp/atomic_with_atm.h" #endif -#endif /* GRPC_CORE_LIB_GPRXX_ATOMIC_H */ +#endif /* GRPC_CORE_LIB_GPRPP_ATOMIC_H */ diff --git a/src/core/lib/gpr++/atomic_with_atm.h b/src/core/lib/gprpp/atomic_with_atm.h index 09490e8148..6abf0bc38d 100644 --- a/src/core/lib/gpr++/atomic_with_atm.h +++ b/src/core/lib/gprpp/atomic_with_atm.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_ATM_H -#define GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_ATM_H +#ifndef GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_ATM_H +#define GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_ATM_H #include <grpc/support/atm.h> @@ -52,4 +52,4 @@ class atomic<bool> { } // namespace grpc_core -#endif /* GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_ATM_H */ +#endif /* GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_ATM_H */ diff --git a/src/core/lib/gpr++/atomic_with_std.h b/src/core/lib/gprpp/atomic_with_std.h index b6ff90655e..83322b81c1 100644 --- a/src/core/lib/gpr++/atomic_with_std.h +++ b/src/core/lib/gprpp/atomic_with_std.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_STD_H -#define GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_STD_H +#ifndef GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_STD_H +#define GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_STD_H #include <atomic> @@ -30,4 +30,4 @@ typedef std::memory_order memory_order; } // namespace grpc_core -#endif /* GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_STD_H */ +#endif /* GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_STD_H */ diff --git a/src/core/lib/gpr++/debug_location.h b/src/core/lib/gprpp/debug_location.h index 5a8665ce19..287761beaf 100644 --- a/src/core/lib/gpr++/debug_location.h +++ b/src/core/lib/gprpp/debug_location.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_LIB_GPRXX_DEBUG_LOCATION_H -#define GRPC_CORE_LIB_GPRXX_DEBUG_LOCATION_H +#ifndef GRPC_CORE_LIB_GPRPP_DEBUG_LOCATION_H +#define GRPC_CORE_LIB_GPRPP_DEBUG_LOCATION_H namespace grpc_core { @@ -49,4 +49,4 @@ class DebugLocation { } // namespace grpc_core -#endif /* GRPC_CORE_LIB_GPRXX_DEBUG_LOCATION_H */ +#endif /* GRPC_CORE_LIB_GPRPP_DEBUG_LOCATION_H */ diff --git a/src/core/lib/gpr++/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h index 17ee9e16bb..b78f85b893 100644 --- a/src/core/lib/gpr++/inlined_vector.h +++ b/src/core/lib/gprpp/inlined_vector.h @@ -16,12 +16,12 @@ * */ -#ifndef GRPC_CORE_LIB_GPRXX_INLINED_VECTOR_H -#define GRPC_CORE_LIB_GPRXX_INLINED_VECTOR_H +#ifndef GRPC_CORE_LIB_GPRPP_INLINED_VECTOR_H +#define GRPC_CORE_LIB_GPRPP_INLINED_VECTOR_H #include <cassert> -#include "src/core/lib/gpr++/memory.h" +#include "src/core/lib/gprpp/memory.h" namespace grpc_core { @@ -109,4 +109,4 @@ class InlinedVector { } // namespace grpc_core -#endif /* GRPC_CORE_LIB_GPRXX_INLINED_VECTOR_H */ +#endif /* GRPC_CORE_LIB_GPRPP_INLINED_VECTOR_H */ diff --git a/src/core/lib/gpr++/manual_constructor.h b/src/core/lib/gprpp/manual_constructor.h index a3f006da34..cee38abc1b 100644 --- a/src/core/lib/gpr++/manual_constructor.h +++ b/src/core/lib/gprpp/manual_constructor.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_LIB_GPRXX_MANUAL_CONSTRUCTOR_H -#define GRPC_CORE_LIB_GPRXX_MANUAL_CONSTRUCTOR_H +#ifndef GRPC_CORE_LIB_GPRPP_MANUAL_CONSTRUCTOR_H +#define GRPC_CORE_LIB_GPRPP_MANUAL_CONSTRUCTOR_H // manually construct a region of memory with some type diff --git a/src/core/lib/gpr++/memory.h b/src/core/lib/gprpp/memory.h index 75ed3d6cea..17f42f5983 100644 --- a/src/core/lib/gpr++/memory.h +++ b/src/core/lib/gprpp/memory.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_LIB_GPRXX_MEMORY_H -#define GRPC_CORE_LIB_GPRXX_MEMORY_H +#ifndef GRPC_CORE_LIB_GPRPP_MEMORY_H +#define GRPC_CORE_LIB_GPRPP_MEMORY_H #include <grpc/support/alloc.h> @@ -97,4 +97,4 @@ class Allocator { } // namespace grpc_core -#endif /* GRPC_CORE_LIB_GPRXX_MEMORY_H */ +#endif /* GRPC_CORE_LIB_GPRPP_MEMORY_H */ diff --git a/src/core/lib/gpr++/orphanable.h b/src/core/lib/gprpp/orphanable.h index f106e74dde..50199730c9 100644 --- a/src/core/lib/gpr++/orphanable.h +++ b/src/core/lib/gprpp/orphanable.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_LIB_GPRXX_ORPHANABLE_H -#define GRPC_CORE_LIB_GPRXX_ORPHANABLE_H +#ifndef GRPC_CORE_LIB_GPRPP_ORPHANABLE_H +#define GRPC_CORE_LIB_GPRPP_ORPHANABLE_H #include <grpc/support/log.h> #include <grpc/support/sync.h> @@ -25,9 +25,9 @@ #include <memory> #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gpr++/abstract.h" -#include "src/core/lib/gpr++/debug_location.h" -#include "src/core/lib/gpr++/memory.h" +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/memory.h" namespace grpc_core { @@ -168,4 +168,4 @@ class InternallyRefCountedWithTracing : public Orphanable { } // namespace grpc_core -#endif /* GRPC_CORE_LIB_GPRXX_ORPHANABLE_H */ +#endif /* GRPC_CORE_LIB_GPRPP_ORPHANABLE_H */ diff --git a/src/core/lib/gpr++/ref_counted.h b/src/core/lib/gprpp/ref_counted.h index c2ae76c0ae..c68118a71a 100644 --- a/src/core/lib/gpr++/ref_counted.h +++ b/src/core/lib/gprpp/ref_counted.h @@ -16,16 +16,16 @@ * */ -#ifndef GRPC_CORE_LIB_GPRXX_REF_COUNTED_H -#define GRPC_CORE_LIB_GPRXX_REF_COUNTED_H +#ifndef GRPC_CORE_LIB_GPRPP_REF_COUNTED_H +#define GRPC_CORE_LIB_GPRPP_REF_COUNTED_H #include <grpc/support/log.h> #include <grpc/support/sync.h> #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gpr++/abstract.h" -#include "src/core/lib/gpr++/debug_location.h" -#include "src/core/lib/gpr++/memory.h" +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/memory.h" namespace grpc_core { @@ -130,4 +130,4 @@ class RefCountedWithTracing { } // namespace grpc_core -#endif /* GRPC_CORE_LIB_GPRXX_REF_COUNTED_H */ +#endif /* GRPC_CORE_LIB_GPRPP_REF_COUNTED_H */ diff --git a/src/core/lib/gpr++/ref_counted_ptr.h b/src/core/lib/gprpp/ref_counted_ptr.h index 862294d1aa..dda0f00d79 100644 --- a/src/core/lib/gpr++/ref_counted_ptr.h +++ b/src/core/lib/gprpp/ref_counted_ptr.h @@ -16,12 +16,12 @@ * */ -#ifndef GRPC_CORE_LIB_GPRXX_REF_COUNTED_PTR_H -#define GRPC_CORE_LIB_GPRXX_REF_COUNTED_PTR_H +#ifndef GRPC_CORE_LIB_GPRPP_REF_COUNTED_PTR_H +#define GRPC_CORE_LIB_GPRPP_REF_COUNTED_PTR_H #include <utility> -#include "src/core/lib/gpr++/memory.h" +#include "src/core/lib/gprpp/memory.h" namespace grpc_core { @@ -96,4 +96,4 @@ inline RefCountedPtr<T> MakeRefCounted(Args&&... args) { } // namespace grpc_core -#endif /* GRPC_CORE_LIB_GPRXX_REF_COUNTED_PTR_H */ +#endif /* GRPC_CORE_LIB_GPRPP_REF_COUNTED_PTR_H */ diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 1cb0150f45..42d7cdd348 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -43,8 +43,8 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" -#include "src/core/lib/gpr++/manual_constructor.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index b81c00ca7a..416e8384b4 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -41,8 +41,8 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" -#include "src/core/lib/gpr++/manual_constructor.h" #include "src/core/lib/gpr/spinlock.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/is_epollexclusive_available.h" diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 11c64d080c..1518348992 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -43,7 +43,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" -#include "src/core/lib/gpr++/manual_constructor.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index 27a2a4eeb6..a1f1cf1107 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -23,7 +23,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include "src/core/lib/gpr++/atomic.h" +#include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/gpr/string.h" diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc index 7d36c6c9e1..19498a6df7 100644 --- a/src/core/lib/surface/version.cc +++ b/src/core/lib/surface/version.cc @@ -23,4 +23,4 @@ const char* grpc_version_string(void) { return "5.0.0-dev"; } -const char* grpc_g_stands_for(void) { return "glossy"; } +const char* grpc_g_stands_for(void) { return "glamorous"; } diff --git a/src/core/tsi/ssl_transport_security.cc b/src/core/tsi/ssl_transport_security.cc index 229f7efd37..b396a621bd 100644 --- a/src/core/tsi/ssl_transport_security.cc +++ b/src/core/tsi/ssl_transport_security.cc @@ -96,8 +96,7 @@ struct tsi_ssl_server_handshaker_factory { typedef struct { tsi_handshaker base; SSL* ssl; - BIO* into_ssl; - BIO* from_ssl; + BIO* network_io; tsi_result result; tsi_ssl_handshaker_factory* factory_ref; } tsi_ssl_handshaker; @@ -105,8 +104,7 @@ typedef struct { typedef struct { tsi_frame_protector base; SSL* ssl; - BIO* into_ssl; - BIO* from_ssl; + BIO* network_io; unsigned char* buffer; size_t buffer_size; size_t buffer_offset; @@ -730,11 +728,11 @@ static tsi_result ssl_protector_protect(tsi_frame_protector* self, tsi_result result = TSI_OK; /* First see if we have some pending data in the SSL BIO. */ - int pending_in_ssl = (int)BIO_pending(impl->from_ssl); + int pending_in_ssl = (int)BIO_pending(impl->network_io); if (pending_in_ssl > 0) { *unprotected_bytes_size = 0; GPR_ASSERT(*protected_output_frames_size <= INT_MAX); - read_from_ssl = BIO_read(impl->from_ssl, protected_output_frames, + read_from_ssl = BIO_read(impl->network_io, protected_output_frames, (int)*protected_output_frames_size); if (read_from_ssl < 0) { gpr_log(GPR_ERROR, @@ -762,7 +760,7 @@ static tsi_result ssl_protector_protect(tsi_frame_protector* self, if (result != TSI_OK) return result; GPR_ASSERT(*protected_output_frames_size <= INT_MAX); - read_from_ssl = BIO_read(impl->from_ssl, protected_output_frames, + read_from_ssl = BIO_read(impl->network_io, protected_output_frames, (int)*protected_output_frames_size); if (read_from_ssl < 0) { gpr_log(GPR_ERROR, "Could not read from BIO after SSL_write."); @@ -788,20 +786,20 @@ static tsi_result ssl_protector_protect_flush( impl->buffer_offset = 0; } - pending = (int)BIO_pending(impl->from_ssl); + pending = (int)BIO_pending(impl->network_io); GPR_ASSERT(pending >= 0); *still_pending_size = (size_t)pending; if (*still_pending_size == 0) return TSI_OK; GPR_ASSERT(*protected_output_frames_size <= INT_MAX); - read_from_ssl = BIO_read(impl->from_ssl, protected_output_frames, + read_from_ssl = BIO_read(impl->network_io, protected_output_frames, (int)*protected_output_frames_size); if (read_from_ssl <= 0) { gpr_log(GPR_ERROR, "Could not read from BIO after SSL_write."); return TSI_INTERNAL_ERROR; } *protected_output_frames_size = (size_t)read_from_ssl; - pending = (int)BIO_pending(impl->from_ssl); + pending = (int)BIO_pending(impl->network_io); GPR_ASSERT(pending >= 0); *still_pending_size = (size_t)pending; return TSI_OK; @@ -831,7 +829,7 @@ static tsi_result ssl_protector_unprotect( /* Then, try to write some data to ssl. */ GPR_ASSERT(*protected_frames_bytes_size <= INT_MAX); - written_into_ssl = BIO_write(impl->into_ssl, protected_frames_bytes, + written_into_ssl = BIO_write(impl->network_io, protected_frames_bytes, (int)*protected_frames_bytes_size); if (written_into_ssl < 0) { gpr_log(GPR_ERROR, "Sending protected frame to ssl failed with %d", @@ -853,6 +851,7 @@ static void ssl_protector_destroy(tsi_frame_protector* self) { tsi_ssl_frame_protector* impl = (tsi_ssl_frame_protector*)self; if (impl->buffer != nullptr) gpr_free(impl->buffer); if (impl->ssl != nullptr) SSL_free(impl->ssl); + if (impl->network_io != nullptr) BIO_free(impl->network_io); gpr_free(self); } @@ -916,10 +915,10 @@ static tsi_result ssl_handshaker_get_bytes_to_send_to_peer(tsi_handshaker* self, return TSI_INVALID_ARGUMENT; } GPR_ASSERT(*bytes_size <= INT_MAX); - bytes_read_from_ssl = BIO_read(impl->from_ssl, bytes, (int)*bytes_size); + bytes_read_from_ssl = BIO_read(impl->network_io, bytes, (int)*bytes_size); if (bytes_read_from_ssl < 0) { *bytes_size = 0; - if (!BIO_should_retry(impl->from_ssl)) { + if (!BIO_should_retry(impl->network_io)) { impl->result = TSI_INTERNAL_ERROR; return impl->result; } else { @@ -927,7 +926,7 @@ static tsi_result ssl_handshaker_get_bytes_to_send_to_peer(tsi_handshaker* self, } } *bytes_size = (size_t)bytes_read_from_ssl; - return BIO_pending(impl->from_ssl) == 0 ? TSI_OK : TSI_INCOMPLETE_DATA; + return BIO_pending(impl->network_io) == 0 ? TSI_OK : TSI_INCOMPLETE_DATA; } static tsi_result ssl_handshaker_get_result(tsi_handshaker* self) { @@ -948,7 +947,7 @@ static tsi_result ssl_handshaker_process_bytes_from_peer( } GPR_ASSERT(*bytes_size <= INT_MAX); bytes_written_into_ssl_size = - BIO_write(impl->into_ssl, bytes, (int)*bytes_size); + BIO_write(impl->network_io, bytes, (int)*bytes_size); if (bytes_written_into_ssl_size < 0) { gpr_log(GPR_ERROR, "Could not write to memory BIO."); impl->result = TSI_INTERNAL_ERROR; @@ -965,7 +964,7 @@ static tsi_result ssl_handshaker_process_bytes_from_peer( ssl_result = SSL_get_error(impl->ssl, ssl_result); switch (ssl_result) { case SSL_ERROR_WANT_READ: - if (BIO_pending(impl->from_ssl) == 0) { + if (BIO_pending(impl->network_io) == 0) { /* We need more data. */ return TSI_INCOMPLETE_DATA; } else { @@ -1058,12 +1057,13 @@ static tsi_result ssl_handshaker_create_frame_protector( return TSI_INTERNAL_ERROR; } - /* Transfer ownership of ssl to the frame protector. It is OK as the caller - * cannot call anything else but destroy on the handshaker after this call. */ + /* Transfer ownership of ssl and network_io to the frame protector. It is OK + * as the caller cannot call anything else but destroy on the handshaker + * after this call. */ protector_impl->ssl = impl->ssl; impl->ssl = nullptr; - protector_impl->into_ssl = impl->into_ssl; - protector_impl->from_ssl = impl->from_ssl; + protector_impl->network_io = impl->network_io; + impl->network_io = nullptr; protector_impl->base.vtable = &frame_protector_vtable; *protector = &protector_impl->base; @@ -1072,7 +1072,8 @@ static tsi_result ssl_handshaker_create_frame_protector( static void ssl_handshaker_destroy(tsi_handshaker* self) { tsi_ssl_handshaker* impl = (tsi_ssl_handshaker*)self; - SSL_free(impl->ssl); /* The BIO objects are owned by ssl */ + SSL_free(impl->ssl); + BIO_free(impl->network_io); tsi_ssl_handshaker_factory_unref(impl->factory_ref); gpr_free(impl); } @@ -1094,8 +1095,8 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client, tsi_ssl_handshaker_factory* factory, tsi_handshaker** handshaker) { SSL* ssl = SSL_new(ctx); - BIO* into_ssl = nullptr; - BIO* from_ssl = nullptr; + BIO* network_io = nullptr; + BIO* ssl_io = nullptr; tsi_ssl_handshaker* impl = nullptr; *handshaker = nullptr; if (ctx == nullptr) { @@ -1107,16 +1108,12 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client, } SSL_set_info_callback(ssl, ssl_info_callback); - into_ssl = BIO_new(BIO_s_mem()); - from_ssl = BIO_new(BIO_s_mem()); - if (into_ssl == nullptr || from_ssl == nullptr) { - gpr_log(GPR_ERROR, "BIO_new failed."); + if (!BIO_new_bio_pair(&network_io, 0, &ssl_io, 0)) { + gpr_log(GPR_ERROR, "BIO_new_bio_pair failed."); SSL_free(ssl); - if (into_ssl != nullptr) BIO_free(into_ssl); - if (from_ssl != nullptr) BIO_free(into_ssl); return TSI_OUT_OF_RESOURCES; } - SSL_set_bio(ssl, into_ssl, from_ssl); + SSL_set_bio(ssl, ssl_io, ssl_io); if (is_client) { int ssl_result; SSL_set_connect_state(ssl); @@ -1125,6 +1122,7 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client, gpr_log(GPR_ERROR, "Invalid server name indication %s.", server_name_indication); SSL_free(ssl); + BIO_free(network_io); return TSI_INTERNAL_ERROR; } } @@ -1135,6 +1133,7 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client, "Unexpected error received from first SSL_do_handshake call: %s", ssl_error_string(ssl_result)); SSL_free(ssl); + BIO_free(network_io); return TSI_INTERNAL_ERROR; } } else { @@ -1143,8 +1142,7 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client, impl = (tsi_ssl_handshaker*)gpr_zalloc(sizeof(*impl)); impl->ssl = ssl; - impl->into_ssl = into_ssl; - impl->from_ssl = from_ssl; + impl->network_io = network_io; impl->result = TSI_HANDSHAKE_IN_PROGRESS; impl->base.vtable = &handshaker_vtable; impl->factory_ref = tsi_ssl_handshaker_factory_ref(factory); diff --git a/src/cpp/common/version_cc.cc b/src/cpp/common/version_cc.cc index 7f01a66dcf..8bc926048f 100644 --- a/src/cpp/common/version_cc.cc +++ b/src/cpp/common/version_cc.cc @@ -22,5 +22,5 @@ #include <grpc++/grpc++.h> namespace grpc { -grpc::string Version() { return "1.9.0-dev"; } +grpc::string Version() { return "1.10.0-dev"; } } // namespace grpc diff --git a/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs b/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs index b6bb0a9eae..9c6f8a2117 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs @@ -60,6 +60,16 @@ namespace Grpc.Core.Internal.Tests } [Test] + public void LeaseSetsReturnAction() + { + var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, 0); + var origLeased = pool.Lease(); + origLeased.ReturnAction(origLeased); + pool.Dispose(); + Assert.AreNotSame(origLeased, pool.Lease()); + } + + [Test] public void Constructor() { Assert.Throws<ArgumentNullException>(() => new DefaultObjectPool<TestPooledObject>(null, 10, 2)); @@ -67,8 +77,14 @@ namespace Grpc.Core.Internal.Tests Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, -1)); } - class TestPooledObject : IDisposable + class TestPooledObject : IPooledObject<TestPooledObject> { + public Action<TestPooledObject> ReturnAction; + + public void SetReturnToPoolAction(Action<TestPooledObject> returnAction) + { + this.ReturnAction = returnAction; + } public void Dispose() { diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 6296f1863b..6bb2f6c3e5 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -299,8 +299,8 @@ namespace Grpc.Core private GrpcEnvironment() { GrpcNativeInit(); - batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(this.batchContextPool), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity); - requestCallContextPool = new DefaultObjectPool<RequestCallContextSafeHandle>(() => RequestCallContextSafeHandle.Create(this.requestCallContextPool), requestCallContextPoolSharedCapacity, requestCallContextPoolThreadLocalCapacity); + batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity); + requestCallContextPool = new DefaultObjectPool<RequestCallContextSafeHandle>(() => RequestCallContextSafeHandle.Create(), requestCallContextPoolSharedCapacity, requestCallContextPoolThreadLocalCapacity); threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers); threadPool.Start(); } diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 83385ad7d3..53a859d18f 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -33,22 +33,21 @@ namespace Grpc.Core.Internal /// <summary> /// grpcsharp_batch_context /// </summary> - internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback + internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject<BatchContextSafeHandle> { static readonly NativeMethods Native = NativeMethods.Get(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>(); - IObjectPool<BatchContextSafeHandle> ownedByPool; + Action<BatchContextSafeHandle> returnToPoolAction; CompletionCallbackData completionCallbackData; private BatchContextSafeHandle() { } - public static BatchContextSafeHandle Create(IObjectPool<BatchContextSafeHandle> ownedByPool = null) + public static BatchContextSafeHandle Create() { var ctx = Native.grpcsharp_batch_context_create(); - ctx.ownedByPool = ownedByPool; return ctx; } @@ -60,6 +59,12 @@ namespace Grpc.Core.Internal } } + public void SetReturnToPoolAction(Action<BatchContextSafeHandle> returnAction) + { + GrpcPreconditions.CheckState(returnToPoolAction == null); + returnToPoolAction = returnAction; + } + public void SetCompletionCallback(BatchCompletionDelegate callback, object state) { GrpcPreconditions.CheckState(completionCallbackData.Callback == null); @@ -109,10 +114,15 @@ namespace Grpc.Core.Internal public void Recycle() { - if (ownedByPool != null) + if (returnToPoolAction != null) { Native.grpcsharp_batch_context_reset(this); - ownedByPool.Return(this); + + var origReturnAction = returnToPoolAction; + // Not clearing all the references to the pool could prevent garbage collection of the pool object + // and thus cause memory leaks. + returnToPoolAction = null; + origReturnAction(this); } else { diff --git a/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs index 2f030f3e02..0e1dc4d158 100644 --- a/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs +++ b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs @@ -27,9 +27,10 @@ namespace Grpc.Core.Internal /// Pool of objects that combines a shared pool and a thread local pool. /// </summary> internal class DefaultObjectPool<T> : IObjectPool<T> - where T : class, IDisposable + where T : class, IPooledObject<T> { readonly object myLock = new object(); + readonly Action<T> returnAction; readonly Func<T> itemFactory; // Queue shared between threads, access needs to be synchronized. @@ -54,6 +55,7 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckArgument(sharedCapacity >= 0); GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0); + this.returnAction = Return; this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory)); this.sharedQueue = new Queue<T>(sharedCapacity); this.sharedCapacity = sharedCapacity; @@ -74,6 +76,13 @@ namespace Grpc.Core.Internal /// </summary> public T Lease() { + var item = LeaseInternal(); + item.SetReturnToPoolAction(returnAction); + return item; + } + + private T LeaseInternal() + { var localData = threadLocalData.Value; if (localData.Queue.Count > 0) { diff --git a/src/csharp/Grpc.Core/Internal/IPooledObject.cs b/src/csharp/Grpc.Core/Internal/IPooledObject.cs new file mode 100644 index 0000000000..e20bd51dce --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/IPooledObject.cs @@ -0,0 +1,34 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// An object that can be pooled in <c>IObjectPool</c>. + /// </summary> + /// <typeparam name="T"></typeparam> + internal interface IPooledObject<T> : IDisposable + { + /// <summary> + /// Set the action that will be invoked to return a leased object to the pool. + /// </summary> + void SetReturnToPoolAction(Action<T> returnAction); + } +} diff --git a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs index 59e9d9b1ab..ebc2d6d8d6 100644 --- a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs @@ -20,26 +20,26 @@ using System; using System.Runtime.InteropServices; using Grpc.Core; using Grpc.Core.Logging; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// <summary> /// grpcsharp_request_call_context /// </summary> - internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback + internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject<RequestCallContextSafeHandle> { static readonly NativeMethods Native = NativeMethods.Get(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<RequestCallContextSafeHandle>(); - IObjectPool<RequestCallContextSafeHandle> ownedByPool; + Action<RequestCallContextSafeHandle> returnToPoolAction; private RequestCallContextSafeHandle() { } - public static RequestCallContextSafeHandle Create(IObjectPool<RequestCallContextSafeHandle> ownedByPool = null) + public static RequestCallContextSafeHandle Create() { var ctx = Native.grpcsharp_request_call_context_create(); - ctx.ownedByPool = ownedByPool; return ctx; } @@ -51,6 +51,12 @@ namespace Grpc.Core.Internal } } + public void SetReturnToPoolAction(Action<RequestCallContextSafeHandle> returnAction) + { + GrpcPreconditions.CheckState(returnToPoolAction == null); + returnToPoolAction = returnAction; + } + public RequestCallCompletionDelegate CompletionCallback { get; set; } // Gets data of server_rpc_new completion. @@ -76,10 +82,15 @@ namespace Grpc.Core.Internal public void Recycle() { - if (ownedByPool != null) + if (returnToPoolAction != null) { Native.grpcsharp_request_call_context_reset(this); - ownedByPool.Return(this); + + var origReturnAction = returnToPoolAction; + // Not clearing all the references to the pool could prevent garbage collection of the pool object + // and thus cause memory leaks. + returnToPoolAction = null; + origReturnAction(this); } else { diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include index 2d9e4ba16a..539d3a9f80 100755 --- a/src/csharp/Grpc.Core/Version.csproj.include +++ b/src/csharp/Grpc.Core/Version.csproj.include @@ -1,7 +1,7 @@ <!-- This file is generated --> <Project> <PropertyGroup> - <GrpcCsharpVersion>1.9.0-dev</GrpcCsharpVersion> + <GrpcCsharpVersion>1.10.0-dev</GrpcCsharpVersion> <GoogleProtobufVersion>3.3.0</GoogleProtobufVersion> </PropertyGroup> </Project> diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index 9b5da1c947..f1aef46c6c 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -33,11 +33,11 @@ namespace Grpc.Core /// <summary> /// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies /// </summary> - public const string CurrentAssemblyFileVersion = "1.9.0.0"; + public const string CurrentAssemblyFileVersion = "1.10.0.0"; /// <summary> /// Current version of gRPC C# /// </summary> - public const string CurrentVersion = "1.9.0-dev"; + public const string CurrentVersion = "1.10.0-dev"; } } diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat index 8f89e2846a..4087d8b67a 100755 --- a/src/csharp/build_packages_dotnetcli.bat +++ b/src/csharp/build_packages_dotnetcli.bat @@ -13,7 +13,7 @@ @rem limitations under the License. @rem Current package versions -set VERSION=1.9.0-dev +set VERSION=1.10.0-dev @rem Adjust the location of nuget.exe set NUGET=C:\nuget\nuget.exe diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh index 6a6cafe2bd..8ccc537a60 100755 --- a/src/csharp/build_packages_dotnetcli.sh +++ b/src/csharp/build_packages_dotnetcli.sh @@ -39,7 +39,7 @@ dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts -nuget pack Grpc.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts -nuget pack Grpc.Tools.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts +nuget pack Grpc.nuspec -Version "1.10.0-dev" -OutputDirectory ../../artifacts +nuget pack Grpc.Tools.nuspec -Version "1.10.0-dev" -OutputDirectory ../../artifacts (cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg) diff --git a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec index 22501765f9..037ad4d9b0 100644 --- a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec +++ b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec @@ -42,7 +42,7 @@ Pod::Spec.new do |s| # exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed # before them. s.name = '!ProtoCompiler-gRPCPlugin' - v = '1.9.0-dev' + v = '1.10.0-dev' s.version = v s.summary = 'The gRPC ProtoC plugin generates Objective-C files from .proto services.' s.description = <<-DESC diff --git a/src/objective-c/GRPCClient/private/version.h b/src/objective-c/GRPCClient/private/version.h index 69dd6266fd..5c134e3642 100644 --- a/src/objective-c/GRPCClient/private/version.h +++ b/src/objective-c/GRPCClient/private/version.h @@ -23,4 +23,4 @@ // `tools/buildgen/generate_projects.sh`. -#define GRPC_OBJC_VERSION_STRING @"1.9.0-dev" +#define GRPC_OBJC_VERSION_STRING @"1.10.0-dev" diff --git a/src/objective-c/tests/version.h b/src/objective-c/tests/version.h index 6e3a073020..d8581b9779 100644 --- a/src/objective-c/tests/version.h +++ b/src/objective-c/tests/version.h @@ -23,5 +23,5 @@ // `tools/buildgen/generate_projects.sh`. -#define GRPC_OBJC_VERSION_STRING @"1.9.0-dev" +#define GRPC_OBJC_VERSION_STRING @"1.10.0-dev" #define GRPC_C_VERSION_STRING @"5.0.0-dev" diff --git a/src/php/composer.json b/src/php/composer.json index 43833980f9..ea21417956 100644 --- a/src/php/composer.json +++ b/src/php/composer.json @@ -2,7 +2,7 @@ "name": "grpc/grpc-dev", "description": "gRPC library for PHP - for Developement use only", "license": "Apache-2.0", - "version": "1.9.0", + "version": "1.10.0", "require": { "php": ">=5.5.0", "google/protobuf": "^v3.3.0" diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index c4997f720d..ff55c3cbfa 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -99,6 +99,7 @@ zval *grpc_parse_metadata_array(grpc_metadata_array 1 TSRMLS_CC); efree(str_key); efree(str_val); + PHP_GRPC_FREE_STD_ZVAL(array); return NULL; } php_grpc_add_next_index_stringl(data, str_val, @@ -127,10 +128,12 @@ bool create_metadata_array(zval *array, grpc_metadata_array *metadata) { HashTable *inner_array_hash; zval *value; zval *inner_array; + grpc_metadata_array_init(metadata); + metadata->count = 0; + metadata->metadata = NULL; if (Z_TYPE_P(array) != IS_ARRAY) { return false; } - grpc_metadata_array_init(metadata); array_hash = Z_ARRVAL_P(array); char *key; @@ -174,6 +177,18 @@ bool create_metadata_array(zval *array, grpc_metadata_array *metadata) { return true; } +void grpc_php_metadata_array_destroy_including_entries( + grpc_metadata_array* array) { + size_t i; + if (array->metadata) { + for (i = 0; i < array->count; i++) { + grpc_slice_unref(array->metadata[i].key); + grpc_slice_unref(array->metadata[i].value); + } + } + grpc_metadata_array_destroy(array); +} + /* Wraps a grpc_call struct in a PHP object. Owned indicates whether the struct should be destroyed at the end of the object's lifecycle */ zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned TSRMLS_DC) { @@ -502,8 +517,8 @@ PHP_METHOD(Call, startBatch) { } cleanup: - grpc_metadata_array_destroy(&metadata); - grpc_metadata_array_destroy(&trailing_metadata); + grpc_php_metadata_array_destroy_including_entries(&metadata); + grpc_php_metadata_array_destroy_including_entries(&trailing_metadata); grpc_metadata_array_destroy(&recv_metadata); grpc_metadata_array_destroy(&recv_trailing_metadata); grpc_slice_unref(recv_status_details); @@ -526,7 +541,9 @@ cleanup: */ PHP_METHOD(Call, getPeer) { wrapped_grpc_call *call = Z_WRAPPED_GRPC_CALL_P(getThis()); - PHP_GRPC_RETURN_STRING(grpc_call_get_peer(call->wrapped), 1); + char *peer = grpc_call_get_peer(call->wrapped); + PHP_GRPC_RETVAL_STRING(peer, 1); + gpr_free(peer); } /** diff --git a/src/php/ext/grpc/call.h b/src/php/ext/grpc/call.h index 5bde5d5390..104ac301c1 100644 --- a/src/php/ext/grpc/call.h +++ b/src/php/ext/grpc/call.h @@ -69,5 +69,6 @@ void grpc_init_call(TSRMLS_D); /* Populates a grpc_metadata_array with the data in a PHP array object. Returns true on success and false on failure */ bool create_metadata_array(zval *array, grpc_metadata_array *metadata); - +void grpc_php_metadata_array_destroy_including_entries( + grpc_metadata_array* array); #endif /* NET_GRPC_PHP_GRPC_CHANNEL_H_ */ diff --git a/src/php/ext/grpc/call_credentials.c b/src/php/ext/grpc/call_credentials.c index a395d53614..41c488a79c 100644 --- a/src/php/ext/grpc/call_credentials.c +++ b/src/php/ext/grpc/call_credentials.c @@ -120,6 +120,8 @@ PHP_METHOD(CallCredentials, createFromPlugin) { fci->params, fci->param_count) == FAILURE) { zend_throw_exception(spl_ce_InvalidArgumentException, "createFromPlugin expects 1 callback", 1 TSRMLS_CC); + free(fci); + free(fci_cache); return; } @@ -183,15 +185,17 @@ int plugin_get_metadata( *status = GRPC_STATUS_OK; *error_details = NULL; + bool should_return = false; grpc_metadata_array metadata; if (retval == NULL || Z_TYPE_P(retval) != IS_ARRAY) { *status = GRPC_STATUS_INVALID_ARGUMENT; - return true; // Synchronous return. + should_return = true; // Synchronous return. } if (!create_metadata_array(retval, &metadata)) { *status = GRPC_STATUS_INVALID_ARGUMENT; - return true; // Synchronous return. + should_return = true; // Synchronous return. + grpc_php_metadata_array_destroy_including_entries(&metadata); } if (retval != NULL) { @@ -204,6 +208,9 @@ int plugin_get_metadata( PHP_GRPC_FREE_STD_ZVAL(retval); #endif } + if (should_return) { + return true; + } if (metadata.count > GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX) { *status = GRPC_STATUS_INTERNAL; diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c index db59869c7f..4054723b43 100644 --- a/src/php/ext/grpc/channel.c +++ b/src/php/ext/grpc/channel.c @@ -41,6 +41,7 @@ #include <grpc/grpc.h> #include <grpc/grpc_security.h> +#include <grpc/support/alloc.h> #include "completion_queue.h" #include "channel_credentials.h" @@ -56,22 +57,63 @@ int le_plink; /* Frees and destroys an instance of wrapped_grpc_channel */ PHP_GRPC_FREE_WRAPPED_FUNC_START(wrapped_grpc_channel) + bool is_last_wrapper = false; + // In_persistent_list is used when the user don't close the channel. + // In this case, le in the list won't be freed. + bool in_persistent_list = true; if (p->wrapper != NULL) { gpr_mu_lock(&p->wrapper->mu); if (p->wrapper->wrapped != NULL) { - php_grpc_zend_resource *rsrc; - php_grpc_int key_len = strlen(p->wrapper->key); - // only destroy the channel here if not found in the persistent list - gpr_mu_lock(&global_persistent_list_mu); - if (!(PHP_GRPC_PERSISTENT_LIST_FIND(&EG(persistent_list), p->wrapper->key, - key_len, rsrc))) { - grpc_channel_destroy(p->wrapper->wrapped); - free(p->wrapper->target); - free(p->wrapper->args_hashstr); + if (p->wrapper->is_valid) { + php_grpc_zend_resource *rsrc; + php_grpc_int key_len = strlen(p->wrapper->key); + // only destroy the channel here if not found in the persistent list + gpr_mu_lock(&global_persistent_list_mu); + if (!(PHP_GRPC_PERSISTENT_LIST_FIND(&EG(persistent_list), p->wrapper->key, + key_len, rsrc))) { + in_persistent_list = false; + grpc_channel_destroy(p->wrapper->wrapped); + free(p->wrapper->target); + free(p->wrapper->args_hashstr); + if(p->wrapper->creds_hashstr != NULL){ + free(p->wrapper->creds_hashstr); + p->wrapper->creds_hashstr = NULL; + } + } + gpr_mu_unlock(&global_persistent_list_mu); } - gpr_mu_unlock(&global_persistent_list_mu); + } + p->wrapper->ref_count -= 1; + if (p->wrapper->ref_count == 0) { + is_last_wrapper = true; } gpr_mu_unlock(&p->wrapper->mu); + if (is_last_wrapper) { + if (in_persistent_list) { + // If ref_count==0 and the key still in the list, it means the user + // don't call channel->close().persistent list should free the + // allocation in such case, as well as related wrapped channel. + if (p->wrapper->wrapped != NULL) { + gpr_mu_lock(&p->wrapper->mu); + grpc_channel_destroy(p->wrapper->wrapped); + free(p->wrapper->target); + free(p->wrapper->args_hashstr); + if(p->wrapper->creds_hashstr != NULL){ + free(p->wrapper->creds_hashstr); + p->wrapper->creds_hashstr = NULL; + } + p->wrapper->wrapped = NULL; + php_grpc_delete_persistent_list_entry(p->wrapper->key, + strlen(p->wrapper->key) + TSRMLS_CC); + gpr_mu_unlock(&p->wrapper->mu); + } + } + gpr_mu_destroy(&p->wrapper->mu); + free(p->wrapper->key); + free(p->wrapper); + } + p->wrapper = NULL; } PHP_GRPC_FREE_WRAPPED_FUNC_END() @@ -276,9 +318,16 @@ PHP_METHOD(Channel, __construct) { channel->wrapper->key = key; channel->wrapper->target = strdup(target); channel->wrapper->args_hashstr = strdup(sha1str); + channel->wrapper->creds_hashstr = NULL; + channel->wrapper->ref_count = 1; + channel->wrapper->is_valid = true; if (creds != NULL && creds->hashstr != NULL) { - channel->wrapper->creds_hashstr = creds->hashstr; + php_grpc_int creds_hashstr_len = strlen(creds->hashstr); + char *channel_creds_hashstr = malloc(creds_hashstr_len + 1); + strcpy(channel_creds_hashstr, creds->hashstr); + channel->wrapper->creds_hashstr = channel_creds_hashstr; } + gpr_mu_init(&channel->wrapper->mu); smart_str_free(&buf); @@ -303,7 +352,17 @@ PHP_METHOD(Channel, __construct) { channel, target, args, creds, key, key_len TSRMLS_CC); } else { efree(args.args); + if (channel->wrapper->creds_hashstr != NULL){ + free(channel->wrapper->creds_hashstr); + channel->wrapper->creds_hashstr = NULL; + } + free(channel->wrapper->creds_hashstr); + free(channel->wrapper->key); + free(channel->wrapper->target); + free(channel->wrapper->args_hashstr); + free(channel->wrapper); channel->wrapper = le->channel; + channel->wrapper->ref_count += 1; } } } @@ -323,7 +382,8 @@ PHP_METHOD(Channel, getTarget) { } char *target = grpc_channel_get_target(channel->wrapper->wrapped); gpr_mu_unlock(&channel->wrapper->mu); - PHP_GRPC_RETURN_STRING(target, 1); + PHP_GRPC_RETVAL_STRING(target, 1); + gpr_free(target); } /** @@ -411,18 +471,46 @@ PHP_METHOD(Channel, watchConnectivityState) { */ PHP_METHOD(Channel, close) { wrapped_grpc_channel *channel = Z_WRAPPED_GRPC_CHANNEL_P(getThis()); - gpr_mu_lock(&channel->wrapper->mu); - if (channel->wrapper->wrapped != NULL) { - grpc_channel_destroy(channel->wrapper->wrapped); - free(channel->wrapper->target); - free(channel->wrapper->args_hashstr); - channel->wrapper->wrapped = NULL; - - php_grpc_delete_persistent_list_entry(channel->wrapper->key, - strlen(channel->wrapper->key) - TSRMLS_CC); + bool is_last_wrapper = false; + if (channel->wrapper != NULL) { + // Channel_wrapper hasn't call close before. + gpr_mu_lock(&channel->wrapper->mu); + if (channel->wrapper->wrapped != NULL) { + if (channel->wrapper->is_valid) { + // Wrapped channel hasn't been destoryed by other wrapper. + grpc_channel_destroy(channel->wrapper->wrapped); + free(channel->wrapper->target); + free(channel->wrapper->args_hashstr); + if(channel->wrapper->creds_hashstr != NULL){ + free(channel->wrapper->creds_hashstr); + channel->wrapper->creds_hashstr = NULL; + } + channel->wrapper->wrapped = NULL; + channel->wrapper->is_valid = false; + + php_grpc_delete_persistent_list_entry(channel->wrapper->key, + strlen(channel->wrapper->key) + TSRMLS_CC); + } + } + channel->wrapper->ref_count -= 1; + if(channel->wrapper->ref_count == 0){ + // Mark that the wrapper can be freed because mu should be + // destroyed outside the lock. + is_last_wrapper = true; + } + gpr_mu_unlock(&channel->wrapper->mu); } - gpr_mu_unlock(&channel->wrapper->mu); + gpr_mu_lock(&global_persistent_list_mu); + if (is_last_wrapper) { + gpr_mu_destroy(&channel->wrapper->mu); + free(channel->wrapper->key); + free(channel->wrapper); + } + // Set channel->wrapper to NULL to avoid call close twice for the same + // channel. + channel->wrapper = NULL; + gpr_mu_unlock(&global_persistent_list_mu); } // Delete an entry from the persistent list @@ -437,6 +525,7 @@ void php_grpc_delete_persistent_list_entry(char *key, php_grpc_int key_len le = (channel_persistent_le_t *)rsrc->ptr; le->channel = NULL; php_grpc_zend_hash_del(&EG(persistent_list), key, key_len+1); + free(le); } gpr_mu_unlock(&global_persistent_list_mu); } diff --git a/src/php/ext/grpc/channel.h b/src/php/ext/grpc/channel.h index 69adc4782c..86bfdea51a 100755 --- a/src/php/ext/grpc/channel.h +++ b/src/php/ext/grpc/channel.h @@ -40,6 +40,11 @@ typedef struct _grpc_channel_wrapper { char *args_hashstr; char *creds_hashstr; gpr_mu mu; + // is_valid is used to check the wrapped channel has been freed + // before to avoid double free. + bool is_valid; + // ref_count is used to let the last wrapper free related channel and key. + size_t ref_count; } grpc_channel_wrapper; /* Wrapper struct for grpc_channel that can be associated with a PHP object */ diff --git a/src/php/ext/grpc/channel_credentials.c b/src/php/ext/grpc/channel_credentials.c index d120d6e90f..624d7cc75c 100644 --- a/src/php/ext/grpc/channel_credentials.c +++ b/src/php/ext/grpc/channel_credentials.c @@ -57,8 +57,13 @@ static grpc_ssl_roots_override_result get_ssl_roots_override( /* Frees and destroys an instance of wrapped_grpc_channel_credentials */ PHP_GRPC_FREE_WRAPPED_FUNC_START(wrapped_grpc_channel_credentials) + if (p->hashstr != NULL) { + free(p->hashstr); + p->hashstr = NULL; + } if (p->wrapped != NULL) { grpc_channel_credentials_release(p->wrapped); + p->wrapped = NULL; } PHP_GRPC_FREE_WRAPPED_FUNC_END() @@ -152,7 +157,7 @@ PHP_METHOD(ChannelCredentials, createSsl) { } php_grpc_int hashkey_len = root_certs_length + cert_chain_length; - char *hashkey = emalloc(hashkey_len); + char *hashkey = emalloc(hashkey_len + 1); if (root_certs_length > 0) { strcpy(hashkey, pem_root_certs); } @@ -199,8 +204,13 @@ PHP_METHOD(ChannelCredentials, createComposite) { grpc_channel_credentials *creds = grpc_composite_channel_credentials_create(cred1->wrapped, cred2->wrapped, NULL); + // wrapped_grpc_channel_credentials object should keeps it's own + // allocation. Otherwise it conflicts free hashstr with call.c. + php_grpc_int cred1_len = strlen(cred1->hashstr); + char *cred1_hashstr = malloc(cred1_len+1); + strcpy(cred1_hashstr, cred1->hashstr); zval *creds_object = - grpc_php_wrap_channel_credentials(creds, cred1->hashstr, true + grpc_php_wrap_channel_credentials(creds, cred1_hashstr, true TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } diff --git a/src/php/ext/grpc/php7_wrapper.h b/src/php/ext/grpc/php7_wrapper.h index 96091f9dad..2f4a53611c 100644 --- a/src/php/ext/grpc/php7_wrapper.h +++ b/src/php/ext/grpc/php7_wrapper.h @@ -33,6 +33,7 @@ #define php_grpc_add_next_index_stringl(data, str, len, b) \ add_next_index_stringl(data, str, len, b) +#define PHP_GRPC_RETVAL_STRING(val, dup) RETVAL_STRING(val, dup) #define PHP_GRPC_RETURN_STRING(val, dup) RETURN_STRING(val, dup) #define PHP_GRPC_MAKE_STD_ZVAL(pzv) MAKE_STD_ZVAL(pzv) #define PHP_GRPC_FREE_STD_ZVAL(pzv) @@ -145,6 +146,7 @@ static inline int php_grpc_zend_hash_find(HashTable *ht, char *key, int len, #define php_grpc_add_next_index_stringl(data, str, len, b) \ add_next_index_stringl(data, str, len) +#define PHP_GRPC_RETVAL_STRING(val, dup) RETVAL_STRING(val) #define PHP_GRPC_RETURN_STRING(val, dup) RETURN_STRING(val) #define PHP_GRPC_MAKE_STD_ZVAL(pzv) \ pzv = (zval *)emalloc(sizeof(zval)); diff --git a/src/php/ext/grpc/php_grpc.c b/src/php/ext/grpc/php_grpc.c index 0f2c5b8114..5971babc00 100644 --- a/src/php/ext/grpc/php_grpc.c +++ b/src/php/ext/grpc/php_grpc.c @@ -253,7 +253,8 @@ PHP_MSHUTDOWN_FUNCTION(grpc) { */ PHP_MINFO_FUNCTION(grpc) { php_info_print_table_start(); - php_info_print_table_header(2, "grpc support", "enabled"); + php_info_print_table_row(2, "grpc support", "enabled"); + php_info_print_table_row(2, "grpc module version", PHP_GRPC_VERSION); php_info_print_table_end(); /* Remove comments if you have entries in php.ini diff --git a/src/php/ext/grpc/version.h b/src/php/ext/grpc/version.h index 48131d72d1..408f2a4765 100644 --- a/src/php/ext/grpc/version.h +++ b/src/php/ext/grpc/version.h @@ -20,6 +20,6 @@ #ifndef VERSION_H #define VERSION_H -#define PHP_GRPC_VERSION "1.9.0dev" +#define PHP_GRPC_VERSION "1.10.0dev" #endif /* VERSION_H */ diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 2ff2e4e8a2..57592662c4 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -108,6 +108,9 @@ message ClientConfig { // Number of messages on a stream before it gets finished/restarted int32 messages_per_stream = 18; + + // Use coalescing API when possible. + bool use_coalesce_api = 19; } message ClientStatus { ClientStats stats = 1; } diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py index 993c49d4af..6032828c77 100644 --- a/src/python/grpcio/grpc/_grpcio_metadata.py +++ b/src/python/grpcio/grpc/_grpcio_metadata.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!! -__version__ = """1.9.0.dev0""" +__version__ = """1.10.0.dev0""" diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index 56a280624f..d029472c68 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -51,6 +51,30 @@ class _ClientCallDetails( pass +def _unwrap_client_call_details(call_details, default_details): + try: + method = call_details.method + except AttributeError: + method = default_details.method + + try: + timeout = call_details.timeout + except AttributeError: + timeout = default_details.timeout + + try: + metadata = call_details.metadata + except AttributeError: + metadata = default_details.metadata + + try: + credentials = call_details.credentials + except AttributeError: + credentials = default_details.credentials + + return method, timeout, metadata, credentials + + class _LocalFailure(grpc.RpcError, grpc.Future, grpc.Call): def __init__(self, exception, traceback): @@ -126,15 +150,18 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): def future(self, request, timeout=None, metadata=None, credentials=None): - def continuation(client_call_details, request): - return self._thunk(client_call_details.method).future( - request, - timeout=client_call_details.timeout, - metadata=client_call_details.metadata, - credentials=client_call_details.credentials) - client_call_details = _ClientCallDetails(self._method, timeout, metadata, credentials) + + def continuation(new_details, request): + new_method, new_timeout, new_metadata, new_credentials = ( + _unwrap_client_call_details(new_details, client_call_details)) + return self._thunk(new_method).future( + request, + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials) + try: return self._interceptor.intercept_unary_unary( continuation, client_call_details, request) @@ -150,16 +177,18 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._interceptor = interceptor def __call__(self, request, timeout=None, metadata=None, credentials=None): + client_call_details = _ClientCallDetails(self._method, timeout, + metadata, credentials) - def continuation(client_call_details, request): - return self._thunk(client_call_details.method)( + def continuation(new_details, request): + new_method, new_timeout, new_metadata, new_credentials = ( + _unwrap_client_call_details(new_details, client_call_details)) + return self._thunk(new_method)( request, - timeout=client_call_details.timeout, - metadata=client_call_details.metadata, - credentials=client_call_details.credentials) + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials) - client_call_details = _ClientCallDetails(self._method, timeout, - metadata, credentials) try: return self._interceptor.intercept_unary_stream( continuation, client_call_details, request) @@ -203,17 +232,18 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): timeout=None, metadata=None, credentials=None): - - def continuation(client_call_details, request_iterator): - return self._thunk(client_call_details.method).future( - request_iterator, - timeout=client_call_details.timeout, - metadata=client_call_details.metadata, - credentials=client_call_details.credentials) - client_call_details = _ClientCallDetails(self._method, timeout, metadata, credentials) + def continuation(new_details, request_iterator): + new_method, new_timeout, new_metadata, new_credentials = ( + _unwrap_client_call_details(new_details, client_call_details)) + return self._thunk(new_method).future( + request_iterator, + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials) + try: return self._interceptor.intercept_stream_unary( continuation, client_call_details, request_iterator) @@ -233,17 +263,18 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): timeout=None, metadata=None, credentials=None): - - def continuation(client_call_details, request_iterator): - return self._thunk(client_call_details.method)( - request_iterator, - timeout=client_call_details.timeout, - metadata=client_call_details.metadata, - credentials=client_call_details.credentials) - client_call_details = _ClientCallDetails(self._method, timeout, metadata, credentials) + def continuation(new_details, request_iterator): + new_method, new_timeout, new_metadata, new_credentials = ( + _unwrap_client_call_details(new_details, client_call_details)) + return self._thunk(new_method)( + request_iterator, + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials) + try: return self._interceptor.intercept_stream_stream( continuation, client_call_details, request_iterator) diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index 1fac57b03a..a654eb026a 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION = '1.9.0.dev0' +VERSION = '1.10.0.dev0' diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py index 5b7e5859bc..d3185c6972 100644 --- a/src/python/grpcio_health_checking/grpc_version.py +++ b/src/python/grpcio_health_checking/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!! -VERSION = '1.9.0.dev0' +VERSION = '1.10.0.dev0' diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py index 0ad9621154..7203d0d321 100644 --- a/src/python/grpcio_reflection/grpc_version.py +++ b/src/python/grpcio_reflection/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!! -VERSION = '1.9.0.dev0' +VERSION = '1.10.0.dev0' diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py index 0eb5fbf94d..bf9e55e10e 100644 --- a/src/python/grpcio_testing/grpc_version.py +++ b/src/python/grpcio_testing/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!! -VERSION = '1.9.0.dev0' +VERSION = '1.10.0.dev0' diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py index b1b4d7e0c2..2583e42016 100644 --- a/src/python/grpcio_tests/grpc_version.py +++ b/src/python/grpcio_tests/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!! -VERSION = '1.9.0.dev0' +VERSION = '1.10.0.dev0' diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 555168c3bb..d701d2f571 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -583,7 +583,7 @@ extern gpr_free_type gpr_free_import; typedef void*(*gpr_realloc_type)(void* p, size_t size); extern gpr_realloc_type gpr_realloc_import; #define gpr_realloc gpr_realloc_import -typedef void*(*gpr_malloc_aligned_type)(size_t size, size_t alignment_log); +typedef void*(*gpr_malloc_aligned_type)(size_t size, size_t alignment); extern gpr_malloc_aligned_type gpr_malloc_aligned_import; #define gpr_malloc_aligned gpr_malloc_aligned_import typedef void(*gpr_free_aligned_type)(void* ptr); diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index be1412511a..9d9f2f4968 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -14,5 +14,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '1.9.0.dev' + VERSION = '1.10.0.dev' end diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb index 48aad39e08..2682294bd2 100644 --- a/src/ruby/tools/version.rb +++ b/src/ruby/tools/version.rb @@ -14,6 +14,6 @@ module GRPC module Tools - VERSION = '1.9.0.dev' + VERSION = '1.10.0.dev' end end |