aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc64
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc867
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc2
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc4
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc2
-rw-r--r--src/core/ext/transport/chttp2/transport/flow_control.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h2
-rw-r--r--src/core/lib/gpr/alloc.cc4
-rw-r--r--src/core/lib/gpr/arena.cc31
-rw-r--r--src/core/lib/gpr/fork.cc16
-rw-r--r--src/core/lib/gprpp/README.md (renamed from src/core/lib/gpr++/README.md)0
-rw-r--r--src/core/lib/gprpp/abstract.h (renamed from src/core/lib/gpr++/abstract.h)6
-rw-r--r--src/core/lib/gprpp/atomic.h (renamed from src/core/lib/gpr++/atomic.h)10
-rw-r--r--src/core/lib/gprpp/atomic_with_atm.h (renamed from src/core/lib/gpr++/atomic_with_atm.h)6
-rw-r--r--src/core/lib/gprpp/atomic_with_std.h (renamed from src/core/lib/gpr++/atomic_with_std.h)6
-rw-r--r--src/core/lib/gprpp/debug_location.h (renamed from src/core/lib/gpr++/debug_location.h)6
-rw-r--r--src/core/lib/gprpp/inlined_vector.h (renamed from src/core/lib/gpr++/inlined_vector.h)8
-rw-r--r--src/core/lib/gprpp/manual_constructor.h (renamed from src/core/lib/gpr++/manual_constructor.h)4
-rw-r--r--src/core/lib/gprpp/memory.h (renamed from src/core/lib/gpr++/memory.h)6
-rw-r--r--src/core/lib/gprpp/orphanable.h (renamed from src/core/lib/gpr++/orphanable.h)12
-rw-r--r--src/core/lib/gprpp/ref_counted.h (renamed from src/core/lib/gpr++/ref_counted.h)12
-rw-r--r--src/core/lib/gprpp/ref_counted_ptr.h (renamed from src/core/lib/gpr++/ref_counted_ptr.h)8
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc2
-rw-r--r--src/core/lib/surface/lame_client.cc2
-rw-r--r--src/core/lib/surface/version.cc2
-rw-r--r--src/core/tsi/ssl_transport_security.cc62
-rw-r--r--src/cpp/common/version_cc.cc2
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs18
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs22
-rw-r--r--src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs11
-rw-r--r--src/csharp/Grpc.Core/Internal/IPooledObject.cs34
-rw-r--r--src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs23
-rwxr-xr-xsrc/csharp/Grpc.Core/Version.csproj.include2
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs4
-rwxr-xr-xsrc/csharp/build_packages_dotnetcli.bat2
-rwxr-xr-xsrc/csharp/build_packages_dotnetcli.sh4
-rw-r--r--src/objective-c/!ProtoCompiler-gRPCPlugin.podspec2
-rw-r--r--src/objective-c/GRPCClient/private/version.h2
-rw-r--r--src/objective-c/tests/version.h2
-rw-r--r--src/php/composer.json2
-rw-r--r--src/php/ext/grpc/call.c25
-rw-r--r--src/php/ext/grpc/call.h3
-rw-r--r--src/php/ext/grpc/call_credentials.c11
-rw-r--r--src/php/ext/grpc/channel.c135
-rwxr-xr-xsrc/php/ext/grpc/channel.h5
-rw-r--r--src/php/ext/grpc/channel_credentials.c14
-rw-r--r--src/php/ext/grpc/php7_wrapper.h2
-rw-r--r--src/php/ext/grpc/php_grpc.c3
-rw-r--r--src/php/ext/grpc/version.h2
-rw-r--r--src/proto/grpc/testing/control.proto3
-rw-r--r--src/python/grpcio/grpc/_grpcio_metadata.py2
-rw-r--r--src/python/grpcio/grpc/_interceptor.py91
-rw-r--r--src/python/grpcio/grpc_version.py2
-rw-r--r--src/python/grpcio_health_checking/grpc_version.py2
-rw-r--r--src/python/grpcio_reflection/grpc_version.py2
-rw-r--r--src/python/grpcio_testing/grpc_version.py2
-rw-r--r--src/python/grpcio_tests/grpc_version.py2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h2
-rw-r--r--src/ruby/lib/grpc/version.rb2
-rw-r--r--src/ruby/tools/version.rb2
68 files changed, 967 insertions, 643 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index e19726efb3..30660cb83d 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -20,7 +20,7 @@
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H
#include "src/core/ext/filters/client_channel/subchannel.h"
-#include "src/core/lib/gpr++/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
index 1708d81e61..4596f90745 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
@@ -69,11 +69,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
call_data* calld = (call_data*)elem->call_data;
// Get stats object from context and take a ref.
GPR_ASSERT(args->context != nullptr);
- GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr);
- calld->client_stats = grpc_grpclb_client_stats_ref(
- (grpc_grpclb_client_stats*)args->context[GRPC_GRPCLB_CLIENT_STATS].value);
- // Record call started.
- grpc_grpclb_client_stats_add_call_started(calld->client_stats);
+ if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) {
+ calld->client_stats = grpc_grpclb_client_stats_ref(
+ (grpc_grpclb_client_stats*)args->context[GRPC_GRPCLB_CLIENT_STATS]
+ .value);
+ // Record call started.
+ grpc_grpclb_client_stats_add_call_started(calld->client_stats);
+ }
return GRPC_ERROR_NONE;
}
@@ -81,36 +83,40 @@ static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = (call_data*)elem->call_data;
- // Record call finished, optionally setting client_failed_to_send and
- // received.
- grpc_grpclb_client_stats_add_call_finished(
- !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
- calld->recv_initial_metadata_succeeded /* known_received */,
- calld->client_stats);
- // All done, so unref the stats object.
- grpc_grpclb_client_stats_unref(calld->client_stats);
+ if (calld->client_stats != nullptr) {
+ // Record call finished, optionally setting client_failed_to_send and
+ // received.
+ grpc_grpclb_client_stats_add_call_finished(
+ !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
+ calld->recv_initial_metadata_succeeded /* known_received */,
+ calld->client_stats);
+ // All done, so unref the stats object.
+ grpc_grpclb_client_stats_unref(calld->client_stats);
+ }
}
static void start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = (call_data*)elem->call_data;
GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0);
- // Intercept send_initial_metadata.
- if (batch->send_initial_metadata) {
- calld->original_on_complete_for_send = batch->on_complete;
- GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, calld,
- grpc_schedule_on_exec_ctx);
- batch->on_complete = &calld->on_complete_for_send;
- }
- // Intercept recv_initial_metadata.
- if (batch->recv_initial_metadata) {
- calld->original_recv_initial_metadata_ready =
- batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
- GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
- recv_initial_metadata_ready, calld,
- grpc_schedule_on_exec_ctx);
- batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
- &calld->recv_initial_metadata_ready;
+ if (calld->client_stats != nullptr) {
+ // Intercept send_initial_metadata.
+ if (batch->send_initial_metadata) {
+ calld->original_on_complete_for_send = batch->on_complete;
+ GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send,
+ calld, grpc_schedule_on_exec_ctx);
+ batch->on_complete = &calld->on_complete_for_send;
+ }
+ // Intercept recv_initial_metadata.
+ if (batch->recv_initial_metadata) {
+ calld->original_recv_initial_metadata_ready =
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
+ recv_initial_metadata_ready, calld,
+ grpc_schedule_on_exec_ctx);
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->recv_initial_metadata_ready;
+ }
}
// Chain to next filter.
grpc_call_next_op(elem, batch);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 6c29cd8218..1709e5622e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -106,8 +106,8 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/gpr++/manual_constructor.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -169,25 +169,78 @@ struct pending_ping {
} // namespace
-struct glb_lb_policy {
- /** base policy: must be first */
+typedef struct glb_lb_call_data {
+ struct glb_lb_policy* glb_policy;
+ // TODO(juanlishen): c++ize this struct.
+ gpr_refcount refs;
+
+ /** The streaming call to the LB server. Always non-NULL. */
+ grpc_call* lb_call;
+
+ /** The initial metadata received from the LB server. */
+ grpc_metadata_array lb_initial_metadata_recv;
+
+ /** The message sent to the LB server. It's used to query for backends (the
+ * value may vary if the LB server indicates a redirect) or send client load
+ * report. */
+ grpc_byte_buffer* send_message_payload;
+ /** The callback after the initial request is sent. */
+ grpc_closure lb_on_sent_initial_request;
+
+ /** The response received from the LB server, if any. */
+ grpc_byte_buffer* recv_message_payload;
+ /** The callback to process the response received from the LB server. */
+ grpc_closure lb_on_response_received;
+ bool seen_initial_response;
+
+ /** The callback to process the status received from the LB server, which
+ * signals the end of the LB call. */
+ grpc_closure lb_on_server_status_received;
+ /** The trailing metadata from the LB server. */
+ grpc_metadata_array lb_trailing_metadata_recv;
+ /** The call status code and details. */
+ grpc_status_code lb_call_status;
+ grpc_slice lb_call_status_details;
+
+ /** The stats for client-side load reporting associated with this LB call.
+ * Created after the first serverlist is received. */
+ grpc_grpclb_client_stats* client_stats;
+ /** The interval and timer for next client load report. */
+ grpc_millis client_stats_report_interval;
+ grpc_timer client_load_report_timer;
+ bool client_load_report_timer_callback_pending;
+ bool last_client_load_report_counters_were_zero;
+ bool client_load_report_is_due;
+ /** The closure used for either the load report timer or the callback for
+ * completion of sending the load report. */
+ grpc_closure client_load_report_closure;
+} glb_lb_call_data;
+
+typedef struct glb_lb_policy {
+ /** Base policy: must be first. */
grpc_lb_policy base;
- /** who the client is trying to communicate with */
+ /** Who the client is trying to communicate with. */
const char* server_name;
+
+ /** Channel related data that will be propagated to the internal RR policy. */
grpc_client_channel_factory* cc_factory;
grpc_channel_args* args;
- /** timeout in milliseconds for the LB call. 0 means no deadline. */
- int lb_call_timeout_ms;
-
- /** timeout in milliseconds for before using fallback backend addresses.
+ /** Timeout in milliseconds for before using fallback backend addresses.
* 0 means not using fallback. */
int lb_fallback_timeout_ms;
- /** for communicating with the LB server */
+ /** The channel for communicating with the LB server. */
grpc_channel* lb_channel;
+ /** The data associated with the current LB call. It holds a ref to this LB
+ * policy. It's initialized every time we query for backends. It's reset to
+ * NULL whenever the current LB call is no longer needed (e.g., the LB policy
+ * is shutting down, or the LB call has ended). A non-NULL lb_calld always
+ * contains a non-NULL lb_call. */
+ glb_lb_call_data* lb_calld;
+
/** response generator to inject address updates into \a lb_channel */
grpc_fake_resolver_response_generator* response_generator;
@@ -225,9 +278,6 @@ struct glb_lb_policy {
bool shutting_down;
- /** are we currently updating lb_call? */
- bool updating_lb_call;
-
/** are we already watching the LB channel's connectivity? */
bool watching_lb_channel;
@@ -243,65 +293,70 @@ struct glb_lb_policy {
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
- /* Finished sending initial request. */
- grpc_closure lb_on_sent_initial_request;
-
- /* Status from the LB server has been received. This signals the end of the LB
- * call. */
- grpc_closure lb_on_server_status_received;
-
- /* A response from the LB server has been received. Process it */
- grpc_closure lb_on_response_received;
-
- /* LB call retry timer callback. */
- grpc_closure lb_on_call_retry;
-
- /* LB fallback timer callback. */
- grpc_closure lb_on_fallback;
-
- grpc_call* lb_call; /* streaming call to the LB server, */
-
- grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
- grpc_metadata_array
- lb_trailing_metadata_recv; /* trailing MD from LB server */
-
- /* what's being sent to the LB server. Note that its value may vary if the LB
- * server indicates a redirect. */
- grpc_byte_buffer* lb_request_payload;
-
- /* response the LB server, if any. Processed in lb_on_response_received() */
- grpc_byte_buffer* lb_response_payload;
-
- /* call status code and details, set in lb_on_server_status_received() */
- grpc_status_code lb_call_status;
- grpc_slice lb_call_status_details;
/** LB call retry backoff state */
grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff;
+ /** timeout in milliseconds for the LB call. 0 means no deadline. */
+ int lb_call_timeout_ms;
+
/** LB call retry timer */
grpc_timer lb_call_retry_timer;
+ /** LB call retry timer callback */
+ grpc_closure lb_on_call_retry;
/** LB fallback timer */
grpc_timer lb_fallback_timer;
+ /** LB fallback timer callback */
+ grpc_closure lb_on_fallback;
+} glb_lb_policy;
- bool initial_request_sent;
- bool seen_initial_response;
+static void glb_lb_call_data_ref(glb_lb_call_data* lb_calld,
+ const char* reason) {
+ gpr_ref_non_zero(&lb_calld->refs);
+ if (grpc_lb_glb_trace.enabled()) {
+ const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
+ gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p REF %lu->%lu (%s)",
+ grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
+ (unsigned long)(count - 1), (unsigned long)count, reason);
+ }
+}
- /* Stats for client-side load reporting. Should be unreffed and
- * recreated whenever lb_call is replaced. */
- grpc_grpclb_client_stats* client_stats;
- /* Interval and timer for next client load report. */
- grpc_millis client_stats_report_interval;
- grpc_timer client_load_report_timer;
- bool client_load_report_timer_callback_pending;
- bool last_client_load_report_counters_were_zero;
- /* Closure used for either the load report timer or the callback for
- * completion of sending the load report. */
- grpc_closure client_load_report_closure;
- /* Client load report message payload. */
- grpc_byte_buffer* client_load_report_payload;
-};
+static void glb_lb_call_data_unref(glb_lb_call_data* lb_calld,
+ const char* reason) {
+ const bool done = gpr_unref(&lb_calld->refs);
+ if (grpc_lb_glb_trace.enabled()) {
+ const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
+ gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p UNREF %lu->%lu (%s)",
+ grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
+ (unsigned long)(count + 1), (unsigned long)count, reason);
+ }
+ if (done) {
+ GPR_ASSERT(lb_calld->lb_call != nullptr);
+ grpc_call_unref(lb_calld->lb_call);
+ grpc_metadata_array_destroy(&lb_calld->lb_initial_metadata_recv);
+ grpc_metadata_array_destroy(&lb_calld->lb_trailing_metadata_recv);
+ grpc_byte_buffer_destroy(lb_calld->send_message_payload);
+ grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
+ grpc_slice_unref_internal(lb_calld->lb_call_status_details);
+ if (lb_calld->client_stats != nullptr) {
+ grpc_grpclb_client_stats_unref(lb_calld->client_stats);
+ }
+ GRPC_LB_POLICY_UNREF(&lb_calld->glb_policy->base, "lb_calld");
+ gpr_free(lb_calld);
+ }
+}
+
+static void lb_call_data_shutdown(glb_lb_policy* glb_policy) {
+ GPR_ASSERT(glb_policy->lb_calld != nullptr);
+ GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
+ // lb_on_server_status_received will complete the cancellation and clean up.
+ grpc_call_cancel(glb_policy->lb_calld->lb_call, nullptr);
+ if (glb_policy->lb_calld->client_load_report_timer_callback_pending) {
+ grpc_timer_cancel(&glb_policy->lb_calld->client_load_report_timer);
+ }
+ glb_policy->lb_calld = nullptr;
+}
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
@@ -334,11 +389,12 @@ static void pending_pick_set_metadata_and_context(pending_pick* pp) {
abort();
}
// Pass on client stats via context. Passes ownership of the reference.
- GPR_ASSERT(pp->client_stats != nullptr);
- pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
- pp->client_stats;
- pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
- destroy_client_stats;
+ if (pp->client_stats != nullptr) {
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
+ pp->client_stats;
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
+ destroy_client_stats;
+ }
} else {
if (pp->client_stats != nullptr) {
grpc_grpclb_client_stats_unref(pp->client_stats);
@@ -605,9 +661,11 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
- GPR_ASSERT(glb_policy->client_stats != nullptr);
- grpc_grpclb_client_stats_add_call_dropped_locked(
- server->load_balance_token, glb_policy->client_stats);
+ if (glb_policy->lb_calld != nullptr &&
+ glb_policy->lb_calld->client_stats != nullptr) {
+ grpc_grpclb_client_stats_add_call_dropped_locked(
+ server->load_balance_token, glb_policy->lb_calld->client_stats);
+ }
if (force_async) {
GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
@@ -618,7 +676,11 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
}
}
// Set client_stats and user_data.
- pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats);
+ if (glb_policy->lb_calld != nullptr &&
+ glb_policy->lb_calld->client_stats != nullptr) {
+ pp->client_stats =
+ grpc_grpclb_client_stats_ref(glb_policy->lb_calld->client_stats);
+ }
GPR_ASSERT(pp->pick->user_data == nullptr);
pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy.
@@ -872,9 +934,6 @@ static void glb_destroy(grpc_lb_policy* pol) {
GPR_ASSERT(glb_policy->pending_pings == nullptr);
gpr_free((void*)glb_policy->server_name);
grpc_channel_args_destroy(glb_policy->args);
- if (glb_policy->client_stats != nullptr) {
- grpc_grpclb_client_stats_unref(glb_policy->client_stats);
- }
grpc_connectivity_state_destroy(&glb_policy->state_tracker);
if (glb_policy->serverlist != nullptr) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
@@ -892,13 +951,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
glb_policy->shutting_down = true;
- /* glb_policy->lb_call and this local lb_call must be consistent at this point
- * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
- * of query_for_backends_locked, which can only be invoked while
- * glb_policy->shutting_down is false. */
- if (glb_policy->lb_call != nullptr) {
- grpc_call_cancel(glb_policy->lb_call, nullptr);
- /* lb_on_server_status_received will pick up the cancel and clean up */
+ if (glb_policy->lb_calld != nullptr) {
+ lb_call_data_shutdown(glb_policy);
}
if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
@@ -1048,7 +1102,6 @@ static void start_picking_locked(glb_lb_policy* glb_policy) {
grpc_timer_init(&glb_policy->lb_fallback_timer, deadline,
&glb_policy->lb_on_fallback);
}
-
glb_policy->started_picking = true;
glb_policy->lb_call_backoff->Reset();
query_for_backends_locked(glb_policy);
@@ -1089,7 +1142,6 @@ static int glb_pick_locked(grpc_lb_policy* pol,
gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
glb_policy->rr_policy);
}
- GPR_ASSERT(glb_policy->client_stats != nullptr);
pick_done =
pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp);
}
@@ -1139,8 +1191,8 @@ static void glb_notify_on_state_change_locked(grpc_lb_policy* pol,
static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
glb_policy->retry_timer_callback_pending = false;
- if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr &&
- error == GRPC_ERROR_NONE) {
+ if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE &&
+ glb_policy->lb_calld == nullptr) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy);
}
@@ -1149,84 +1201,55 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer");
}
-static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
- if (glb_policy->started_picking && glb_policy->updating_lb_call) {
- if (glb_policy->retry_timer_callback_pending) {
- grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
- }
- if (!glb_policy->shutting_down) start_picking_locked(glb_policy);
- glb_policy->updating_lb_call = false;
- } else if (!glb_policy->shutting_down) {
- /* if we aren't shutting down, restart the LB client call after some time */
- grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime();
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
+static void start_lb_call_retry_timer_locked(glb_lb_policy* glb_policy) {
+ grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime();
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
+ glb_policy);
+ grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
+ if (timeout > 0) {
+ gpr_log(GPR_DEBUG,
+ "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
+ glb_policy, timeout);
+ } else {
+ gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
glb_policy);
- grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
- if (timeout > 0) {
- gpr_log(GPR_DEBUG,
- "[grpclb %p] ... retry LB call after %" PRIuPTR "ms.",
- glb_policy, timeout);
- } else {
- gpr_log(GPR_DEBUG, "[grpclb %p] ... retry LB call immediately.",
- glb_policy);
- }
}
- GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer");
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
- lb_call_on_retry_timer_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- glb_policy->retry_timer_callback_pending = true;
- grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
- &glb_policy->lb_on_call_retry);
}
- GRPC_LB_POLICY_UNREF(&glb_policy->base,
- "lb_on_server_status_received_locked");
+ GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer");
+ GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
+ lb_call_on_retry_timer_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ glb_policy->retry_timer_callback_pending = true;
+ grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
+ &glb_policy->lb_on_call_retry);
}
-static void send_client_load_report_locked(void* arg, grpc_error* error);
+static void maybe_send_client_load_report_locked(void* arg, grpc_error* error);
-static void schedule_next_client_load_report(glb_lb_policy* glb_policy) {
+static void schedule_next_client_load_report(glb_lb_call_data* lb_calld) {
const grpc_millis next_client_load_report_time =
- grpc_core::ExecCtx::Get()->Now() +
- glb_policy->client_stats_report_interval;
- GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
- send_client_load_report_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_timer_init(&glb_policy->client_load_report_timer,
+ grpc_core::ExecCtx::Get()->Now() + lb_calld->client_stats_report_interval;
+ GRPC_CLOSURE_INIT(
+ &lb_calld->client_load_report_closure,
+ maybe_send_client_load_report_locked, lb_calld,
+ grpc_combiner_scheduler(lb_calld->glb_policy->base.combiner));
+ grpc_timer_init(&lb_calld->client_load_report_timer,
next_client_load_report_time,
- &glb_policy->client_load_report_closure);
+ &lb_calld->client_load_report_closure);
+ lb_calld->client_load_report_timer_callback_pending = true;
}
static void client_load_report_done_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
- glb_policy->client_load_report_payload = nullptr;
- if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) {
- glb_policy->client_load_report_timer_callback_pending = false;
- GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
- if (glb_policy->lb_call == nullptr) {
- maybe_restart_lb_call(glb_policy);
- }
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+ glb_lb_policy* glb_policy = lb_calld->glb_policy;
+ grpc_byte_buffer_destroy(lb_calld->send_message_payload);
+ lb_calld->send_message_payload = nullptr;
+ if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
+ glb_lb_call_data_unref(lb_calld, "client_load_report");
return;
}
- schedule_next_client_load_report(glb_policy);
-}
-
-static void do_send_client_load_report_locked(glb_lb_policy* glb_policy) {
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message.send_message = glb_policy->client_load_report_payload;
- GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
- client_load_report_done_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
- if (call_error != GRPC_CALL_OK) {
- gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- }
+ schedule_next_client_load_report(lb_calld);
}
static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
@@ -1241,341 +1264,377 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
(drop_entries == nullptr || drop_entries->num_entries == 0);
}
-static void send_client_load_report_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) {
- glb_policy->client_load_report_timer_callback_pending = false;
- GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
- if (glb_policy->lb_call == nullptr) {
- maybe_restart_lb_call(glb_policy);
- }
- return;
- }
+static void send_client_load_report_locked(glb_lb_call_data* lb_calld) {
+ glb_lb_policy* glb_policy = lb_calld->glb_policy;
// Construct message payload.
- GPR_ASSERT(glb_policy->client_load_report_payload == nullptr);
+ GPR_ASSERT(lb_calld->send_message_payload == nullptr);
grpc_grpclb_request* request =
- grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
+ grpc_grpclb_load_report_request_create_locked(lb_calld->client_stats);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (load_report_counters_are_zero(request)) {
- if (glb_policy->last_client_load_report_counters_were_zero) {
+ if (lb_calld->last_client_load_report_counters_were_zero) {
grpc_grpclb_request_destroy(request);
- schedule_next_client_load_report(glb_policy);
+ schedule_next_client_load_report(lb_calld);
return;
}
- glb_policy->last_client_load_report_counters_were_zero = true;
+ lb_calld->last_client_load_report_counters_were_zero = true;
} else {
- glb_policy->last_client_load_report_counters_were_zero = false;
+ lb_calld->last_client_load_report_counters_were_zero = false;
}
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
- glb_policy->client_load_report_payload =
+ lb_calld->send_message_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
+ // Send the report.
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_SEND_MESSAGE;
+ op.data.send_message.send_message = lb_calld->send_message_payload;
+ GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure,
+ client_load_report_done_locked, lb_calld,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ lb_calld->lb_call, &op, 1, &lb_calld->client_load_report_closure);
+ if (call_error != GRPC_CALL_OK) {
+ gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ }
+}
+
+static void maybe_send_client_load_report_locked(void* arg, grpc_error* error) {
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+ glb_lb_policy* glb_policy = lb_calld->glb_policy;
+ lb_calld->client_load_report_timer_callback_pending = false;
+ if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
+ glb_lb_call_data_unref(lb_calld, "client_load_report");
+ return;
+ }
// If we've already sent the initial request, then we can go ahead and send
// the load report. Otherwise, we need to wait until the initial request has
- // been sent to send this (see lb_on_sent_initial_request_locked() below).
- if (glb_policy->initial_request_sent) {
- do_send_client_load_report_locked(glb_policy);
+ // been sent to send this (see lb_on_sent_initial_request_locked()).
+ if (lb_calld->send_message_payload == nullptr) {
+ send_client_load_report_locked(lb_calld);
+ } else {
+ lb_calld->client_load_report_is_due = true;
}
}
static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error);
static void lb_on_server_status_received_locked(void* arg, grpc_error* error);
static void lb_on_response_received_locked(void* arg, grpc_error* error);
-static void lb_call_init_locked(glb_lb_policy* glb_policy) {
+static glb_lb_call_data* lb_call_data_create_locked(glb_lb_policy* glb_policy) {
+ GPR_ASSERT(!glb_policy->shutting_down);
+ // Init the LB call. Note that the LB call will progress every time there's
+ // activity in glb_policy->base.interested_parties, which is comprised of the
+ // polling entities from client_channel.
GPR_ASSERT(glb_policy->server_name != nullptr);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
- GPR_ASSERT(glb_policy->lb_call == nullptr);
- GPR_ASSERT(!glb_policy->shutting_down);
-
- /* Note the following LB call progresses every time there's activity in \a
- * glb_policy->base.interested_parties, which is comprised of the polling
- * entities from \a client_channel. */
grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name);
grpc_millis deadline =
glb_policy->lb_call_timeout_ms == 0
? GRPC_MILLIS_INF_FUTURE
: grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms;
- glb_policy->lb_call = grpc_channel_create_pollset_set_call(
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)gpr_zalloc(sizeof(*lb_calld));
+ lb_calld->lb_call = grpc_channel_create_pollset_set_call(
glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties,
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
&host, deadline, nullptr);
grpc_slice_unref_internal(host);
-
- if (glb_policy->client_stats != nullptr) {
- grpc_grpclb_client_stats_unref(glb_policy->client_stats);
- }
- glb_policy->client_stats = grpc_grpclb_client_stats_create();
-
- grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
- grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
-
+ // Init the LB call request payload.
grpc_grpclb_request* request =
grpc_grpclb_request_create(glb_policy->server_name);
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
- glb_policy->lb_request_payload =
+ lb_calld->send_message_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
-
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request,
- lb_on_sent_initial_request_locked, glb_policy,
+ // Init other data associated with the LB call.
+ lb_calld->glb_policy = glb_policy;
+ gpr_ref_init(&lb_calld->refs, 1);
+ grpc_metadata_array_init(&lb_calld->lb_initial_metadata_recv);
+ grpc_metadata_array_init(&lb_calld->lb_trailing_metadata_recv);
+ GRPC_CLOSURE_INIT(&lb_calld->lb_on_sent_initial_request,
+ lb_on_sent_initial_request_locked, lb_calld,
grpc_combiner_scheduler(glb_policy->base.combiner));
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
- lb_on_server_status_received_locked, glb_policy,
+ GRPC_CLOSURE_INIT(&lb_calld->lb_on_response_received,
+ lb_on_response_received_locked, lb_calld,
grpc_combiner_scheduler(glb_policy->base.combiner));
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_response_received,
- lb_on_response_received_locked, glb_policy,
+ GRPC_CLOSURE_INIT(&lb_calld->lb_on_server_status_received,
+ lb_on_server_status_received_locked, lb_calld,
grpc_combiner_scheduler(glb_policy->base.combiner));
-
- grpc_core::BackOff::Options backoff_options;
- backoff_options
- .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
- .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
- .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
- .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
-
- glb_policy->lb_call_backoff.Init(backoff_options);
-
- glb_policy->initial_request_sent = false;
- glb_policy->seen_initial_response = false;
- glb_policy->last_client_load_report_counters_were_zero = false;
-}
-
-static void lb_call_destroy_locked(glb_lb_policy* glb_policy) {
- GPR_ASSERT(glb_policy->lb_call != nullptr);
- grpc_call_unref(glb_policy->lb_call);
- glb_policy->lb_call = nullptr;
-
- grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv);
- grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);
-
- grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
- grpc_slice_unref_internal(glb_policy->lb_call_status_details);
-
- if (glb_policy->client_load_report_timer_callback_pending) {
- grpc_timer_cancel(&glb_policy->client_load_report_timer);
- }
+ // Hold a ref to the glb_policy.
+ GRPC_LB_POLICY_REF(&glb_policy->base, "lb_calld");
+ return lb_calld;
}
/*
* Auxiliary functions and LB client callbacks.
*/
+
static void query_for_backends_locked(glb_lb_policy* glb_policy) {
GPR_ASSERT(glb_policy->lb_channel != nullptr);
if (glb_policy->shutting_down) return;
-
- lb_call_init_locked(glb_policy);
-
+ // Init the LB call data.
+ GPR_ASSERT(glb_policy->lb_calld == nullptr);
+ glb_policy->lb_calld = lb_call_data_create_locked(glb_policy);
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Query for backends (lb_channel: %p, lb_call: %p)",
- glb_policy, glb_policy->lb_channel, glb_policy->lb_call);
+ "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p, "
+ "lb_call: %p)",
+ glb_policy, glb_policy->lb_channel, glb_policy->lb_calld,
+ glb_policy->lb_calld->lb_call);
}
- GPR_ASSERT(glb_policy->lb_call != nullptr);
-
+ GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
+ // Create the ops.
grpc_call_error call_error;
grpc_op ops[3];
memset(ops, 0, sizeof(ops));
-
+ // Op: send initial metadata.
grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
+ // Op: send request message.
+ GPR_ASSERT(glb_policy->lb_calld->send_message_payload != nullptr);
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message.send_message =
+ glb_policy->lb_calld->send_message_payload;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ glb_lb_call_data_ref(glb_policy->lb_calld,
+ "lb_on_sent_initial_request_locked");
+ call_error = grpc_call_start_batch_and_execute(
+ glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
+ &glb_policy->lb_calld->lb_on_sent_initial_request);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ // Op: recv initial metadata.
+ op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
- &glb_policy->lb_initial_metadata_recv;
+ &glb_policy->lb_calld->lb_initial_metadata_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
- GPR_ASSERT(glb_policy->lb_request_payload != nullptr);
- op->op = GRPC_OP_SEND_MESSAGE;
- op->data.send_message.send_message = glb_policy->lb_request_payload;
+ // Op: recv response.
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message.recv_message =
+ &glb_policy->lb_calld->recv_message_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
- /* take a ref to be released in lb_on_sent_initial_request_locked() */
- GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked");
+ glb_lb_call_data_ref(glb_policy->lb_calld, "lb_on_response_received_locked");
call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, ops, (size_t)(op - ops),
- &glb_policy->lb_on_sent_initial_request);
+ glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
+ &glb_policy->lb_calld->lb_on_response_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
-
+ // Op: recv server status.
op = ops;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata =
- &glb_policy->lb_trailing_metadata_recv;
- op->data.recv_status_on_client.status = &glb_policy->lb_call_status;
+ &glb_policy->lb_calld->lb_trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &glb_policy->lb_calld->lb_call_status;
op->data.recv_status_on_client.status_details =
- &glb_policy->lb_call_status_details;
+ &glb_policy->lb_calld->lb_call_status_details;
op->flags = 0;
op->reserved = nullptr;
op++;
- /* take a ref to be released in lb_on_server_status_received_locked() */
- GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked");
+ // This callback signals the end of the LB call, so it relies on the initial
+ // ref instead of a new ref. When it's invoked, it's the initial ref that is
+ // unreffed.
call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, ops, (size_t)(op - ops),
- &glb_policy->lb_on_server_status_received);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
-
- op = ops;
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- /* take a ref to be unref'd/reused in lb_on_response_received_locked() */
- GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked");
- call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, ops, (size_t)(op - ops),
- &glb_policy->lb_on_response_received);
+ glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
+ &glb_policy->lb_calld->lb_on_server_status_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- glb_policy->initial_request_sent = true;
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+ grpc_byte_buffer_destroy(lb_calld->send_message_payload);
+ lb_calld->send_message_payload = nullptr;
// If we attempted to send a client load report before the initial request was
- // sent, send the load report now.
- if (glb_policy->client_load_report_payload != nullptr) {
- do_send_client_load_report_locked(glb_policy);
+ // sent (and this lb_calld is still in use), send the load report now.
+ if (lb_calld->client_load_report_is_due &&
+ lb_calld == lb_calld->glb_policy->lb_calld) {
+ send_client_load_report_locked(lb_calld);
+ lb_calld->client_load_report_is_due = false;
}
- GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked");
+ glb_lb_call_data_unref(lb_calld, "lb_on_sent_initial_request_locked");
}
static void lb_on_response_received_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+ glb_lb_policy* glb_policy = lb_calld->glb_policy;
+ // Empty payload means the LB call was cancelled.
+ if (lb_calld != glb_policy->lb_calld ||
+ lb_calld->recv_message_payload == nullptr) {
+ glb_lb_call_data_unref(lb_calld, "lb_on_response_received_locked");
+ return;
+ }
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op* op = ops;
- if (glb_policy->lb_response_payload != nullptr) {
- glb_policy->lb_call_backoff->Reset();
- /* Received data from the LB server. Look inside
- * glb_policy->lb_response_payload, for a serverlist. */
- grpc_byte_buffer_reader bbr;
- grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
- grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
- grpc_byte_buffer_reader_destroy(&bbr);
- grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
-
- grpc_grpclb_initial_response* response = nullptr;
- if (!glb_policy->seen_initial_response &&
- (response = grpc_grpclb_initial_response_parse(response_slice)) !=
- nullptr) {
- if (response->has_client_stats_report_interval) {
- glb_policy->client_stats_report_interval = GPR_MAX(
- GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
- &response->client_stats_report_interval));
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Received initial LB response message; "
- "client load reporting interval = %" PRIdPTR " milliseconds",
- glb_policy, glb_policy->client_stats_report_interval);
- }
- /* take a ref to be unref'd in send_client_load_report_locked() */
- glb_policy->client_load_report_timer_callback_pending = true;
- GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report");
- schedule_next_client_load_report(glb_policy);
- } else if (grpc_lb_glb_trace.enabled()) {
+ glb_policy->lb_call_backoff->Reset();
+ grpc_byte_buffer_reader bbr;
+ grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload);
+ grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
+ grpc_byte_buffer_reader_destroy(&bbr);
+ grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
+ lb_calld->recv_message_payload = nullptr;
+ grpc_grpclb_initial_response* initial_response;
+ grpc_grpclb_serverlist* serverlist;
+ if (!lb_calld->seen_initial_response &&
+ (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
+ nullptr) {
+ // Have NOT seen initial response, look for initial response.
+ if (initial_response->has_client_stats_report_interval) {
+ lb_calld->client_stats_report_interval = GPR_MAX(
+ GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
+ &initial_response->client_stats_report_interval));
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Received initial LB response message; client load "
- "reporting NOT enabled",
- glb_policy);
+ "[grpclb %p] Received initial LB response message; "
+ "client load reporting interval = %" PRIdPTR " milliseconds",
+ glb_policy, lb_calld->client_stats_report_interval);
}
- grpc_grpclb_initial_response_destroy(response);
- glb_policy->seen_initial_response = true;
- } else {
- grpc_grpclb_serverlist* serverlist =
- grpc_grpclb_response_parse_serverlist(response_slice);
- if (serverlist != nullptr) {
- GPR_ASSERT(glb_policy->lb_call != nullptr);
+ } else if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Received initial LB response message; client load "
+ "reporting NOT enabled",
+ glb_policy);
+ }
+ grpc_grpclb_initial_response_destroy(initial_response);
+ lb_calld->seen_initial_response = true;
+ } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
+ response_slice)) != nullptr) {
+ // Have seen initial response, look for serverlist.
+ GPR_ASSERT(lb_calld->lb_call != nullptr);
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
+ glb_policy, serverlist->num_servers);
+ for (size_t i = 0; i < serverlist->num_servers; ++i) {
+ grpc_resolved_address addr;
+ parse_server(serverlist->servers[i], &addr);
+ char* ipport;
+ grpc_sockaddr_to_string(&ipport, &addr, false);
+ gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
+ glb_policy, i, ipport);
+ gpr_free(ipport);
+ }
+ }
+ /* update serverlist */
+ if (serverlist->num_servers > 0) {
+ // Start sending client load report only after we start using the
+ // serverlist returned from the current LB call.
+ if (lb_calld->client_stats_report_interval > 0 &&
+ lb_calld->client_stats == nullptr) {
+ lb_calld->client_stats = grpc_grpclb_client_stats_create();
+ glb_lb_call_data_ref(lb_calld, "client_load_report");
+ schedule_next_client_load_report(lb_calld);
+ }
+ if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
- glb_policy, serverlist->num_servers);
- for (size_t i = 0; i < serverlist->num_servers; ++i) {
- grpc_resolved_address addr;
- parse_server(serverlist->servers[i], &addr);
- char* ipport;
- grpc_sockaddr_to_string(&ipport, &addr, false);
- gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
- glb_policy, i, ipport);
- gpr_free(ipport);
- }
+ "[grpclb %p] Incoming server list identical to current, "
+ "ignoring.",
+ glb_policy);
}
- /* update serverlist */
- if (serverlist->num_servers > 0) {
- if (grpc_grpclb_serverlist_equals(glb_policy->serverlist,
- serverlist)) {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Incoming server list identical to current, "
- "ignoring.",
- glb_policy);
- }
- grpc_grpclb_destroy_serverlist(serverlist);
- } else { /* new serverlist */
- if (glb_policy->serverlist != nullptr) {
- /* dispose of the old serverlist */
- grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
- } else {
- /* or dispose of the fallback */
- grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
- glb_policy->fallback_backend_addresses = nullptr;
- if (glb_policy->fallback_timer_callback_pending) {
- grpc_timer_cancel(&glb_policy->lb_fallback_timer);
- }
- }
- /* and update the copy in the glb_lb_policy instance. This
- * serverlist instance will be destroyed either upon the next
- * update or in glb_destroy() */
- glb_policy->serverlist = serverlist;
- glb_policy->serverlist_index = 0;
- rr_handover_locked(glb_policy);
- }
+ grpc_grpclb_destroy_serverlist(serverlist);
+ } else { /* new serverlist */
+ if (glb_policy->serverlist != nullptr) {
+ /* dispose of the old serverlist */
+ grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
} else {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Received empty server list, ignoring.",
- glb_policy);
+ /* or dispose of the fallback */
+ grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
+ glb_policy->fallback_backend_addresses = nullptr;
+ if (glb_policy->fallback_timer_callback_pending) {
+ grpc_timer_cancel(&glb_policy->lb_fallback_timer);
+ glb_policy->fallback_timer_callback_pending = false;
}
- grpc_grpclb_destroy_serverlist(serverlist);
}
- } else { /* serverlist == nullptr */
- gpr_log(GPR_ERROR,
- "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
- glb_policy,
- grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
+ /* and update the copy in the glb_lb_policy instance. This
+ * serverlist instance will be destroyed either upon the next
+ * update or in glb_destroy() */
+ glb_policy->serverlist = serverlist;
+ glb_policy->serverlist_index = 0;
+ rr_handover_locked(glb_policy);
+ }
+ } else {
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
+ glb_policy);
}
+ grpc_grpclb_destroy_serverlist(serverlist);
}
- grpc_slice_unref_internal(response_slice);
- if (!glb_policy->shutting_down) {
- /* keep listening for serverlist updates */
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- /* reuse the "lb_on_response_received_locked" ref taken in
- * query_for_backends_locked() */
- const grpc_call_error call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, ops, (size_t)(op - ops),
- &glb_policy->lb_on_response_received); /* loop */
- GPR_ASSERT(GRPC_CALL_OK == call_error);
+ } else {
+ // No valid initial response or serverlist found.
+ gpr_log(GPR_ERROR,
+ "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
+ glb_policy,
+ grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
+ }
+ grpc_slice_unref_internal(response_slice);
+ if (!glb_policy->shutting_down) {
+ // Keep listening for serverlist updates.
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message.recv_message = &lb_calld->recv_message_payload;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // Reuse the "lb_on_response_received_locked" ref taken in
+ // query_for_backends_locked().
+ const grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ lb_calld->lb_call, ops, (size_t)(op - ops),
+ &lb_calld->lb_on_response_received);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ } else {
+ glb_lb_call_data_unref(lb_calld,
+ "lb_on_response_received_locked+glb_shutdown");
+ }
+}
+
+static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+ glb_lb_policy* glb_policy = lb_calld->glb_policy;
+ GPR_ASSERT(lb_calld->lb_call != nullptr);
+ if (grpc_lb_glb_trace.enabled()) {
+ char* status_details =
+ grpc_slice_to_c_string(lb_calld->lb_call_status_details);
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Status from LB server received. Status = %d, details "
+ "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
+ lb_calld->glb_policy, lb_calld->lb_call_status, status_details,
+ lb_calld, lb_calld->lb_call, grpc_error_string(error));
+ gpr_free(status_details);
+ }
+ // If this lb_calld is still in use, this call ended because of a failure so
+ // we want to retry connecting. Otherwise, we have deliberately ended this
+ // call and no further action is required.
+ if (lb_calld == glb_policy->lb_calld) {
+ glb_policy->lb_calld = nullptr;
+ if (lb_calld->client_load_report_timer_callback_pending) {
+ grpc_timer_cancel(&lb_calld->client_load_report_timer);
+ }
+ GPR_ASSERT(!glb_policy->shutting_down);
+ if (lb_calld->seen_initial_response) {
+ // If we lose connection to the LB server, reset the backoff and restart
+ // the LB call immediately.
+ glb_policy->lb_call_backoff->Reset();
+ query_for_backends_locked(glb_policy);
} else {
- GRPC_LB_POLICY_UNREF(&glb_policy->base,
- "lb_on_response_received_locked_shutdown");
+ // If this LB call fails establishing any connection to the LB server,
+ // retry later.
+ start_lb_call_retry_timer_locked(glb_policy);
}
- } else { /* empty payload: call cancelled. */
- /* dispose of the "lb_on_response_received_locked" ref taken in
- * query_for_backends_locked() and reused in every reception loop */
- GRPC_LB_POLICY_UNREF(&glb_policy->base,
- "lb_on_response_received_locked_empty_payload");
}
+ glb_lb_call_data_unref(lb_calld, "lb_call_ended");
}
static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
@@ -1597,29 +1656,6 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
}
-static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- GPR_ASSERT(glb_policy->lb_call != nullptr);
- if (grpc_lb_glb_trace.enabled()) {
- char* status_details =
- grpc_slice_to_c_string(glb_policy->lb_call_status_details);
- gpr_log(GPR_INFO,
- "[grpclb %p] Status from LB server received. Status = %d, Details "
- "= '%s', (call: %p), error '%s'",
- glb_policy, glb_policy->lb_call_status, status_details,
- glb_policy->lb_call, grpc_error_string(error));
- gpr_free(status_details);
- }
- /* We need to perform cleanups no matter what. */
- lb_call_destroy_locked(glb_policy);
- // If the load report timer is still pending, we wait for it to be
- // called before restarting the call. Otherwise, we restart the call
- // here.
- if (!glb_policy->client_load_report_timer_callback_pending) {
- maybe_restart_lb_call(glb_policy);
- }
-}
-
static void fallback_update_locked(glb_lb_policy* glb_policy,
const grpc_lb_addresses* addresses) {
GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
@@ -1701,7 +1737,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
switch (glb_policy->lb_channel_connectivity) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- /* resub. */
+ // Keep watching the LB channel.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(glb_policy->lb_channel));
@@ -1714,29 +1750,26 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
&glb_policy->lb_channel_on_connectivity_changed, nullptr);
break;
}
+ // The LB channel may be IDLE because it's shut down before the update.
+ // Restart the LB call to kick the LB channel into gear.
case GRPC_CHANNEL_IDLE:
- // lb channel inactive (probably shutdown prior to update). Restart lb
- // call to kick the lb channel into gear.
- /* fallthrough */
case GRPC_CHANNEL_READY:
- if (glb_policy->lb_call != nullptr) {
- glb_policy->updating_lb_call = true;
- grpc_call_cancel(glb_policy->lb_call, nullptr);
- // lb_on_server_status_received() will pick up the cancel and reinit
- // lb_call.
- } else if (glb_policy->started_picking) {
+ if (glb_policy->lb_calld != nullptr) {
+ lb_call_data_shutdown(glb_policy);
+ }
+ if (glb_policy->started_picking) {
if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
}
- start_picking_locked(glb_policy);
+ glb_policy->lb_call_backoff->Reset();
+ query_for_backends_locked(glb_policy);
}
- /* fallthrough */
+ // Fall through.
case GRPC_CHANNEL_SHUTDOWN:
done:
glb_policy->watching_lb_channel = false;
GRPC_LB_POLICY_UNREF(&glb_policy->base,
"watch_lb_channel_connectivity_cb_shutdown");
- break;
}
}
@@ -1851,6 +1884,14 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
+ // Init LB call backoff option.
+ grpc_core::BackOff::Options backoff_options;
+ backoff_options
+ .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ glb_policy->lb_call_backoff.Init(backoff_options);
return &glb_policy->base;
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index e217a0b0c0..24c381a46d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -34,7 +34,7 @@
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/gpr++/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index f4e345def6..3377605263 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -22,7 +22,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/gpr++/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/transport/connectivity_state.h"
// TODO(roth): This code is intended to be shared between pick_first and
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 1efdc26d56..6ba5f932f0 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -34,9 +34,9 @@
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/gpr++/manual_constructor.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/gethostname.h"
#include "src/core/lib/iomgr/resolve_address.h"
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index 66a03c5a85..62f03d52c0 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -29,9 +29,9 @@
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/gpr++/manual_constructor.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index bb43651d0c..cad8578511 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -37,8 +37,8 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/debug/stats.h"
-#include "src/core/lib/gpr++/debug_location.h"
-#include "src/core/lib/gpr++/manual_constructor.h"
+#include "src/core/lib/gprpp/debug_location.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index f2a5c1e273..b7593ec911 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -21,9 +21,9 @@
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/gpr++/ref_counted.h"
-#include "src/core/lib/gpr++/ref_counted_ptr.h"
#include "src/core/lib/gpr/arena.h"
+#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index e067b696a1..530ab17bc7 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1886,7 +1886,7 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
grpc_chttp2_maybe_complete_recv_message(t, s);
if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
s->write_closed) {
- if (s->seen_error) {
+ if (s->seen_error || !t->is_client) {
grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
if (!s->pending_byte_stream) {
grpc_slice_buffer_reset_and_unref_internal(
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h
index 7e83ea05cd..2ee1345260 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.h
+++ b/src/core/ext/transport/chttp2/transport/flow_control.h
@@ -24,8 +24,8 @@
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
-#include "src/core/lib/gpr++/abstract.h"
-#include "src/core/lib/gpr++/manual_constructor.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/pid_controller.h"
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index de901f0ce8..6b6c0b28e2 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -35,7 +35,7 @@
#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/lib/compression/stream_compression.h"
-#include "src/core/lib/gpr++/manual_constructor.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/timer.h"
diff --git a/src/core/lib/gpr/alloc.cc b/src/core/lib/gpr/alloc.cc
index 518bdb99f7..000b7dcb25 100644
--- a/src/core/lib/gpr/alloc.cc
+++ b/src/core/lib/gpr/alloc.cc
@@ -90,8 +90,8 @@ void* gpr_realloc(void* p, size_t size) {
return p;
}
-void* gpr_malloc_aligned(size_t size, size_t alignment_log) {
- size_t alignment = ((size_t)1) << alignment_log;
+void* gpr_malloc_aligned(size_t size, size_t alignment) {
+ GPR_ASSERT(((alignment - 1) & alignment) == 0); // Must be power of 2.
size_t extra = alignment - 1 + sizeof(void*);
void* p = gpr_malloc(size + extra);
void** ret = (void**)(((uintptr_t)p + extra) & ~(alignment - 1));
diff --git a/src/core/lib/gpr/arena.cc b/src/core/lib/gpr/arena.cc
index 177c176732..687592a140 100644
--- a/src/core/lib/gpr/arena.cc
+++ b/src/core/lib/gpr/arena.cc
@@ -17,11 +17,19 @@
*/
#include "src/core/lib/gpr/arena.h"
+
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
+// TODO(roth): We currently assume that all callers need alignment of 16
+// bytes, which may be wrong in some cases. As part of converting the
+// arena API to C++, we should consider replacing gpr_arena_alloc() with a
+// template that takes the type of the value being allocated, which
+// would allow us to use the alignment actually needed by the caller.
#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
(((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
@@ -36,9 +44,16 @@ struct gpr_arena {
zone initial_zone;
};
+static void* zalloc_aligned(size_t size) {
+ void* ptr = gpr_malloc_aligned(size, GPR_MAX_ALIGNMENT);
+ memset(ptr, 0, size);
+ return ptr;
+}
+
gpr_arena* gpr_arena_create(size_t initial_size) {
initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size);
- gpr_arena* a = (gpr_arena*)gpr_zalloc(sizeof(gpr_arena) + initial_size);
+ gpr_arena* a = (gpr_arena*)zalloc_aligned(
+ ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena)) + initial_size);
a->initial_zone.size_end = initial_size;
return a;
}
@@ -46,10 +61,10 @@ gpr_arena* gpr_arena_create(size_t initial_size) {
size_t gpr_arena_destroy(gpr_arena* arena) {
gpr_atm size = gpr_atm_no_barrier_load(&arena->size_so_far);
zone* z = (zone*)gpr_atm_no_barrier_load(&arena->initial_zone.next_atm);
- gpr_free(arena);
+ gpr_free_aligned(arena);
while (z) {
zone* next_z = (zone*)gpr_atm_no_barrier_load(&z->next_atm);
- gpr_free(z);
+ gpr_free_aligned(z);
z = next_z;
}
return (size_t)size;
@@ -64,11 +79,12 @@ void* gpr_arena_alloc(gpr_arena* arena, size_t size) {
zone* next_z = (zone*)gpr_atm_acq_load(&z->next_atm);
if (next_z == nullptr) {
size_t next_z_size = (size_t)gpr_atm_no_barrier_load(&arena->size_so_far);
- next_z = (zone*)gpr_zalloc(sizeof(zone) + next_z_size);
+ next_z = (zone*)zalloc_aligned(ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone)) +
+ next_z_size);
next_z->size_begin = z->size_end;
next_z->size_end = z->size_end + next_z_size;
if (!gpr_atm_rel_cas(&z->next_atm, (gpr_atm)NULL, (gpr_atm)next_z)) {
- gpr_free(next_z);
+ gpr_free_aligned(next_z);
next_z = (zone*)gpr_atm_acq_load(&z->next_atm);
}
}
@@ -79,5 +95,8 @@ void* gpr_arena_alloc(gpr_arena* arena, size_t size) {
}
GPR_ASSERT(start >= z->size_begin);
GPR_ASSERT(start + size <= z->size_end);
- return ((char*)(z + 1)) + start - z->size_begin;
+ char* ptr = (z == &arena->initial_zone)
+ ? (char*)arena + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(gpr_arena))
+ : (char*)z + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(zone));
+ return ptr + start - z->size_begin;
}
diff --git a/src/core/lib/gpr/fork.cc b/src/core/lib/gpr/fork.cc
index c47e686378..92023f4350 100644
--- a/src/core/lib/gpr/fork.cc
+++ b/src/core/lib/gpr/fork.cc
@@ -38,18 +38,32 @@ void grpc_fork_support_init() {
fork_support_enabled = 1;
#else
fork_support_enabled = 0;
+#endif
+ bool env_var_set = false;
char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT");
if (env != nullptr) {
static const char* truthy[] = {"yes", "Yes", "YES", "true",
"True", "TRUE", "1"};
+ static const char* falsey[] = {"no", "No", "NO", "false",
+ "False", "FALSE", "0"};
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == strcmp(env, truthy[i])) {
fork_support_enabled = 1;
+ env_var_set = true;
+ break;
+ }
+ }
+ if (!env_var_set) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) {
+ if (0 == strcmp(env, falsey[i])) {
+ fork_support_enabled = 0;
+ env_var_set = true;
+ break;
+ }
}
}
gpr_free(env);
}
-#endif
if (override_fork_support_enabled != -1) {
fork_support_enabled = override_fork_support_enabled;
}
diff --git a/src/core/lib/gpr++/README.md b/src/core/lib/gprpp/README.md
index eab018bb31..eab018bb31 100644
--- a/src/core/lib/gpr++/README.md
+++ b/src/core/lib/gprpp/README.md
diff --git a/src/core/lib/gpr++/abstract.h b/src/core/lib/gprpp/abstract.h
index 51d7572302..cc96edc49b 100644
--- a/src/core/lib/gpr++/abstract.h
+++ b/src/core/lib/gprpp/abstract.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_GPRXX_ABSTRACT_H
-#define GRPC_CORE_LIB_GPRXX_ABSTRACT_H
+#ifndef GRPC_CORE_LIB_GPRPP_ABSTRACT_H
+#define GRPC_CORE_LIB_GPRPP_ABSTRACT_H
// This is needed to support abstract base classes in the c core. Since gRPC
// doesn't have a c++ runtime, it will hit a linker error on delete unless
@@ -31,4 +31,4 @@
#define GRPC_ABSTRACT \
{ GPR_ASSERT(false); }
-#endif /* GRPC_CORE_LIB_GPRXX_ABSTRACT_H */
+#endif /* GRPC_CORE_LIB_GPRPP_ABSTRACT_H */
diff --git a/src/core/lib/gpr++/atomic.h b/src/core/lib/gprpp/atomic.h
index d68ccea864..8b08fc4e9c 100644
--- a/src/core/lib/gpr++/atomic.h
+++ b/src/core/lib/gprpp/atomic.h
@@ -16,15 +16,15 @@
*
*/
-#ifndef GRPC_CORE_LIB_GPRXX_ATOMIC_H
-#define GRPC_CORE_LIB_GPRXX_ATOMIC_H
+#ifndef GRPC_CORE_LIB_GPRPP_ATOMIC_H
+#define GRPC_CORE_LIB_GPRPP_ATOMIC_H
#include <grpc/support/port_platform.h>
#ifdef GPR_HAS_CXX11_ATOMIC
-#include "src/core/lib/gpr++/atomic_with_std.h"
+#include "src/core/lib/gprpp/atomic_with_std.h"
#else
-#include "src/core/lib/gpr++/atomic_with_atm.h"
+#include "src/core/lib/gprpp/atomic_with_atm.h"
#endif
-#endif /* GRPC_CORE_LIB_GPRXX_ATOMIC_H */
+#endif /* GRPC_CORE_LIB_GPRPP_ATOMIC_H */
diff --git a/src/core/lib/gpr++/atomic_with_atm.h b/src/core/lib/gprpp/atomic_with_atm.h
index 09490e8148..6abf0bc38d 100644
--- a/src/core/lib/gpr++/atomic_with_atm.h
+++ b/src/core/lib/gprpp/atomic_with_atm.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_ATM_H
-#define GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_ATM_H
+#ifndef GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_ATM_H
+#define GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_ATM_H
#include <grpc/support/atm.h>
@@ -52,4 +52,4 @@ class atomic<bool> {
} // namespace grpc_core
-#endif /* GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_ATM_H */
+#endif /* GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_ATM_H */
diff --git a/src/core/lib/gpr++/atomic_with_std.h b/src/core/lib/gprpp/atomic_with_std.h
index b6ff90655e..83322b81c1 100644
--- a/src/core/lib/gpr++/atomic_with_std.h
+++ b/src/core/lib/gprpp/atomic_with_std.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_STD_H
-#define GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_STD_H
+#ifndef GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_STD_H
+#define GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_STD_H
#include <atomic>
@@ -30,4 +30,4 @@ typedef std::memory_order memory_order;
} // namespace grpc_core
-#endif /* GRPC_CORE_LIB_GPRXX_ATOMIC_WITH_STD_H */
+#endif /* GRPC_CORE_LIB_GPRPP_ATOMIC_WITH_STD_H */
diff --git a/src/core/lib/gpr++/debug_location.h b/src/core/lib/gprpp/debug_location.h
index 5a8665ce19..287761beaf 100644
--- a/src/core/lib/gpr++/debug_location.h
+++ b/src/core/lib/gprpp/debug_location.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_GPRXX_DEBUG_LOCATION_H
-#define GRPC_CORE_LIB_GPRXX_DEBUG_LOCATION_H
+#ifndef GRPC_CORE_LIB_GPRPP_DEBUG_LOCATION_H
+#define GRPC_CORE_LIB_GPRPP_DEBUG_LOCATION_H
namespace grpc_core {
@@ -49,4 +49,4 @@ class DebugLocation {
} // namespace grpc_core
-#endif /* GRPC_CORE_LIB_GPRXX_DEBUG_LOCATION_H */
+#endif /* GRPC_CORE_LIB_GPRPP_DEBUG_LOCATION_H */
diff --git a/src/core/lib/gpr++/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h
index 17ee9e16bb..b78f85b893 100644
--- a/src/core/lib/gpr++/inlined_vector.h
+++ b/src/core/lib/gprpp/inlined_vector.h
@@ -16,12 +16,12 @@
*
*/
-#ifndef GRPC_CORE_LIB_GPRXX_INLINED_VECTOR_H
-#define GRPC_CORE_LIB_GPRXX_INLINED_VECTOR_H
+#ifndef GRPC_CORE_LIB_GPRPP_INLINED_VECTOR_H
+#define GRPC_CORE_LIB_GPRPP_INLINED_VECTOR_H
#include <cassert>
-#include "src/core/lib/gpr++/memory.h"
+#include "src/core/lib/gprpp/memory.h"
namespace grpc_core {
@@ -109,4 +109,4 @@ class InlinedVector {
} // namespace grpc_core
-#endif /* GRPC_CORE_LIB_GPRXX_INLINED_VECTOR_H */
+#endif /* GRPC_CORE_LIB_GPRPP_INLINED_VECTOR_H */
diff --git a/src/core/lib/gpr++/manual_constructor.h b/src/core/lib/gprpp/manual_constructor.h
index a3f006da34..cee38abc1b 100644
--- a/src/core/lib/gpr++/manual_constructor.h
+++ b/src/core/lib/gprpp/manual_constructor.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_GPRXX_MANUAL_CONSTRUCTOR_H
-#define GRPC_CORE_LIB_GPRXX_MANUAL_CONSTRUCTOR_H
+#ifndef GRPC_CORE_LIB_GPRPP_MANUAL_CONSTRUCTOR_H
+#define GRPC_CORE_LIB_GPRPP_MANUAL_CONSTRUCTOR_H
// manually construct a region of memory with some type
diff --git a/src/core/lib/gpr++/memory.h b/src/core/lib/gprpp/memory.h
index 75ed3d6cea..17f42f5983 100644
--- a/src/core/lib/gpr++/memory.h
+++ b/src/core/lib/gprpp/memory.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_GPRXX_MEMORY_H
-#define GRPC_CORE_LIB_GPRXX_MEMORY_H
+#ifndef GRPC_CORE_LIB_GPRPP_MEMORY_H
+#define GRPC_CORE_LIB_GPRPP_MEMORY_H
#include <grpc/support/alloc.h>
@@ -97,4 +97,4 @@ class Allocator {
} // namespace grpc_core
-#endif /* GRPC_CORE_LIB_GPRXX_MEMORY_H */
+#endif /* GRPC_CORE_LIB_GPRPP_MEMORY_H */
diff --git a/src/core/lib/gpr++/orphanable.h b/src/core/lib/gprpp/orphanable.h
index f106e74dde..50199730c9 100644
--- a/src/core/lib/gpr++/orphanable.h
+++ b/src/core/lib/gprpp/orphanable.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_GPRXX_ORPHANABLE_H
-#define GRPC_CORE_LIB_GPRXX_ORPHANABLE_H
+#ifndef GRPC_CORE_LIB_GPRPP_ORPHANABLE_H
+#define GRPC_CORE_LIB_GPRPP_ORPHANABLE_H
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@@ -25,9 +25,9 @@
#include <memory>
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/gpr++/abstract.h"
-#include "src/core/lib/gpr++/debug_location.h"
-#include "src/core/lib/gpr++/memory.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/debug_location.h"
+#include "src/core/lib/gprpp/memory.h"
namespace grpc_core {
@@ -168,4 +168,4 @@ class InternallyRefCountedWithTracing : public Orphanable {
} // namespace grpc_core
-#endif /* GRPC_CORE_LIB_GPRXX_ORPHANABLE_H */
+#endif /* GRPC_CORE_LIB_GPRPP_ORPHANABLE_H */
diff --git a/src/core/lib/gpr++/ref_counted.h b/src/core/lib/gprpp/ref_counted.h
index c2ae76c0ae..c68118a71a 100644
--- a/src/core/lib/gpr++/ref_counted.h
+++ b/src/core/lib/gprpp/ref_counted.h
@@ -16,16 +16,16 @@
*
*/
-#ifndef GRPC_CORE_LIB_GPRXX_REF_COUNTED_H
-#define GRPC_CORE_LIB_GPRXX_REF_COUNTED_H
+#ifndef GRPC_CORE_LIB_GPRPP_REF_COUNTED_H
+#define GRPC_CORE_LIB_GPRPP_REF_COUNTED_H
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/gpr++/abstract.h"
-#include "src/core/lib/gpr++/debug_location.h"
-#include "src/core/lib/gpr++/memory.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/debug_location.h"
+#include "src/core/lib/gprpp/memory.h"
namespace grpc_core {
@@ -130,4 +130,4 @@ class RefCountedWithTracing {
} // namespace grpc_core
-#endif /* GRPC_CORE_LIB_GPRXX_REF_COUNTED_H */
+#endif /* GRPC_CORE_LIB_GPRPP_REF_COUNTED_H */
diff --git a/src/core/lib/gpr++/ref_counted_ptr.h b/src/core/lib/gprpp/ref_counted_ptr.h
index 862294d1aa..dda0f00d79 100644
--- a/src/core/lib/gpr++/ref_counted_ptr.h
+++ b/src/core/lib/gprpp/ref_counted_ptr.h
@@ -16,12 +16,12 @@
*
*/
-#ifndef GRPC_CORE_LIB_GPRXX_REF_COUNTED_PTR_H
-#define GRPC_CORE_LIB_GPRXX_REF_COUNTED_PTR_H
+#ifndef GRPC_CORE_LIB_GPRPP_REF_COUNTED_PTR_H
+#define GRPC_CORE_LIB_GPRPP_REF_COUNTED_PTR_H
#include <utility>
-#include "src/core/lib/gpr++/memory.h"
+#include "src/core/lib/gprpp/memory.h"
namespace grpc_core {
@@ -96,4 +96,4 @@ inline RefCountedPtr<T> MakeRefCounted(Args&&... args) {
} // namespace grpc_core
-#endif /* GRPC_CORE_LIB_GPRXX_REF_COUNTED_PTR_H */
+#endif /* GRPC_CORE_LIB_GPRPP_REF_COUNTED_PTR_H */
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 1cb0150f45..42d7cdd348 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -43,8 +43,8 @@
#include <grpc/support/useful.h>
#include "src/core/lib/debug/stats.h"
-#include "src/core/lib/gpr++/manual_constructor.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index b81c00ca7a..416e8384b4 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -41,8 +41,8 @@
#include <grpc/support/useful.h>
#include "src/core/lib/debug/stats.h"
-#include "src/core/lib/gpr++/manual_constructor.h"
#include "src/core/lib/gpr/spinlock.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/is_epollexclusive_available.h"
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 11c64d080c..1518348992 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -43,7 +43,7 @@
#include <grpc/support/useful.h>
#include "src/core/lib/debug/stats.h"
-#include "src/core/lib/gpr++/manual_constructor.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index 27a2a4eeb6..a1f1cf1107 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -23,7 +23,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include "src/core/lib/gpr++/atomic.h"
+#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/string.h"
diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc
index 7d36c6c9e1..19498a6df7 100644
--- a/src/core/lib/surface/version.cc
+++ b/src/core/lib/surface/version.cc
@@ -23,4 +23,4 @@
const char* grpc_version_string(void) { return "5.0.0-dev"; }
-const char* grpc_g_stands_for(void) { return "glossy"; }
+const char* grpc_g_stands_for(void) { return "glamorous"; }
diff --git a/src/core/tsi/ssl_transport_security.cc b/src/core/tsi/ssl_transport_security.cc
index 229f7efd37..b396a621bd 100644
--- a/src/core/tsi/ssl_transport_security.cc
+++ b/src/core/tsi/ssl_transport_security.cc
@@ -96,8 +96,7 @@ struct tsi_ssl_server_handshaker_factory {
typedef struct {
tsi_handshaker base;
SSL* ssl;
- BIO* into_ssl;
- BIO* from_ssl;
+ BIO* network_io;
tsi_result result;
tsi_ssl_handshaker_factory* factory_ref;
} tsi_ssl_handshaker;
@@ -105,8 +104,7 @@ typedef struct {
typedef struct {
tsi_frame_protector base;
SSL* ssl;
- BIO* into_ssl;
- BIO* from_ssl;
+ BIO* network_io;
unsigned char* buffer;
size_t buffer_size;
size_t buffer_offset;
@@ -730,11 +728,11 @@ static tsi_result ssl_protector_protect(tsi_frame_protector* self,
tsi_result result = TSI_OK;
/* First see if we have some pending data in the SSL BIO. */
- int pending_in_ssl = (int)BIO_pending(impl->from_ssl);
+ int pending_in_ssl = (int)BIO_pending(impl->network_io);
if (pending_in_ssl > 0) {
*unprotected_bytes_size = 0;
GPR_ASSERT(*protected_output_frames_size <= INT_MAX);
- read_from_ssl = BIO_read(impl->from_ssl, protected_output_frames,
+ read_from_ssl = BIO_read(impl->network_io, protected_output_frames,
(int)*protected_output_frames_size);
if (read_from_ssl < 0) {
gpr_log(GPR_ERROR,
@@ -762,7 +760,7 @@ static tsi_result ssl_protector_protect(tsi_frame_protector* self,
if (result != TSI_OK) return result;
GPR_ASSERT(*protected_output_frames_size <= INT_MAX);
- read_from_ssl = BIO_read(impl->from_ssl, protected_output_frames,
+ read_from_ssl = BIO_read(impl->network_io, protected_output_frames,
(int)*protected_output_frames_size);
if (read_from_ssl < 0) {
gpr_log(GPR_ERROR, "Could not read from BIO after SSL_write.");
@@ -788,20 +786,20 @@ static tsi_result ssl_protector_protect_flush(
impl->buffer_offset = 0;
}
- pending = (int)BIO_pending(impl->from_ssl);
+ pending = (int)BIO_pending(impl->network_io);
GPR_ASSERT(pending >= 0);
*still_pending_size = (size_t)pending;
if (*still_pending_size == 0) return TSI_OK;
GPR_ASSERT(*protected_output_frames_size <= INT_MAX);
- read_from_ssl = BIO_read(impl->from_ssl, protected_output_frames,
+ read_from_ssl = BIO_read(impl->network_io, protected_output_frames,
(int)*protected_output_frames_size);
if (read_from_ssl <= 0) {
gpr_log(GPR_ERROR, "Could not read from BIO after SSL_write.");
return TSI_INTERNAL_ERROR;
}
*protected_output_frames_size = (size_t)read_from_ssl;
- pending = (int)BIO_pending(impl->from_ssl);
+ pending = (int)BIO_pending(impl->network_io);
GPR_ASSERT(pending >= 0);
*still_pending_size = (size_t)pending;
return TSI_OK;
@@ -831,7 +829,7 @@ static tsi_result ssl_protector_unprotect(
/* Then, try to write some data to ssl. */
GPR_ASSERT(*protected_frames_bytes_size <= INT_MAX);
- written_into_ssl = BIO_write(impl->into_ssl, protected_frames_bytes,
+ written_into_ssl = BIO_write(impl->network_io, protected_frames_bytes,
(int)*protected_frames_bytes_size);
if (written_into_ssl < 0) {
gpr_log(GPR_ERROR, "Sending protected frame to ssl failed with %d",
@@ -853,6 +851,7 @@ static void ssl_protector_destroy(tsi_frame_protector* self) {
tsi_ssl_frame_protector* impl = (tsi_ssl_frame_protector*)self;
if (impl->buffer != nullptr) gpr_free(impl->buffer);
if (impl->ssl != nullptr) SSL_free(impl->ssl);
+ if (impl->network_io != nullptr) BIO_free(impl->network_io);
gpr_free(self);
}
@@ -916,10 +915,10 @@ static tsi_result ssl_handshaker_get_bytes_to_send_to_peer(tsi_handshaker* self,
return TSI_INVALID_ARGUMENT;
}
GPR_ASSERT(*bytes_size <= INT_MAX);
- bytes_read_from_ssl = BIO_read(impl->from_ssl, bytes, (int)*bytes_size);
+ bytes_read_from_ssl = BIO_read(impl->network_io, bytes, (int)*bytes_size);
if (bytes_read_from_ssl < 0) {
*bytes_size = 0;
- if (!BIO_should_retry(impl->from_ssl)) {
+ if (!BIO_should_retry(impl->network_io)) {
impl->result = TSI_INTERNAL_ERROR;
return impl->result;
} else {
@@ -927,7 +926,7 @@ static tsi_result ssl_handshaker_get_bytes_to_send_to_peer(tsi_handshaker* self,
}
}
*bytes_size = (size_t)bytes_read_from_ssl;
- return BIO_pending(impl->from_ssl) == 0 ? TSI_OK : TSI_INCOMPLETE_DATA;
+ return BIO_pending(impl->network_io) == 0 ? TSI_OK : TSI_INCOMPLETE_DATA;
}
static tsi_result ssl_handshaker_get_result(tsi_handshaker* self) {
@@ -948,7 +947,7 @@ static tsi_result ssl_handshaker_process_bytes_from_peer(
}
GPR_ASSERT(*bytes_size <= INT_MAX);
bytes_written_into_ssl_size =
- BIO_write(impl->into_ssl, bytes, (int)*bytes_size);
+ BIO_write(impl->network_io, bytes, (int)*bytes_size);
if (bytes_written_into_ssl_size < 0) {
gpr_log(GPR_ERROR, "Could not write to memory BIO.");
impl->result = TSI_INTERNAL_ERROR;
@@ -965,7 +964,7 @@ static tsi_result ssl_handshaker_process_bytes_from_peer(
ssl_result = SSL_get_error(impl->ssl, ssl_result);
switch (ssl_result) {
case SSL_ERROR_WANT_READ:
- if (BIO_pending(impl->from_ssl) == 0) {
+ if (BIO_pending(impl->network_io) == 0) {
/* We need more data. */
return TSI_INCOMPLETE_DATA;
} else {
@@ -1058,12 +1057,13 @@ static tsi_result ssl_handshaker_create_frame_protector(
return TSI_INTERNAL_ERROR;
}
- /* Transfer ownership of ssl to the frame protector. It is OK as the caller
- * cannot call anything else but destroy on the handshaker after this call. */
+ /* Transfer ownership of ssl and network_io to the frame protector. It is OK
+ * as the caller cannot call anything else but destroy on the handshaker
+ * after this call. */
protector_impl->ssl = impl->ssl;
impl->ssl = nullptr;
- protector_impl->into_ssl = impl->into_ssl;
- protector_impl->from_ssl = impl->from_ssl;
+ protector_impl->network_io = impl->network_io;
+ impl->network_io = nullptr;
protector_impl->base.vtable = &frame_protector_vtable;
*protector = &protector_impl->base;
@@ -1072,7 +1072,8 @@ static tsi_result ssl_handshaker_create_frame_protector(
static void ssl_handshaker_destroy(tsi_handshaker* self) {
tsi_ssl_handshaker* impl = (tsi_ssl_handshaker*)self;
- SSL_free(impl->ssl); /* The BIO objects are owned by ssl */
+ SSL_free(impl->ssl);
+ BIO_free(impl->network_io);
tsi_ssl_handshaker_factory_unref(impl->factory_ref);
gpr_free(impl);
}
@@ -1094,8 +1095,8 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client,
tsi_ssl_handshaker_factory* factory,
tsi_handshaker** handshaker) {
SSL* ssl = SSL_new(ctx);
- BIO* into_ssl = nullptr;
- BIO* from_ssl = nullptr;
+ BIO* network_io = nullptr;
+ BIO* ssl_io = nullptr;
tsi_ssl_handshaker* impl = nullptr;
*handshaker = nullptr;
if (ctx == nullptr) {
@@ -1107,16 +1108,12 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client,
}
SSL_set_info_callback(ssl, ssl_info_callback);
- into_ssl = BIO_new(BIO_s_mem());
- from_ssl = BIO_new(BIO_s_mem());
- if (into_ssl == nullptr || from_ssl == nullptr) {
- gpr_log(GPR_ERROR, "BIO_new failed.");
+ if (!BIO_new_bio_pair(&network_io, 0, &ssl_io, 0)) {
+ gpr_log(GPR_ERROR, "BIO_new_bio_pair failed.");
SSL_free(ssl);
- if (into_ssl != nullptr) BIO_free(into_ssl);
- if (from_ssl != nullptr) BIO_free(into_ssl);
return TSI_OUT_OF_RESOURCES;
}
- SSL_set_bio(ssl, into_ssl, from_ssl);
+ SSL_set_bio(ssl, ssl_io, ssl_io);
if (is_client) {
int ssl_result;
SSL_set_connect_state(ssl);
@@ -1125,6 +1122,7 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client,
gpr_log(GPR_ERROR, "Invalid server name indication %s.",
server_name_indication);
SSL_free(ssl);
+ BIO_free(network_io);
return TSI_INTERNAL_ERROR;
}
}
@@ -1135,6 +1133,7 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client,
"Unexpected error received from first SSL_do_handshake call: %s",
ssl_error_string(ssl_result));
SSL_free(ssl);
+ BIO_free(network_io);
return TSI_INTERNAL_ERROR;
}
} else {
@@ -1143,8 +1142,7 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client,
impl = (tsi_ssl_handshaker*)gpr_zalloc(sizeof(*impl));
impl->ssl = ssl;
- impl->into_ssl = into_ssl;
- impl->from_ssl = from_ssl;
+ impl->network_io = network_io;
impl->result = TSI_HANDSHAKE_IN_PROGRESS;
impl->base.vtable = &handshaker_vtable;
impl->factory_ref = tsi_ssl_handshaker_factory_ref(factory);
diff --git a/src/cpp/common/version_cc.cc b/src/cpp/common/version_cc.cc
index 7f01a66dcf..8bc926048f 100644
--- a/src/cpp/common/version_cc.cc
+++ b/src/cpp/common/version_cc.cc
@@ -22,5 +22,5 @@
#include <grpc++/grpc++.h>
namespace grpc {
-grpc::string Version() { return "1.9.0-dev"; }
+grpc::string Version() { return "1.10.0-dev"; }
} // namespace grpc
diff --git a/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs b/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs
index b6bb0a9eae..9c6f8a2117 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs
@@ -60,6 +60,16 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
+ public void LeaseSetsReturnAction()
+ {
+ var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, 0);
+ var origLeased = pool.Lease();
+ origLeased.ReturnAction(origLeased);
+ pool.Dispose();
+ Assert.AreNotSame(origLeased, pool.Lease());
+ }
+
+ [Test]
public void Constructor()
{
Assert.Throws<ArgumentNullException>(() => new DefaultObjectPool<TestPooledObject>(null, 10, 2));
@@ -67,8 +77,14 @@ namespace Grpc.Core.Internal.Tests
Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, -1));
}
- class TestPooledObject : IDisposable
+ class TestPooledObject : IPooledObject<TestPooledObject>
{
+ public Action<TestPooledObject> ReturnAction;
+
+ public void SetReturnToPoolAction(Action<TestPooledObject> returnAction)
+ {
+ this.ReturnAction = returnAction;
+ }
public void Dispose()
{
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 6296f1863b..6bb2f6c3e5 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -299,8 +299,8 @@ namespace Grpc.Core
private GrpcEnvironment()
{
GrpcNativeInit();
- batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(this.batchContextPool), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity);
- requestCallContextPool = new DefaultObjectPool<RequestCallContextSafeHandle>(() => RequestCallContextSafeHandle.Create(this.requestCallContextPool), requestCallContextPoolSharedCapacity, requestCallContextPoolThreadLocalCapacity);
+ batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity);
+ requestCallContextPool = new DefaultObjectPool<RequestCallContextSafeHandle>(() => RequestCallContextSafeHandle.Create(), requestCallContextPoolSharedCapacity, requestCallContextPoolThreadLocalCapacity);
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers);
threadPool.Start();
}
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index 83385ad7d3..53a859d18f 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -33,22 +33,21 @@ namespace Grpc.Core.Internal
/// <summary>
/// grpcsharp_batch_context
/// </summary>
- internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback
+ internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject<BatchContextSafeHandle>
{
static readonly NativeMethods Native = NativeMethods.Get();
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>();
- IObjectPool<BatchContextSafeHandle> ownedByPool;
+ Action<BatchContextSafeHandle> returnToPoolAction;
CompletionCallbackData completionCallbackData;
private BatchContextSafeHandle()
{
}
- public static BatchContextSafeHandle Create(IObjectPool<BatchContextSafeHandle> ownedByPool = null)
+ public static BatchContextSafeHandle Create()
{
var ctx = Native.grpcsharp_batch_context_create();
- ctx.ownedByPool = ownedByPool;
return ctx;
}
@@ -60,6 +59,12 @@ namespace Grpc.Core.Internal
}
}
+ public void SetReturnToPoolAction(Action<BatchContextSafeHandle> returnAction)
+ {
+ GrpcPreconditions.CheckState(returnToPoolAction == null);
+ returnToPoolAction = returnAction;
+ }
+
public void SetCompletionCallback(BatchCompletionDelegate callback, object state)
{
GrpcPreconditions.CheckState(completionCallbackData.Callback == null);
@@ -109,10 +114,15 @@ namespace Grpc.Core.Internal
public void Recycle()
{
- if (ownedByPool != null)
+ if (returnToPoolAction != null)
{
Native.grpcsharp_batch_context_reset(this);
- ownedByPool.Return(this);
+
+ var origReturnAction = returnToPoolAction;
+ // Not clearing all the references to the pool could prevent garbage collection of the pool object
+ // and thus cause memory leaks.
+ returnToPoolAction = null;
+ origReturnAction(this);
}
else
{
diff --git a/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs
index 2f030f3e02..0e1dc4d158 100644
--- a/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs
+++ b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs
@@ -27,9 +27,10 @@ namespace Grpc.Core.Internal
/// Pool of objects that combines a shared pool and a thread local pool.
/// </summary>
internal class DefaultObjectPool<T> : IObjectPool<T>
- where T : class, IDisposable
+ where T : class, IPooledObject<T>
{
readonly object myLock = new object();
+ readonly Action<T> returnAction;
readonly Func<T> itemFactory;
// Queue shared between threads, access needs to be synchronized.
@@ -54,6 +55,7 @@ namespace Grpc.Core.Internal
{
GrpcPreconditions.CheckArgument(sharedCapacity >= 0);
GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0);
+ this.returnAction = Return;
this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory));
this.sharedQueue = new Queue<T>(sharedCapacity);
this.sharedCapacity = sharedCapacity;
@@ -74,6 +76,13 @@ namespace Grpc.Core.Internal
/// </summary>
public T Lease()
{
+ var item = LeaseInternal();
+ item.SetReturnToPoolAction(returnAction);
+ return item;
+ }
+
+ private T LeaseInternal()
+ {
var localData = threadLocalData.Value;
if (localData.Queue.Count > 0)
{
diff --git a/src/csharp/Grpc.Core/Internal/IPooledObject.cs b/src/csharp/Grpc.Core/Internal/IPooledObject.cs
new file mode 100644
index 0000000000..e20bd51dce
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/IPooledObject.cs
@@ -0,0 +1,34 @@
+#region Copyright notice and license
+
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// An object that can be pooled in <c>IObjectPool</c>.
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ internal interface IPooledObject<T> : IDisposable
+ {
+ /// <summary>
+ /// Set the action that will be invoked to return a leased object to the pool.
+ /// </summary>
+ void SetReturnToPoolAction(Action<T> returnAction);
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs
index 59e9d9b1ab..ebc2d6d8d6 100644
--- a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs
@@ -20,26 +20,26 @@ using System;
using System.Runtime.InteropServices;
using Grpc.Core;
using Grpc.Core.Logging;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// grpcsharp_request_call_context
/// </summary>
- internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback
+ internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject<RequestCallContextSafeHandle>
{
static readonly NativeMethods Native = NativeMethods.Get();
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<RequestCallContextSafeHandle>();
- IObjectPool<RequestCallContextSafeHandle> ownedByPool;
+ Action<RequestCallContextSafeHandle> returnToPoolAction;
private RequestCallContextSafeHandle()
{
}
- public static RequestCallContextSafeHandle Create(IObjectPool<RequestCallContextSafeHandle> ownedByPool = null)
+ public static RequestCallContextSafeHandle Create()
{
var ctx = Native.grpcsharp_request_call_context_create();
- ctx.ownedByPool = ownedByPool;
return ctx;
}
@@ -51,6 +51,12 @@ namespace Grpc.Core.Internal
}
}
+ public void SetReturnToPoolAction(Action<RequestCallContextSafeHandle> returnAction)
+ {
+ GrpcPreconditions.CheckState(returnToPoolAction == null);
+ returnToPoolAction = returnAction;
+ }
+
public RequestCallCompletionDelegate CompletionCallback { get; set; }
// Gets data of server_rpc_new completion.
@@ -76,10 +82,15 @@ namespace Grpc.Core.Internal
public void Recycle()
{
- if (ownedByPool != null)
+ if (returnToPoolAction != null)
{
Native.grpcsharp_request_call_context_reset(this);
- ownedByPool.Return(this);
+
+ var origReturnAction = returnToPoolAction;
+ // Not clearing all the references to the pool could prevent garbage collection of the pool object
+ // and thus cause memory leaks.
+ returnToPoolAction = null;
+ origReturnAction(this);
}
else
{
diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include
index 2d9e4ba16a..539d3a9f80 100755
--- a/src/csharp/Grpc.Core/Version.csproj.include
+++ b/src/csharp/Grpc.Core/Version.csproj.include
@@ -1,7 +1,7 @@
<!-- This file is generated -->
<Project>
<PropertyGroup>
- <GrpcCsharpVersion>1.9.0-dev</GrpcCsharpVersion>
+ <GrpcCsharpVersion>1.10.0-dev</GrpcCsharpVersion>
<GoogleProtobufVersion>3.3.0</GoogleProtobufVersion>
</PropertyGroup>
</Project>
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index 9b5da1c947..f1aef46c6c 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -33,11 +33,11 @@ namespace Grpc.Core
/// <summary>
/// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies
/// </summary>
- public const string CurrentAssemblyFileVersion = "1.9.0.0";
+ public const string CurrentAssemblyFileVersion = "1.10.0.0";
/// <summary>
/// Current version of gRPC C#
/// </summary>
- public const string CurrentVersion = "1.9.0-dev";
+ public const string CurrentVersion = "1.10.0-dev";
}
}
diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat
index 8f89e2846a..4087d8b67a 100755
--- a/src/csharp/build_packages_dotnetcli.bat
+++ b/src/csharp/build_packages_dotnetcli.bat
@@ -13,7 +13,7 @@
@rem limitations under the License.
@rem Current package versions
-set VERSION=1.9.0-dev
+set VERSION=1.10.0-dev
@rem Adjust the location of nuget.exe
set NUGET=C:\nuget\nuget.exe
diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh
index 6a6cafe2bd..8ccc537a60 100755
--- a/src/csharp/build_packages_dotnetcli.sh
+++ b/src/csharp/build_packages_dotnetcli.sh
@@ -39,7 +39,7 @@ dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts
dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts
dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts
-nuget pack Grpc.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts
-nuget pack Grpc.Tools.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts
+nuget pack Grpc.nuspec -Version "1.10.0-dev" -OutputDirectory ../../artifacts
+nuget pack Grpc.Tools.nuspec -Version "1.10.0-dev" -OutputDirectory ../../artifacts
(cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg)
diff --git a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
index 22501765f9..037ad4d9b0 100644
--- a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
+++ b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
@@ -42,7 +42,7 @@ Pod::Spec.new do |s|
# exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed
# before them.
s.name = '!ProtoCompiler-gRPCPlugin'
- v = '1.9.0-dev'
+ v = '1.10.0-dev'
s.version = v
s.summary = 'The gRPC ProtoC plugin generates Objective-C files from .proto services.'
s.description = <<-DESC
diff --git a/src/objective-c/GRPCClient/private/version.h b/src/objective-c/GRPCClient/private/version.h
index 69dd6266fd..5c134e3642 100644
--- a/src/objective-c/GRPCClient/private/version.h
+++ b/src/objective-c/GRPCClient/private/version.h
@@ -23,4 +23,4 @@
// `tools/buildgen/generate_projects.sh`.
-#define GRPC_OBJC_VERSION_STRING @"1.9.0-dev"
+#define GRPC_OBJC_VERSION_STRING @"1.10.0-dev"
diff --git a/src/objective-c/tests/version.h b/src/objective-c/tests/version.h
index 6e3a073020..d8581b9779 100644
--- a/src/objective-c/tests/version.h
+++ b/src/objective-c/tests/version.h
@@ -23,5 +23,5 @@
// `tools/buildgen/generate_projects.sh`.
-#define GRPC_OBJC_VERSION_STRING @"1.9.0-dev"
+#define GRPC_OBJC_VERSION_STRING @"1.10.0-dev"
#define GRPC_C_VERSION_STRING @"5.0.0-dev"
diff --git a/src/php/composer.json b/src/php/composer.json
index 43833980f9..ea21417956 100644
--- a/src/php/composer.json
+++ b/src/php/composer.json
@@ -2,7 +2,7 @@
"name": "grpc/grpc-dev",
"description": "gRPC library for PHP - for Developement use only",
"license": "Apache-2.0",
- "version": "1.9.0",
+ "version": "1.10.0",
"require": {
"php": ">=5.5.0",
"google/protobuf": "^v3.3.0"
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index c4997f720d..ff55c3cbfa 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -99,6 +99,7 @@ zval *grpc_parse_metadata_array(grpc_metadata_array
1 TSRMLS_CC);
efree(str_key);
efree(str_val);
+ PHP_GRPC_FREE_STD_ZVAL(array);
return NULL;
}
php_grpc_add_next_index_stringl(data, str_val,
@@ -127,10 +128,12 @@ bool create_metadata_array(zval *array, grpc_metadata_array *metadata) {
HashTable *inner_array_hash;
zval *value;
zval *inner_array;
+ grpc_metadata_array_init(metadata);
+ metadata->count = 0;
+ metadata->metadata = NULL;
if (Z_TYPE_P(array) != IS_ARRAY) {
return false;
}
- grpc_metadata_array_init(metadata);
array_hash = Z_ARRVAL_P(array);
char *key;
@@ -174,6 +177,18 @@ bool create_metadata_array(zval *array, grpc_metadata_array *metadata) {
return true;
}
+void grpc_php_metadata_array_destroy_including_entries(
+ grpc_metadata_array* array) {
+ size_t i;
+ if (array->metadata) {
+ for (i = 0; i < array->count; i++) {
+ grpc_slice_unref(array->metadata[i].key);
+ grpc_slice_unref(array->metadata[i].value);
+ }
+ }
+ grpc_metadata_array_destroy(array);
+}
+
/* Wraps a grpc_call struct in a PHP object. Owned indicates whether the
struct should be destroyed at the end of the object's lifecycle */
zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned TSRMLS_DC) {
@@ -502,8 +517,8 @@ PHP_METHOD(Call, startBatch) {
}
cleanup:
- grpc_metadata_array_destroy(&metadata);
- grpc_metadata_array_destroy(&trailing_metadata);
+ grpc_php_metadata_array_destroy_including_entries(&metadata);
+ grpc_php_metadata_array_destroy_including_entries(&trailing_metadata);
grpc_metadata_array_destroy(&recv_metadata);
grpc_metadata_array_destroy(&recv_trailing_metadata);
grpc_slice_unref(recv_status_details);
@@ -526,7 +541,9 @@ cleanup:
*/
PHP_METHOD(Call, getPeer) {
wrapped_grpc_call *call = Z_WRAPPED_GRPC_CALL_P(getThis());
- PHP_GRPC_RETURN_STRING(grpc_call_get_peer(call->wrapped), 1);
+ char *peer = grpc_call_get_peer(call->wrapped);
+ PHP_GRPC_RETVAL_STRING(peer, 1);
+ gpr_free(peer);
}
/**
diff --git a/src/php/ext/grpc/call.h b/src/php/ext/grpc/call.h
index 5bde5d5390..104ac301c1 100644
--- a/src/php/ext/grpc/call.h
+++ b/src/php/ext/grpc/call.h
@@ -69,5 +69,6 @@ void grpc_init_call(TSRMLS_D);
/* Populates a grpc_metadata_array with the data in a PHP array object.
Returns true on success and false on failure */
bool create_metadata_array(zval *array, grpc_metadata_array *metadata);
-
+void grpc_php_metadata_array_destroy_including_entries(
+ grpc_metadata_array* array);
#endif /* NET_GRPC_PHP_GRPC_CHANNEL_H_ */
diff --git a/src/php/ext/grpc/call_credentials.c b/src/php/ext/grpc/call_credentials.c
index a395d53614..41c488a79c 100644
--- a/src/php/ext/grpc/call_credentials.c
+++ b/src/php/ext/grpc/call_credentials.c
@@ -120,6 +120,8 @@ PHP_METHOD(CallCredentials, createFromPlugin) {
fci->params, fci->param_count) == FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"createFromPlugin expects 1 callback", 1 TSRMLS_CC);
+ free(fci);
+ free(fci_cache);
return;
}
@@ -183,15 +185,17 @@ int plugin_get_metadata(
*status = GRPC_STATUS_OK;
*error_details = NULL;
+ bool should_return = false;
grpc_metadata_array metadata;
if (retval == NULL || Z_TYPE_P(retval) != IS_ARRAY) {
*status = GRPC_STATUS_INVALID_ARGUMENT;
- return true; // Synchronous return.
+ should_return = true; // Synchronous return.
}
if (!create_metadata_array(retval, &metadata)) {
*status = GRPC_STATUS_INVALID_ARGUMENT;
- return true; // Synchronous return.
+ should_return = true; // Synchronous return.
+ grpc_php_metadata_array_destroy_including_entries(&metadata);
}
if (retval != NULL) {
@@ -204,6 +208,9 @@ int plugin_get_metadata(
PHP_GRPC_FREE_STD_ZVAL(retval);
#endif
}
+ if (should_return) {
+ return true;
+ }
if (metadata.count > GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX) {
*status = GRPC_STATUS_INTERNAL;
diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c
index db59869c7f..4054723b43 100644
--- a/src/php/ext/grpc/channel.c
+++ b/src/php/ext/grpc/channel.c
@@ -41,6 +41,7 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
+#include <grpc/support/alloc.h>
#include "completion_queue.h"
#include "channel_credentials.h"
@@ -56,22 +57,63 @@ int le_plink;
/* Frees and destroys an instance of wrapped_grpc_channel */
PHP_GRPC_FREE_WRAPPED_FUNC_START(wrapped_grpc_channel)
+ bool is_last_wrapper = false;
+ // In_persistent_list is used when the user don't close the channel.
+ // In this case, le in the list won't be freed.
+ bool in_persistent_list = true;
if (p->wrapper != NULL) {
gpr_mu_lock(&p->wrapper->mu);
if (p->wrapper->wrapped != NULL) {
- php_grpc_zend_resource *rsrc;
- php_grpc_int key_len = strlen(p->wrapper->key);
- // only destroy the channel here if not found in the persistent list
- gpr_mu_lock(&global_persistent_list_mu);
- if (!(PHP_GRPC_PERSISTENT_LIST_FIND(&EG(persistent_list), p->wrapper->key,
- key_len, rsrc))) {
- grpc_channel_destroy(p->wrapper->wrapped);
- free(p->wrapper->target);
- free(p->wrapper->args_hashstr);
+ if (p->wrapper->is_valid) {
+ php_grpc_zend_resource *rsrc;
+ php_grpc_int key_len = strlen(p->wrapper->key);
+ // only destroy the channel here if not found in the persistent list
+ gpr_mu_lock(&global_persistent_list_mu);
+ if (!(PHP_GRPC_PERSISTENT_LIST_FIND(&EG(persistent_list), p->wrapper->key,
+ key_len, rsrc))) {
+ in_persistent_list = false;
+ grpc_channel_destroy(p->wrapper->wrapped);
+ free(p->wrapper->target);
+ free(p->wrapper->args_hashstr);
+ if(p->wrapper->creds_hashstr != NULL){
+ free(p->wrapper->creds_hashstr);
+ p->wrapper->creds_hashstr = NULL;
+ }
+ }
+ gpr_mu_unlock(&global_persistent_list_mu);
}
- gpr_mu_unlock(&global_persistent_list_mu);
+ }
+ p->wrapper->ref_count -= 1;
+ if (p->wrapper->ref_count == 0) {
+ is_last_wrapper = true;
}
gpr_mu_unlock(&p->wrapper->mu);
+ if (is_last_wrapper) {
+ if (in_persistent_list) {
+ // If ref_count==0 and the key still in the list, it means the user
+ // don't call channel->close().persistent list should free the
+ // allocation in such case, as well as related wrapped channel.
+ if (p->wrapper->wrapped != NULL) {
+ gpr_mu_lock(&p->wrapper->mu);
+ grpc_channel_destroy(p->wrapper->wrapped);
+ free(p->wrapper->target);
+ free(p->wrapper->args_hashstr);
+ if(p->wrapper->creds_hashstr != NULL){
+ free(p->wrapper->creds_hashstr);
+ p->wrapper->creds_hashstr = NULL;
+ }
+ p->wrapper->wrapped = NULL;
+ php_grpc_delete_persistent_list_entry(p->wrapper->key,
+ strlen(p->wrapper->key)
+ TSRMLS_CC);
+ gpr_mu_unlock(&p->wrapper->mu);
+ }
+ }
+ gpr_mu_destroy(&p->wrapper->mu);
+ free(p->wrapper->key);
+ free(p->wrapper);
+ }
+ p->wrapper = NULL;
}
PHP_GRPC_FREE_WRAPPED_FUNC_END()
@@ -276,9 +318,16 @@ PHP_METHOD(Channel, __construct) {
channel->wrapper->key = key;
channel->wrapper->target = strdup(target);
channel->wrapper->args_hashstr = strdup(sha1str);
+ channel->wrapper->creds_hashstr = NULL;
+ channel->wrapper->ref_count = 1;
+ channel->wrapper->is_valid = true;
if (creds != NULL && creds->hashstr != NULL) {
- channel->wrapper->creds_hashstr = creds->hashstr;
+ php_grpc_int creds_hashstr_len = strlen(creds->hashstr);
+ char *channel_creds_hashstr = malloc(creds_hashstr_len + 1);
+ strcpy(channel_creds_hashstr, creds->hashstr);
+ channel->wrapper->creds_hashstr = channel_creds_hashstr;
}
+
gpr_mu_init(&channel->wrapper->mu);
smart_str_free(&buf);
@@ -303,7 +352,17 @@ PHP_METHOD(Channel, __construct) {
channel, target, args, creds, key, key_len TSRMLS_CC);
} else {
efree(args.args);
+ if (channel->wrapper->creds_hashstr != NULL){
+ free(channel->wrapper->creds_hashstr);
+ channel->wrapper->creds_hashstr = NULL;
+ }
+ free(channel->wrapper->creds_hashstr);
+ free(channel->wrapper->key);
+ free(channel->wrapper->target);
+ free(channel->wrapper->args_hashstr);
+ free(channel->wrapper);
channel->wrapper = le->channel;
+ channel->wrapper->ref_count += 1;
}
}
}
@@ -323,7 +382,8 @@ PHP_METHOD(Channel, getTarget) {
}
char *target = grpc_channel_get_target(channel->wrapper->wrapped);
gpr_mu_unlock(&channel->wrapper->mu);
- PHP_GRPC_RETURN_STRING(target, 1);
+ PHP_GRPC_RETVAL_STRING(target, 1);
+ gpr_free(target);
}
/**
@@ -411,18 +471,46 @@ PHP_METHOD(Channel, watchConnectivityState) {
*/
PHP_METHOD(Channel, close) {
wrapped_grpc_channel *channel = Z_WRAPPED_GRPC_CHANNEL_P(getThis());
- gpr_mu_lock(&channel->wrapper->mu);
- if (channel->wrapper->wrapped != NULL) {
- grpc_channel_destroy(channel->wrapper->wrapped);
- free(channel->wrapper->target);
- free(channel->wrapper->args_hashstr);
- channel->wrapper->wrapped = NULL;
-
- php_grpc_delete_persistent_list_entry(channel->wrapper->key,
- strlen(channel->wrapper->key)
- TSRMLS_CC);
+ bool is_last_wrapper = false;
+ if (channel->wrapper != NULL) {
+ // Channel_wrapper hasn't call close before.
+ gpr_mu_lock(&channel->wrapper->mu);
+ if (channel->wrapper->wrapped != NULL) {
+ if (channel->wrapper->is_valid) {
+ // Wrapped channel hasn't been destoryed by other wrapper.
+ grpc_channel_destroy(channel->wrapper->wrapped);
+ free(channel->wrapper->target);
+ free(channel->wrapper->args_hashstr);
+ if(channel->wrapper->creds_hashstr != NULL){
+ free(channel->wrapper->creds_hashstr);
+ channel->wrapper->creds_hashstr = NULL;
+ }
+ channel->wrapper->wrapped = NULL;
+ channel->wrapper->is_valid = false;
+
+ php_grpc_delete_persistent_list_entry(channel->wrapper->key,
+ strlen(channel->wrapper->key)
+ TSRMLS_CC);
+ }
+ }
+ channel->wrapper->ref_count -= 1;
+ if(channel->wrapper->ref_count == 0){
+ // Mark that the wrapper can be freed because mu should be
+ // destroyed outside the lock.
+ is_last_wrapper = true;
+ }
+ gpr_mu_unlock(&channel->wrapper->mu);
}
- gpr_mu_unlock(&channel->wrapper->mu);
+ gpr_mu_lock(&global_persistent_list_mu);
+ if (is_last_wrapper) {
+ gpr_mu_destroy(&channel->wrapper->mu);
+ free(channel->wrapper->key);
+ free(channel->wrapper);
+ }
+ // Set channel->wrapper to NULL to avoid call close twice for the same
+ // channel.
+ channel->wrapper = NULL;
+ gpr_mu_unlock(&global_persistent_list_mu);
}
// Delete an entry from the persistent list
@@ -437,6 +525,7 @@ void php_grpc_delete_persistent_list_entry(char *key, php_grpc_int key_len
le = (channel_persistent_le_t *)rsrc->ptr;
le->channel = NULL;
php_grpc_zend_hash_del(&EG(persistent_list), key, key_len+1);
+ free(le);
}
gpr_mu_unlock(&global_persistent_list_mu);
}
diff --git a/src/php/ext/grpc/channel.h b/src/php/ext/grpc/channel.h
index 69adc4782c..86bfdea51a 100755
--- a/src/php/ext/grpc/channel.h
+++ b/src/php/ext/grpc/channel.h
@@ -40,6 +40,11 @@ typedef struct _grpc_channel_wrapper {
char *args_hashstr;
char *creds_hashstr;
gpr_mu mu;
+ // is_valid is used to check the wrapped channel has been freed
+ // before to avoid double free.
+ bool is_valid;
+ // ref_count is used to let the last wrapper free related channel and key.
+ size_t ref_count;
} grpc_channel_wrapper;
/* Wrapper struct for grpc_channel that can be associated with a PHP object */
diff --git a/src/php/ext/grpc/channel_credentials.c b/src/php/ext/grpc/channel_credentials.c
index d120d6e90f..624d7cc75c 100644
--- a/src/php/ext/grpc/channel_credentials.c
+++ b/src/php/ext/grpc/channel_credentials.c
@@ -57,8 +57,13 @@ static grpc_ssl_roots_override_result get_ssl_roots_override(
/* Frees and destroys an instance of wrapped_grpc_channel_credentials */
PHP_GRPC_FREE_WRAPPED_FUNC_START(wrapped_grpc_channel_credentials)
+ if (p->hashstr != NULL) {
+ free(p->hashstr);
+ p->hashstr = NULL;
+ }
if (p->wrapped != NULL) {
grpc_channel_credentials_release(p->wrapped);
+ p->wrapped = NULL;
}
PHP_GRPC_FREE_WRAPPED_FUNC_END()
@@ -152,7 +157,7 @@ PHP_METHOD(ChannelCredentials, createSsl) {
}
php_grpc_int hashkey_len = root_certs_length + cert_chain_length;
- char *hashkey = emalloc(hashkey_len);
+ char *hashkey = emalloc(hashkey_len + 1);
if (root_certs_length > 0) {
strcpy(hashkey, pem_root_certs);
}
@@ -199,8 +204,13 @@ PHP_METHOD(ChannelCredentials, createComposite) {
grpc_channel_credentials *creds =
grpc_composite_channel_credentials_create(cred1->wrapped, cred2->wrapped,
NULL);
+ // wrapped_grpc_channel_credentials object should keeps it's own
+ // allocation. Otherwise it conflicts free hashstr with call.c.
+ php_grpc_int cred1_len = strlen(cred1->hashstr);
+ char *cred1_hashstr = malloc(cred1_len+1);
+ strcpy(cred1_hashstr, cred1->hashstr);
zval *creds_object =
- grpc_php_wrap_channel_credentials(creds, cred1->hashstr, true
+ grpc_php_wrap_channel_credentials(creds, cred1_hashstr, true
TSRMLS_CC);
RETURN_DESTROY_ZVAL(creds_object);
}
diff --git a/src/php/ext/grpc/php7_wrapper.h b/src/php/ext/grpc/php7_wrapper.h
index 96091f9dad..2f4a53611c 100644
--- a/src/php/ext/grpc/php7_wrapper.h
+++ b/src/php/ext/grpc/php7_wrapper.h
@@ -33,6 +33,7 @@
#define php_grpc_add_next_index_stringl(data, str, len, b) \
add_next_index_stringl(data, str, len, b)
+#define PHP_GRPC_RETVAL_STRING(val, dup) RETVAL_STRING(val, dup)
#define PHP_GRPC_RETURN_STRING(val, dup) RETURN_STRING(val, dup)
#define PHP_GRPC_MAKE_STD_ZVAL(pzv) MAKE_STD_ZVAL(pzv)
#define PHP_GRPC_FREE_STD_ZVAL(pzv)
@@ -145,6 +146,7 @@ static inline int php_grpc_zend_hash_find(HashTable *ht, char *key, int len,
#define php_grpc_add_next_index_stringl(data, str, len, b) \
add_next_index_stringl(data, str, len)
+#define PHP_GRPC_RETVAL_STRING(val, dup) RETVAL_STRING(val)
#define PHP_GRPC_RETURN_STRING(val, dup) RETURN_STRING(val)
#define PHP_GRPC_MAKE_STD_ZVAL(pzv) \
pzv = (zval *)emalloc(sizeof(zval));
diff --git a/src/php/ext/grpc/php_grpc.c b/src/php/ext/grpc/php_grpc.c
index 0f2c5b8114..5971babc00 100644
--- a/src/php/ext/grpc/php_grpc.c
+++ b/src/php/ext/grpc/php_grpc.c
@@ -253,7 +253,8 @@ PHP_MSHUTDOWN_FUNCTION(grpc) {
*/
PHP_MINFO_FUNCTION(grpc) {
php_info_print_table_start();
- php_info_print_table_header(2, "grpc support", "enabled");
+ php_info_print_table_row(2, "grpc support", "enabled");
+ php_info_print_table_row(2, "grpc module version", PHP_GRPC_VERSION);
php_info_print_table_end();
/* Remove comments if you have entries in php.ini
diff --git a/src/php/ext/grpc/version.h b/src/php/ext/grpc/version.h
index 48131d72d1..408f2a4765 100644
--- a/src/php/ext/grpc/version.h
+++ b/src/php/ext/grpc/version.h
@@ -20,6 +20,6 @@
#ifndef VERSION_H
#define VERSION_H
-#define PHP_GRPC_VERSION "1.9.0dev"
+#define PHP_GRPC_VERSION "1.10.0dev"
#endif /* VERSION_H */
diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 2ff2e4e8a2..57592662c4 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -108,6 +108,9 @@ message ClientConfig {
// Number of messages on a stream before it gets finished/restarted
int32 messages_per_stream = 18;
+
+ // Use coalescing API when possible.
+ bool use_coalesce_api = 19;
}
message ClientStatus { ClientStats stats = 1; }
diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py
index 993c49d4af..6032828c77 100644
--- a/src/python/grpcio/grpc/_grpcio_metadata.py
+++ b/src/python/grpcio/grpc/_grpcio_metadata.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!!
-__version__ = """1.9.0.dev0"""
+__version__ = """1.10.0.dev0"""
diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py
index 56a280624f..d029472c68 100644
--- a/src/python/grpcio/grpc/_interceptor.py
+++ b/src/python/grpcio/grpc/_interceptor.py
@@ -51,6 +51,30 @@ class _ClientCallDetails(
pass
+def _unwrap_client_call_details(call_details, default_details):
+ try:
+ method = call_details.method
+ except AttributeError:
+ method = default_details.method
+
+ try:
+ timeout = call_details.timeout
+ except AttributeError:
+ timeout = default_details.timeout
+
+ try:
+ metadata = call_details.metadata
+ except AttributeError:
+ metadata = default_details.metadata
+
+ try:
+ credentials = call_details.credentials
+ except AttributeError:
+ credentials = default_details.credentials
+
+ return method, timeout, metadata, credentials
+
+
class _LocalFailure(grpc.RpcError, grpc.Future, grpc.Call):
def __init__(self, exception, traceback):
@@ -126,15 +150,18 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def future(self, request, timeout=None, metadata=None, credentials=None):
- def continuation(client_call_details, request):
- return self._thunk(client_call_details.method).future(
- request,
- timeout=client_call_details.timeout,
- metadata=client_call_details.metadata,
- credentials=client_call_details.credentials)
-
client_call_details = _ClientCallDetails(self._method, timeout,
metadata, credentials)
+
+ def continuation(new_details, request):
+ new_method, new_timeout, new_metadata, new_credentials = (
+ _unwrap_client_call_details(new_details, client_call_details))
+ return self._thunk(new_method).future(
+ request,
+ timeout=new_timeout,
+ metadata=new_metadata,
+ credentials=new_credentials)
+
try:
return self._interceptor.intercept_unary_unary(
continuation, client_call_details, request)
@@ -150,16 +177,18 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._interceptor = interceptor
def __call__(self, request, timeout=None, metadata=None, credentials=None):
+ client_call_details = _ClientCallDetails(self._method, timeout,
+ metadata, credentials)
- def continuation(client_call_details, request):
- return self._thunk(client_call_details.method)(
+ def continuation(new_details, request):
+ new_method, new_timeout, new_metadata, new_credentials = (
+ _unwrap_client_call_details(new_details, client_call_details))
+ return self._thunk(new_method)(
request,
- timeout=client_call_details.timeout,
- metadata=client_call_details.metadata,
- credentials=client_call_details.credentials)
+ timeout=new_timeout,
+ metadata=new_metadata,
+ credentials=new_credentials)
- client_call_details = _ClientCallDetails(self._method, timeout,
- metadata, credentials)
try:
return self._interceptor.intercept_unary_stream(
continuation, client_call_details, request)
@@ -203,17 +232,18 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
timeout=None,
metadata=None,
credentials=None):
-
- def continuation(client_call_details, request_iterator):
- return self._thunk(client_call_details.method).future(
- request_iterator,
- timeout=client_call_details.timeout,
- metadata=client_call_details.metadata,
- credentials=client_call_details.credentials)
-
client_call_details = _ClientCallDetails(self._method, timeout,
metadata, credentials)
+ def continuation(new_details, request_iterator):
+ new_method, new_timeout, new_metadata, new_credentials = (
+ _unwrap_client_call_details(new_details, client_call_details))
+ return self._thunk(new_method).future(
+ request_iterator,
+ timeout=new_timeout,
+ metadata=new_metadata,
+ credentials=new_credentials)
+
try:
return self._interceptor.intercept_stream_unary(
continuation, client_call_details, request_iterator)
@@ -233,17 +263,18 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
timeout=None,
metadata=None,
credentials=None):
-
- def continuation(client_call_details, request_iterator):
- return self._thunk(client_call_details.method)(
- request_iterator,
- timeout=client_call_details.timeout,
- metadata=client_call_details.metadata,
- credentials=client_call_details.credentials)
-
client_call_details = _ClientCallDetails(self._method, timeout,
metadata, credentials)
+ def continuation(new_details, request_iterator):
+ new_method, new_timeout, new_metadata, new_credentials = (
+ _unwrap_client_call_details(new_details, client_call_details))
+ return self._thunk(new_method)(
+ request_iterator,
+ timeout=new_timeout,
+ metadata=new_metadata,
+ credentials=new_credentials)
+
try:
return self._interceptor.intercept_stream_stream(
continuation, client_call_details, request_iterator)
diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py
index 1fac57b03a..a654eb026a 100644
--- a/src/python/grpcio/grpc_version.py
+++ b/src/python/grpcio/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!!
-VERSION = '1.9.0.dev0'
+VERSION = '1.10.0.dev0'
diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py
index 5b7e5859bc..d3185c6972 100644
--- a/src/python/grpcio_health_checking/grpc_version.py
+++ b/src/python/grpcio_health_checking/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!!
-VERSION = '1.9.0.dev0'
+VERSION = '1.10.0.dev0'
diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py
index 0ad9621154..7203d0d321 100644
--- a/src/python/grpcio_reflection/grpc_version.py
+++ b/src/python/grpcio_reflection/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!!
-VERSION = '1.9.0.dev0'
+VERSION = '1.10.0.dev0'
diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py
index 0eb5fbf94d..bf9e55e10e 100644
--- a/src/python/grpcio_testing/grpc_version.py
+++ b/src/python/grpcio_testing/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!!
-VERSION = '1.9.0.dev0'
+VERSION = '1.10.0.dev0'
diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py
index b1b4d7e0c2..2583e42016 100644
--- a/src/python/grpcio_tests/grpc_version.py
+++ b/src/python/grpcio_tests/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!!
-VERSION = '1.9.0.dev0'
+VERSION = '1.10.0.dev0'
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index 555168c3bb..d701d2f571 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -583,7 +583,7 @@ extern gpr_free_type gpr_free_import;
typedef void*(*gpr_realloc_type)(void* p, size_t size);
extern gpr_realloc_type gpr_realloc_import;
#define gpr_realloc gpr_realloc_import
-typedef void*(*gpr_malloc_aligned_type)(size_t size, size_t alignment_log);
+typedef void*(*gpr_malloc_aligned_type)(size_t size, size_t alignment);
extern gpr_malloc_aligned_type gpr_malloc_aligned_import;
#define gpr_malloc_aligned gpr_malloc_aligned_import
typedef void(*gpr_free_aligned_type)(void* ptr);
diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb
index be1412511a..9d9f2f4968 100644
--- a/src/ruby/lib/grpc/version.rb
+++ b/src/ruby/lib/grpc/version.rb
@@ -14,5 +14,5 @@
# GRPC contains the General RPC module.
module GRPC
- VERSION = '1.9.0.dev'
+ VERSION = '1.10.0.dev'
end
diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb
index 48aad39e08..2682294bd2 100644
--- a/src/ruby/tools/version.rb
+++ b/src/ruby/tools/version.rb
@@ -14,6 +14,6 @@
module GRPC
module Tools
- VERSION = '1.9.0.dev'
+ VERSION = '1.10.0.dev'
end
end