Diffstat (limited to 'src')
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.cc | 353
-rw-r--r--  src/core/ext/filters/client_channel/http_connect_handshaker.cc | 2
-rw-r--r--  src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc | 5
-rw-r--r--  src/core/lib/channel/channel_args.cc | 28
-rw-r--r--  src/core/lib/channel/channel_args.h | 4
-rw-r--r--  src/core/lib/channel/handshaker.cc | 47
-rw-r--r--  src/core/lib/channel/handshaker.h | 4
-rw-r--r--  src/core/lib/security/transport/security_handshaker.cc | 4
-rw-r--r--  src/csharp/Grpc.Core/NativeDeps.Mac.csproj.include | 8
-rwxr-xr-x  src/csharp/Grpc.Core/Version.csproj.include | 2
-rw-r--r--  src/csharp/global.json | 5
-rw-r--r--  src/objective-c/GRPCClient/private/GRPCHost.m | 5
-rw-r--r--  src/python/grpcio_health_checking/setup.py | 2
-rw-r--r--  src/python/grpcio_reflection/setup.py | 2
-rw-r--r--  src/python/grpcio_testing/setup.py | 2
-rw-r--r--  src/python/grpcio_tests/setup.py | 2
-rw-r--r--  src/python/grpcio_tests/tests/_loader.py | 2
-rw-r--r--  src/python/grpcio_tests/tests/_result.py | 4
-rw-r--r--  src/python/grpcio_tests/tests/interop/client.py | 4
-rw-r--r--  src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py | 4
-rw-r--r--  src/python/grpcio_tests/tests/stress/test_runner.py | 2
-rw-r--r--  src/python/grpcio_tests/tests/testing/_client_application.py | 24
-rw-r--r--  src/python/grpcio_tests/tests/testing/_server_application.py | 2
-rw-r--r--  src/python/grpcio_tests/tests/unit/_compression_test.py | 4
-rw-r--r--  src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py | 2
-rw-r--r--  src/python/grpcio_tests/tests/unit/_cython/test_utilities.py | 4
-rw-r--r--  src/python/grpcio_tests/tests/unit/_exit_test.py | 2
-rw-r--r--  src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py | 2
-rw-r--r--  src/python/grpcio_tests/tests/unit/_invocation_defects_test.py | 4
-rw-r--r--  src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py | 2
30 files changed, 337 insertions(+), 200 deletions(-)
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index a10bfea8b1..33cf56519e 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -842,10 +842,11 @@ typedef struct {
bool completed_recv_trailing_metadata : 1;
// State for callback processing.
bool retry_dispatched : 1;
- bool recv_initial_metadata_ready_deferred : 1;
- bool recv_message_ready_deferred : 1;
+ subchannel_batch_data* recv_initial_metadata_ready_deferred_batch;
grpc_error* recv_initial_metadata_error;
+ subchannel_batch_data* recv_message_ready_deferred_batch;
grpc_error* recv_message_error;
+ subchannel_batch_data* recv_trailing_metadata_internal_batch;
} subchannel_call_retry_state;
// Pending batches stored in call data.
@@ -994,6 +995,39 @@ static void maybe_cache_send_ops_for_batch(call_data* calld,
}
}
+// Frees cached send_initial_metadata.
+static void free_cached_send_initial_metadata(channel_data* chand,
+ call_data* calld) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
+ calld);
+ }
+ grpc_metadata_batch_destroy(&calld->send_initial_metadata);
+}
+
+// Frees cached send_message at index idx.
+static void free_cached_send_message(channel_data* chand, call_data* calld,
+ size_t idx) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
+ chand, calld, idx);
+ }
+ (*calld->send_messages)[idx]->Destroy();
+}
+
+// Frees cached send_trailing_metadata.
+static void free_cached_send_trailing_metadata(channel_data* chand,
+ call_data* calld) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: destroying calld->send_trailing_metadata",
+ chand, calld);
+ }
+ grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
+}
+
// Frees cached send ops that have already been completed after
// committing the call.
static void free_cached_send_op_data_after_commit(
@@ -1001,19 +1035,13 @@ static void free_cached_send_op_data_after_commit(
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (retry_state->completed_send_initial_metadata) {
- grpc_metadata_batch_destroy(&calld->send_initial_metadata);
+ free_cached_send_initial_metadata(chand, calld);
}
for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR
- "]",
- chand, calld, i);
- }
- (*calld->send_messages)[i]->Destroy();
+ free_cached_send_message(chand, calld, i);
}
if (retry_state->completed_send_trailing_metadata) {
- grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
+ free_cached_send_trailing_metadata(chand, calld);
}
}
@@ -1025,20 +1053,14 @@ static void free_cached_send_op_data_for_completed_batch(
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (batch_data->batch.send_initial_metadata) {
- grpc_metadata_batch_destroy(&calld->send_initial_metadata);
+ free_cached_send_initial_metadata(chand, calld);
}
if (batch_data->batch.send_message) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR
- "]",
- chand, calld, retry_state->completed_send_message_count - 1);
- }
- (*calld->send_messages)[retry_state->completed_send_message_count - 1]
- ->Destroy();
+ free_cached_send_message(chand, calld,
+ retry_state->completed_send_message_count - 1);
}
if (batch_data->batch.send_trailing_metadata) {
- grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
+ free_cached_send_trailing_metadata(chand, calld);
}
}
@@ -1642,7 +1664,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
"(Trailers-Only)",
chand, calld);
}
- retry_state->recv_initial_metadata_ready_deferred = true;
+ retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
if (!retry_state->started_recv_trailing_metadata) {
// recv_trailing_metadata not yet started by application; start it
@@ -1731,7 +1753,7 @@ static void recv_message_ready(void* arg, grpc_error* error) {
"message and recv_trailing_metadata pending)",
chand, calld);
}
- retry_state->recv_message_ready_deferred = true;
+ retry_state->recv_message_ready_deferred_batch = batch_data;
retry_state->recv_message_error = GRPC_ERROR_REF(error);
if (!retry_state->started_recv_trailing_metadata) {
// recv_trailing_metadata not yet started by application; start it
@@ -1750,6 +1772,59 @@ static void recv_message_ready(void* arg, grpc_error* error) {
}
//
+// list of closures to execute in call combiner
+//
+
+// Represents a closure that needs to run in the call combiner as part of
+// starting or completing a batch.
+typedef struct {
+ grpc_closure* closure;
+ grpc_error* error;
+ const char* reason;
+ bool free_reason = false;
+} closure_to_execute;
+
+static void execute_closures_in_call_combiner(grpc_call_element* elem,
+ const char* caller,
+ closure_to_execute* closures,
+ size_t num_closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ // Note that the call combiner will be yielded for each closure that
+ // we schedule. We're already running in the call combiner, so one of
+ // the closures can be scheduled directly, but the others will
+ // have to re-enter the call combiner.
+ if (num_closures > 0) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: %s starting closure: %s", chand,
+ calld, caller, closures[0].reason);
+ }
+ GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
+ if (closures[0].free_reason) {
+ gpr_free(const_cast<char*>(closures[0].reason));
+ }
+ for (size_t i = 1; i < num_closures; ++i) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: %s starting closure in call combiner: %s",
+ chand, calld, caller, closures[i].reason);
+ }
+ GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
+ closures[i].error, closures[i].reason);
+ if (closures[i].free_reason) {
+ gpr_free(const_cast<char*>(closures[i].reason));
+ }
+ }
+ } else {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: no closures to run for %s", chand,
+ calld, caller);
+ }
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
+ }
+}
+
+//
// on_complete callback handling
//
@@ -1777,36 +1852,35 @@ static void update_retry_state_for_completed_batch(
}
}
-// Represents a closure that needs to run as a result of a completed batch.
-typedef struct {
- grpc_closure* closure;
- grpc_error* error;
- const char* reason;
-} closure_to_execute;
-
// Adds any necessary closures for deferred recv_initial_metadata and
// recv_message callbacks to closures, updating *num_closures as needed.
static void add_closures_for_deferred_recv_callbacks(
subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
closure_to_execute* closures, size_t* num_closures) {
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_initial_metadata_ready_deferred) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure =
- GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
- invoke_recv_initial_metadata_callback, batch_data,
- grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_initial_metadata_error;
- closure->reason = "resuming recv_initial_metadata_ready";
- }
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_message_ready_deferred) {
- closure_to_execute* closure = &closures[(*num_closures)++];
- closure->closure = GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
- invoke_recv_message_callback,
- batch_data, grpc_schedule_on_exec_ctx);
- closure->error = retry_state->recv_message_error;
- closure->reason = "resuming recv_message_ready";
+ if (batch_data->batch.recv_trailing_metadata) {
+ // Add closure for deferred recv_initial_metadata_ready.
+ if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->recv_initial_metadata_ready,
+ invoke_recv_initial_metadata_callback,
+ retry_state->recv_initial_metadata_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closure->error = retry_state->recv_initial_metadata_error;
+ closure->reason = "resuming recv_initial_metadata_ready";
+ retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
+ }
+ // Add closure for deferred recv_message_ready.
+ if (retry_state->recv_message_ready_deferred_batch != nullptr) {
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->recv_message_ready, invoke_recv_message_callback,
+ retry_state->recv_message_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closure->error = retry_state->recv_message_error;
+ closure->reason = "resuming recv_message_ready";
+ retry_state->recv_message_ready_deferred_batch = nullptr;
+ }
}
}
@@ -1951,6 +2025,8 @@ static void on_complete(void* arg, grpc_error* error) {
// If we have previously completed recv_trailing_metadata, then the
// call is finished.
bool call_finished = retry_state->completed_recv_trailing_metadata;
+ // Record whether we were already committed before receiving this callback.
+ const bool previously_committed = calld->retry_committed;
// Update bookkeeping in retry_state.
update_retry_state_for_completed_batch(batch_data, retry_state);
if (call_finished) {
@@ -1979,35 +2055,39 @@ static void on_complete(void* arg, grpc_error* error) {
if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
}
- } else if (retry_state->completed_recv_trailing_metadata) {
- call_finished = true;
- }
- if (call_finished && grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: call finished, status=%s", chand,
- calld, grpc_status_code_to_string(status));
}
- // If the call is finished, check if we should retry.
- if (call_finished &&
- maybe_retry(elem, batch_data, status, server_pushback_md)) {
- // Unref batch_data for deferred recv_initial_metadata_ready or
- // recv_message_ready callbacks, if any.
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_initial_metadata_ready_deferred) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+ // If the call just finished, check if we should retry.
+ if (call_finished) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: call finished, status=%s", chand,
+ calld, grpc_status_code_to_string(status));
}
- if (batch_data->batch.recv_trailing_metadata &&
- retry_state->recv_message_ready_deferred) {
+ if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
+ // Unref batch_data for deferred recv_initial_metadata_ready or
+ // recv_message_ready callbacks, if any.
+ if (batch_data->batch.recv_trailing_metadata &&
+ retry_state->recv_initial_metadata_ready_deferred_batch !=
+ nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
+ }
+ if (batch_data->batch.recv_trailing_metadata &&
+ retry_state->recv_message_ready_deferred_batch != nullptr) {
+ batch_data_unref(batch_data);
+ GRPC_ERROR_UNREF(retry_state->recv_message_error);
+ }
batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_message_error);
+ return;
}
- batch_data_unref(batch_data);
- return;
+ // Not retrying, so commit the call.
+ retry_commit(elem, retry_state);
}
}
- // If the call is finished or retries are committed, free cached data for
- // send ops that we've just completed.
- if (call_finished || calld->retry_committed) {
+ // If we were already committed before receiving this callback, free
+ // cached data for send ops that we've just completed. (If the call has
+ // just now finished, the call to retry_commit() above will have freed all
+ // cached send ops, so we don't need to do it here.)
+ if (previously_committed) {
free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
}
// Call not being retried.
@@ -2042,20 +2122,8 @@ static void on_complete(void* arg, grpc_error* error) {
// Don't need batch_data anymore.
batch_data_unref(batch_data);
// Schedule all of the closures identified above.
- // Note that the call combiner will be yielded for each closure that
- // we schedule. We're already running in the call combiner, so one of
- // the closures can be scheduled directly, but the others will
- // have to re-enter the call combiner.
- if (num_closures > 0) {
- GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
- for (size_t i = 1; i < num_closures; ++i) {
- GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
- closures[i].error, closures[i].reason);
- }
- } else {
- GRPC_CALL_COMBINER_STOP(calld->call_combiner,
- "no closures to run for on_complete");
- }
+ execute_closures_in_call_combiner(elem, "on_complete", closures,
+ num_closures);
}
//
@@ -2072,6 +2140,31 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
grpc_subchannel_call_process_op(subchannel_call, batch);
}
+// Adds a closure to closures that will execute batch in the call combiner.
+static void add_closure_for_subchannel_batch(
+ call_data* calld, grpc_transport_stream_op_batch* batch,
+ closure_to_execute* closures, size_t* num_closures) {
+ batch->handler_private.extra_arg = calld->subchannel_call;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ start_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = &batch->handler_private.closure;
+ closure->error = GRPC_ERROR_NONE;
+ // If the tracer is enabled, we log a more detailed message, which
+ // requires dynamic allocation. This will be freed in
+ // start_retriable_subchannel_batches().
+ if (grpc_client_channel_trace.enabled()) {
+ char* batch_str = grpc_transport_stream_op_batch_string(batch);
+ gpr_asprintf(const_cast<char**>(&closure->reason),
+ "starting batch in call combiner: %s", batch_str);
+ gpr_free(batch_str);
+ closure->free_reason = true;
+ } else {
+ closure->reason = "start_subchannel_batch";
+ }
+}
+
// Adds retriable send_initial_metadata op to batch_data.
static void add_retriable_send_initial_metadata_op(
call_data* calld, subchannel_call_retry_state* retry_state,
@@ -2227,8 +2320,12 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
- subchannel_batch_data* batch_data = batch_data_create(elem, 1);
+ // Create batch_data with 2 refs, since this batch will be unreffed twice:
+ // once when the subchannel batch returns, and again when we actually get
+ // a recv_trailing_metadata op from the surface.
+ subchannel_batch_data* batch_data = batch_data_create(elem, 2);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
+ retry_state->recv_trailing_metadata_internal_batch = batch_data;
// Note: This will release the call combiner.
grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
}
@@ -2299,7 +2396,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// *num_batches as needed.
static void add_subchannel_batches_for_pending_batches(
grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_transport_stream_op_batch** batches, size_t* num_batches) {
+ closure_to_execute* closures, size_t* num_closures) {
call_data* calld = static_cast<call_data*>(elem->call_data);
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
pending_batch* pending = &calld->pending_batches[i];
@@ -2342,13 +2439,37 @@ static void add_subchannel_batches_for_pending_batches(
}
if (batch->recv_trailing_metadata &&
retry_state->started_recv_trailing_metadata) {
+ // If we previously completed a recv_trailing_metadata op
+ // initiated by start_internal_recv_trailing_metadata(), use the
+ // result of that instead of trying to re-start this op.
+ if (retry_state->recv_trailing_metadata_internal_batch != nullptr) {
+ // If the batch completed, then trigger the completion callback
+ // directly, so that we return the previously returned results to
+ // the application. Otherwise, just unref the internally
+ // started subchannel batch, since we'll propagate the
+ // completion when it completes.
+ if (retry_state->completed_recv_trailing_metadata) {
+ subchannel_batch_data* batch_data =
+ retry_state->recv_trailing_metadata_internal_batch;
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = &batch_data->on_complete;
+ // Batches containing recv_trailing_metadata always succeed.
+ closure->error = GRPC_ERROR_NONE;
+ closure->reason =
+ "re-executing on_complete for recv_trailing_metadata "
+ "to propagate internally triggered result";
+ } else {
+ batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
+ }
+ retry_state->recv_trailing_metadata_internal_batch = nullptr;
+ }
continue;
}
// If we're not retrying, just send the batch as-is.
if (calld->method_params == nullptr ||
calld->method_params->retry_policy() == nullptr ||
calld->retry_committed) {
- batches[(*num_batches)++] = batch;
+ add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
pending_batch_clear(calld, pending);
continue;
}
@@ -2385,7 +2506,8 @@ static void add_subchannel_batches_for_pending_batches(
GPR_ASSERT(batch->collect_stats);
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
}
- batches[(*num_batches)++] = &batch_data->batch;
+ add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
+ num_closures);
}
}
@@ -2403,62 +2525,29 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
+ // Construct list of closures to execute, one for each pending batch.
// We can start up to 6 batches.
- grpc_transport_stream_op_batch*
- batches[GPR_ARRAY_SIZE(calld->pending_batches)];
- size_t num_batches = 0;
+ closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
+ size_t num_closures = 0;
// Replay previously-returned send_* ops if needed.
subchannel_batch_data* replay_batch_data =
maybe_create_subchannel_batch_for_replay(elem, retry_state);
if (replay_batch_data != nullptr) {
- batches[num_batches++] = &replay_batch_data->batch;
+ add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
+ &num_closures);
}
// Now add pending batches.
- add_subchannel_batches_for_pending_batches(elem, retry_state, batches,
- &num_batches);
+ add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
+ &num_closures);
// Start batches on subchannel call.
- // Note that the call combiner will be yielded for each batch that we
- // send down. We're already running in the call combiner, so one of
- // the batches can be started directly, but the others will have to
- // re-enter the call combiner.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG,
"chand=%p calld=%p: starting %" PRIuPTR
" retriable batches on subchannel_call=%p",
- chand, calld, num_batches, calld->subchannel_call);
- }
- if (num_batches == 0) {
- // This should be fairly rare, but it can happen when (e.g.) an
- // attempt completes before it has finished replaying all
- // previously sent messages.
- GRPC_CALL_COMBINER_STOP(calld->call_combiner,
- "no retriable subchannel batches to start");
- } else {
- for (size_t i = 1; i < num_batches; ++i) {
- if (grpc_client_channel_trace.enabled()) {
- char* batch_str = grpc_transport_stream_op_batch_string(batches[i]);
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: starting batch in call combiner: %s", chand,
- calld, batch_str);
- gpr_free(batch_str);
- }
- batches[i]->handler_private.extra_arg = calld->subchannel_call;
- GRPC_CLOSURE_INIT(&batches[i]->handler_private.closure,
- start_batch_in_call_combiner, batches[i],
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(calld->call_combiner,
- &batches[i]->handler_private.closure,
- GRPC_ERROR_NONE, "start_subchannel_batch");
- }
- if (grpc_client_channel_trace.enabled()) {
- char* batch_str = grpc_transport_stream_op_batch_string(batches[0]);
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting batch: %s", chand, calld,
- batch_str);
- gpr_free(batch_str);
- }
- // Note: This will release the call combiner.
- grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
+ chand, calld, num_closures, calld->subchannel_call);
}
+ execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
+ closures, num_closures);
}
//
diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
index fb29fa788d..4e8b8b71db 100644
--- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -326,7 +326,7 @@ static void http_connect_handshaker_do_handshake(
static const grpc_handshaker_vtable http_connect_handshaker_vtable = {
http_connect_handshaker_destroy, http_connect_handshaker_shutdown,
- http_connect_handshaker_do_handshake};
+ http_connect_handshaker_do_handshake, "http_connect"};
static grpc_handshaker* grpc_http_connect_handshaker_create() {
http_connect_handshaker* handshaker =
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index fb2435749d..e86ab5a37e 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -153,7 +153,10 @@ static void grpc_ares_request_unref(grpc_ares_request* r) {
/* If there are no pending queries, invoke on_done callback and destroy the
request */
if (gpr_unref(&r->pending_queries)) {
- grpc_cares_wrapper_address_sorting_sort(*(r->lb_addrs_out));
+ grpc_lb_addresses* lb_addrs = *(r->lb_addrs_out);
+ if (lb_addrs != nullptr) {
+ grpc_cares_wrapper_address_sorting_sort(lb_addrs);
+ }
GRPC_CLOSURE_SCHED(r->on_done, r->error);
gpr_mu_destroy(&r->mu);
grpc_ares_ev_driver_destroy(r->ev_driver);
diff --git a/src/core/lib/channel/channel_args.cc b/src/core/lib/channel/channel_args.cc
index 66a86c2286..e49d532e11 100644
--- a/src/core/lib/channel/channel_args.cc
+++ b/src/core/lib/channel/channel_args.cc
@@ -411,3 +411,31 @@ grpc_arg grpc_channel_arg_pointer_create(
arg.value.pointer.vtable = vtable;
return arg;
}
+
+char* grpc_channel_args_string(const grpc_channel_args* args) {
+ if (args == nullptr) return nullptr;
+ gpr_strvec v;
+ gpr_strvec_init(&v);
+ for (size_t i = 0; i < args->num_args; ++i) {
+ const grpc_arg& arg = args->args[i];
+ char* s;
+ switch (arg.type) {
+ case GRPC_ARG_INTEGER:
+ gpr_asprintf(&s, "%s=%d", arg.key, arg.value.integer);
+ break;
+ case GRPC_ARG_STRING:
+ gpr_asprintf(&s, "%s=%s", arg.key, arg.value.string);
+ break;
+ case GRPC_ARG_POINTER:
+ gpr_asprintf(&s, "%s=%p", arg.key, arg.value.pointer.p);
+ break;
+ default:
+ gpr_asprintf(&s, "arg with unknown type");
+ }
+ gpr_strvec_add(&v, s);
+ }
+ char* result =
+ gpr_strjoin_sep(const_cast<const char**>(v.strs), v.count, ", ", nullptr);
+ gpr_strvec_destroy(&v);
+ return result;
+}
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index c0d6a17356..5ff303a9dc 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -124,4 +124,8 @@ grpc_arg grpc_channel_arg_integer_create(char* name, int value);
grpc_arg grpc_channel_arg_pointer_create(char* name, void* value,
const grpc_arg_pointer_vtable* vtable);
+// Returns a string representing channel args in human-readable form.
+// Caller takes ownership of the result.
+char* grpc_channel_args_string(const grpc_channel_args* args);
+
#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H */
diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc
index 9b1af8d6cb..9cd97823d4 100644
--- a/src/core/lib/channel/handshaker.cc
+++ b/src/core/lib/channel/handshaker.cc
@@ -22,11 +22,15 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/timer.h"
+grpc_core::TraceFlag grpc_handshaker_trace(false, "handshaker");
+
//
// grpc_handshaker
//
@@ -52,6 +56,10 @@ void grpc_handshaker_do_handshake(grpc_handshaker* handshaker,
args);
}
+const char* grpc_handshaker_name(grpc_handshaker* handshaker) {
+ return handshaker->vtable->name;
+}
+
//
// grpc_handshake_manager
//
@@ -127,6 +135,12 @@ static bool is_power_of_2(size_t n) { return (n & (n - 1)) == 0; }
void grpc_handshake_manager_add(grpc_handshake_manager* mgr,
grpc_handshaker* handshaker) {
+ if (grpc_handshaker_trace.enabled()) {
+ gpr_log(
+ GPR_DEBUG,
+ "handshake_manager %p: adding handshaker %s [%p] at index %" PRIuPTR,
+ mgr, grpc_handshaker_name(handshaker), handshaker, mgr->count);
+ }
gpr_mu_lock(&mgr->mu);
// To avoid allocating memory for each handshaker we add, we double
// the number of elements every time we need more.
@@ -172,23 +186,56 @@ void grpc_handshake_manager_shutdown(grpc_handshake_manager* mgr,
GRPC_ERROR_UNREF(why);
}
+static char* handshaker_args_string(grpc_handshaker_args* args) {
+ char* args_str = grpc_channel_args_string(args->args);
+ size_t num_args = args->args != nullptr ? args->args->num_args : 0;
+ size_t read_buffer_length =
+ args->read_buffer != nullptr ? args->read_buffer->length : 0;
+ char* str;
+ gpr_asprintf(&str,
+ "{endpoint=%p, args=%p {size=%" PRIuPTR
+ ": %s}, read_buffer=%p (length=%" PRIuPTR "), exit_early=%d}",
+ args->endpoint, args->args, num_args, args_str,
+ args->read_buffer, read_buffer_length, args->exit_early);
+ gpr_free(args_str);
+ return str;
+}
+
// Helper function to call either the next handshaker or the
// on_handshake_done callback.
// Returns true if we've scheduled the on_handshake_done callback.
static bool call_next_handshaker_locked(grpc_handshake_manager* mgr,
grpc_error* error) {
+ if (grpc_handshaker_trace.enabled()) {
+ char* args_str = handshaker_args_string(&mgr->args);
+ gpr_log(GPR_DEBUG,
+ "handshake_manager %p: error=%s shutdown=%d index=%" PRIuPTR
+ ", args=%s",
+ mgr, grpc_error_string(error), mgr->shutdown, mgr->index, args_str);
+ gpr_free(args_str);
+ }
GPR_ASSERT(mgr->index <= mgr->count);
// If we got an error or we've been shut down or we're exiting early or
// we've finished the last handshaker, invoke the on_handshake_done
// callback. Otherwise, call the next handshaker.
if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->args.exit_early ||
mgr->index == mgr->count) {
+ if (grpc_handshaker_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "handshake_manager %p: handshaking complete", mgr);
+ }
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
grpc_timer_cancel(&mgr->deadline_timer);
GRPC_CLOSURE_SCHED(&mgr->on_handshake_done, error);
mgr->shutdown = true;
} else {
+ if (grpc_handshaker_trace.enabled()) {
+ gpr_log(
+ GPR_DEBUG,
+ "handshake_manager %p: calling handshaker %s [%p] at index %" PRIuPTR,
+ mgr, grpc_handshaker_name(mgr->handshakers[mgr->index]),
+ mgr->handshakers[mgr->index], mgr->index);
+ }
grpc_handshaker_do_handshake(mgr->handshakers[mgr->index], mgr->acceptor,
&mgr->call_next_handshaker, &mgr->args);
}
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
index dfecd81004..be7fd127e4 100644
--- a/src/core/lib/channel/handshaker.h
+++ b/src/core/lib/channel/handshaker.h
@@ -84,6 +84,9 @@ typedef struct {
grpc_tcp_server_acceptor* acceptor,
grpc_closure* on_handshake_done,
grpc_handshaker_args* args);
+
+ /// The name of the handshaker, for debugging purposes.
+ const char* name;
} grpc_handshaker_vtable;
/// Base struct. To subclass, make this the first member of the
@@ -102,6 +105,7 @@ void grpc_handshaker_do_handshake(grpc_handshaker* handshaker,
grpc_tcp_server_acceptor* acceptor,
grpc_closure* on_handshake_done,
grpc_handshaker_args* args);
+const char* grpc_handshaker_name(grpc_handshaker* handshaker);
///
/// grpc_handshake_manager
diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc
index 0c97dfa6b3..57dd3406bc 100644
--- a/src/core/lib/security/transport/security_handshaker.cc
+++ b/src/core/lib/security/transport/security_handshaker.cc
@@ -406,7 +406,7 @@ static void security_handshaker_do_handshake(grpc_handshaker* handshaker,
static const grpc_handshaker_vtable security_handshaker_vtable = {
security_handshaker_destroy, security_handshaker_shutdown,
- security_handshaker_do_handshake};
+ security_handshaker_do_handshake, "security"};
static grpc_handshaker* security_handshaker_create(
tsi_handshaker* handshaker, grpc_security_connector* connector) {
@@ -456,7 +456,7 @@ static void fail_handshaker_do_handshake(grpc_handshaker* handshaker,
static const grpc_handshaker_vtable fail_handshaker_vtable = {
fail_handshaker_destroy, fail_handshaker_shutdown,
- fail_handshaker_do_handshake};
+ fail_handshaker_do_handshake, "security_fail"};
static grpc_handshaker* fail_handshaker_create() {
grpc_handshaker* h = static_cast<grpc_handshaker*>(gpr_malloc(sizeof(*h)));
diff --git a/src/csharp/Grpc.Core/NativeDeps.Mac.csproj.include b/src/csharp/Grpc.Core/NativeDeps.Mac.csproj.include
index f1b85c3730..309e33d47e 100644
--- a/src/csharp/Grpc.Core/NativeDeps.Mac.csproj.include
+++ b/src/csharp/Grpc.Core/NativeDeps.Mac.csproj.include
@@ -1,13 +1,5 @@
<Project>
<ItemGroup>
- <!-- We are relying on run_tests.py to build grpc_csharp_ext with the right bitness
- and we copy it as both x86 (needed by net45) and x64 (needed by netcoreapp1.0) as we don't
- know which one will be needed to run the tests. -->
- <Content Include="..\..\..\libs\$(NativeDependenciesConfigurationUnix)\libgrpc_csharp_ext.dylib">
- <Link>libgrpc_csharp_ext.x86.dylib</Link>
- <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
- <Pack>false</Pack>
- </Content>
<Content Include="..\..\..\libs\$(NativeDependenciesConfigurationUnix)\libgrpc_csharp_ext.dylib">
<Link>libgrpc_csharp_ext.x64.dylib</Link>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include
index f7a7a5cbe9..6e28c11df2 100755
--- a/src/csharp/Grpc.Core/Version.csproj.include
+++ b/src/csharp/Grpc.Core/Version.csproj.include
@@ -2,6 +2,6 @@
<Project>
<PropertyGroup>
<GrpcCsharpVersion>1.12.0-dev</GrpcCsharpVersion>
- <GoogleProtobufVersion>3.3.0</GoogleProtobufVersion>
+ <GoogleProtobufVersion>3.5.1</GoogleProtobufVersion>
</PropertyGroup>
</Project>
diff --git a/src/csharp/global.json b/src/csharp/global.json
deleted file mode 100644
index 815be4bfb9..0000000000
--- a/src/csharp/global.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
- "sdk": {
- "version": "2.1.4"
- }
-}
diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m
index 283306262a..bfb1fd352c 100644
--- a/src/objective-c/GRPCClient/private/GRPCHost.m
+++ b/src/objective-c/GRPCClient/private/GRPCHost.m
@@ -108,7 +108,10 @@ static NSMutableDictionary *kHostCache;
serverName:(NSString *)serverName
timeout:(NSTimeInterval)timeout
completionQueue:(GRPCCompletionQueue *)queue {
- GRPCChannel *channel;
+  // The __block attribute allows channel to take a refcount inside the @synchronized block.
+  // Without it, the retain of the channel object happens after objc_sync_exit in release builds,
+  // which may result in the channel being released before it is used. See grpc/#15033.
+ __block GRPCChannel *channel;
// This is racing -[GRPCHost disconnect].
@synchronized(self) {
if (!_channel) {
diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py
index 60d309ec65..35c09827ba 100644
--- a/src/python/grpcio_health_checking/setup.py
+++ b/src/python/grpcio_health_checking/setup.py
@@ -57,7 +57,7 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
- 'protobuf>=3.5.0.post1',
+ 'protobuf>=3.5.2.post1',
'grpcio>={version}'.format(version=grpc_version.VERSION),
)
diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py
index 10c4c38f19..589d0ff556 100644
--- a/src/python/grpcio_reflection/setup.py
+++ b/src/python/grpcio_reflection/setup.py
@@ -58,7 +58,7 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
- 'protobuf>=3.5.0.post1',
+ 'protobuf>=3.5.2.post1',
'grpcio>={version}'.format(version=grpc_version.VERSION),
)
diff --git a/src/python/grpcio_testing/setup.py b/src/python/grpcio_testing/setup.py
index 5a9d593ec1..eb480a5464 100644
--- a/src/python/grpcio_testing/setup.py
+++ b/src/python/grpcio_testing/setup.py
@@ -29,7 +29,7 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
- 'protobuf>=3.5.0.post1',
+ 'protobuf>=3.5.2.post1',
'grpcio>={version}'.format(version=grpc_version.VERSION),
)
diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py
index 4e0f6726fa..98ac19d188 100644
--- a/src/python/grpcio_tests/setup.py
+++ b/src/python/grpcio_tests/setup.py
@@ -41,7 +41,7 @@ INSTALL_REQUIRES = (
'grpcio>={version}'.format(version=grpc_version.VERSION),
'grpcio-tools>={version}'.format(version=grpc_version.VERSION),
'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION),
- 'oauth2client>=1.4.7', 'protobuf>=3.5.0.post1', 'six>=1.10',
+ 'oauth2client>=1.4.7', 'protobuf>=3.5.2.post1', 'six>=1.10',
'google-auth>=1.0.0', 'requests>=2.14.2')
COMMAND_CLASS = {
diff --git a/src/python/grpcio_tests/tests/_loader.py b/src/python/grpcio_tests/tests/_loader.py
index 31680916b4..be0af64646 100644
--- a/src/python/grpcio_tests/tests/_loader.py
+++ b/src/python/grpcio_tests/tests/_loader.py
@@ -54,7 +54,7 @@ class Loader(object):
for module in modules:
try:
package_paths = module.__path__
- except:
+ except AttributeError:
continue
self.walk_packages(package_paths)
coverage_context.stop()
diff --git a/src/python/grpcio_tests/tests/_result.py b/src/python/grpcio_tests/tests/_result.py
index 9907c4e1f9..b105f18e78 100644
--- a/src/python/grpcio_tests/tests/_result.py
+++ b/src/python/grpcio_tests/tests/_result.py
@@ -46,7 +46,7 @@ class CaseResult(
None.
"""
- class Kind:
+ class Kind(object):
UNTESTED = 'untested'
RUNNING = 'running'
ERROR = 'error'
@@ -257,7 +257,7 @@ class CoverageResult(AugmentedResult):
#coverage.Coverage().combine()
-class _Colors:
+class _Colors(object):
"""Namespaced constants for terminal color magic numbers."""
HEADER = '\033[95m'
INFO = '\033[94m'
diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py
index 3780ed9020..698c37017f 100644
--- a/src/python/grpcio_tests/tests/interop/client.py
+++ b/src/python/grpcio_tests/tests/interop/client.py
@@ -66,10 +66,6 @@ def _args():
return parser.parse_args()
-def _application_default_credentials():
- return oauth2client_client.GoogleCredentials.get_application_default()
-
-
def _stub(args):
target = '{}:{}'.format(args.server_host, args.server_port)
if args.test_case == 'oauth2_auth_token':
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
index ad0ecf0079..b46e53315e 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
@@ -329,9 +329,7 @@ class PythonPluginTest(unittest.TestCase):
_packagify(self._python_out)
- with _system_path([
- self._python_out,
- ]):
+ with _system_path([self._python_out]):
self._payload_pb2 = importlib.import_module(_PAYLOAD_PB2)
self._requests_pb2 = importlib.import_module(_REQUESTS_PB2)
self._responses_pb2 = importlib.import_module(_RESPONSES_PB2)
diff --git a/src/python/grpcio_tests/tests/stress/test_runner.py b/src/python/grpcio_tests/tests/stress/test_runner.py
index d5038e3ba2..764cda17fb 100644
--- a/src/python/grpcio_tests/tests/stress/test_runner.py
+++ b/src/python/grpcio_tests/tests/stress/test_runner.py
@@ -50,7 +50,7 @@ class TestRunner(threading.Thread):
test_case.test_interoperability(self._stub, None)
end_time = time.time()
self._histogram.add((end_time - start_time) * 1e9)
- except Exception as e:
+ except Exception as e: # pylint: disable=broad-except
traceback.print_exc()
self._exception_queue.put(
Exception("An exception occured during test {}"
diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py
index 7d0d74c8c4..3ddeba2373 100644
--- a/src/python/grpcio_tests/tests/testing/_client_application.py
+++ b/src/python/grpcio_tests/tests/testing/_client_application.py
@@ -215,30 +215,6 @@ def _run_infinite_request_stream(stub):
return _UNSATISFACTORY_OUTCOME
-def run(scenario, channel):
- stub = services_pb2_grpc.FirstServiceStub(channel)
- try:
- if scenario is Scenario.UNARY_UNARY:
- return _run_unary_unary(stub)
- elif scenario is Scenario.UNARY_STREAM:
- return _run_unary_stream(stub)
- elif scenario is Scenario.STREAM_UNARY:
- return _run_stream_unary(stub)
- elif scenario is Scenario.STREAM_STREAM:
- return _run_stream_stream(stub)
- elif scenario is Scenario.CONCURRENT_STREAM_UNARY:
- return _run_concurrent_stream_unary(stub)
- elif scenario is Scenario.CONCURRENT_STREAM_STREAM:
- return _run_concurrent_stream_stream(stub)
- elif scenario is Scenario.CANCEL_UNARY_UNARY:
- return _run_cancel_unary_unary(stub)
- elif scenario is Scenario.INFINITE_REQUEST_STREAM:
- return _run_infinite_request_stream(stub)
- except grpc.RpcError as rpc_error:
- return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(),
- rpc_error.details())
-
-
_IMPLEMENTATIONS = {
Scenario.UNARY_UNARY: _run_unary_unary,
Scenario.UNARY_STREAM: _run_unary_stream,
diff --git a/src/python/grpcio_tests/tests/testing/_server_application.py b/src/python/grpcio_tests/tests/testing/_server_application.py
index 02769ca68d..243c385daf 100644
--- a/src/python/grpcio_tests/tests/testing/_server_application.py
+++ b/src/python/grpcio_tests/tests/testing/_server_application.py
@@ -38,7 +38,7 @@ class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details('Something is wrong with your request!')
return
- yield services_pb2.Strange()
+ yield services_pb2.Strange() # pylint: disable=unreachable
def StreUn(self, request_iterator, context):
context.send_initial_metadata(((
diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py
index 7550cd39ba..0b11f03adf 100644
--- a/src/python/grpcio_tests/tests/unit/_compression_test.py
+++ b/src/python/grpcio_tests/tests/unit/_compression_test.py
@@ -52,9 +52,9 @@ class _MethodHandler(grpc.RpcMethodHandler):
self.stream_unary = None
self.stream_stream = None
if self.request_streaming and self.response_streaming:
- self.stream_stream = lambda x, y: handle_stream(x, y)
+ self.stream_stream = handle_stream
elif not self.request_streaming and not self.response_streaming:
- self.unary_unary = lambda x, y: handle_unary(x, y)
+ self.unary_unary = handle_unary
class _GenericHandler(grpc.GenericRpcHandler):
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 9045ff58a0..23f5ef605d 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -285,7 +285,7 @@ class ServerClientMixin(object):
self.assertEqual(5, len(server_event.batch_operations))
found_server_op_types = set()
for server_result in server_event.batch_operations:
- self.assertNotIn(client_result.type(), found_server_op_types)
+ self.assertNotIn(server_result.type(), found_server_op_types)
found_server_op_types.add(server_result.type())
if server_result.type() == cygrpc.OperationType.receive_message:
self.assertEqual(REQUEST, server_result.message())
diff --git a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py
index 4a00b9ef2f..7d5eaaaa84 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py
@@ -25,7 +25,7 @@ class SimpleFuture(object):
def wrapped_function():
try:
self._result = function(*args, **kwargs)
- except Exception as error:
+ except Exception as error: # pylint: disable=broad-except
self._error = error
self._result = None
@@ -41,7 +41,7 @@ class SimpleFuture(object):
self._thread.join()
if self._error:
# TODO(atash): re-raise exceptions in a way that preserves tracebacks
- raise self._error
+ raise self._error # pylint: disable=raising-bad-type
return self._result
diff --git a/src/python/grpcio_tests/tests/unit/_exit_test.py b/src/python/grpcio_tests/tests/unit/_exit_test.py
index 6e6d9de0fb..f40f3ae07c 100644
--- a/src/python/grpcio_tests/tests/unit/_exit_test.py
+++ b/src/python/grpcio_tests/tests/unit/_exit_test.py
@@ -49,7 +49,7 @@ def cleanup_processes():
for process in processes:
try:
process.kill()
- except Exception:
+ except Exception: # pylint: disable=broad-except
pass
diff --git a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
index e683131722..ad847ae03e 100644
--- a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
+++ b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py
@@ -14,7 +14,7 @@
_BEFORE_IMPORT = tuple(globals())
-from grpc import *
+from grpc import * # pylint: disable=wildcard-import
_AFTER_IMPORT = tuple(globals())
diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
index e40cca8b24..93a5fdf9ff 100644
--- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
+++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
@@ -165,11 +165,13 @@ class FailAfterFewIterationsCounter(object):
def __next__(self):
if self._current >= self._high:
- raise Exception("This is a deliberate failure in a unit test.")
+ raise test_control.Defect()
else:
self._current += 1
return self._bytestring
+ next = __next__
+
def _unary_unary_multi_callable(channel):
return channel.unary_unary(_UNARY_UNARY)
diff --git a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
index 61c03f64ba..b43c647fc9 100644
--- a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
+++ b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
@@ -65,7 +65,7 @@ class _Servicer(object):
self._serviced = True
self._condition.notify_all()
return
- yield
+ yield # pylint: disable=unreachable
def stream_unary(self, request_iterator, context):
for request in request_iterator: