Diffstat (limited to 'src')
-rw-r--r--  src/compiler/cpp_generator.cc | 199
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.c | 365
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.h | 2
-rw-r--r--  src/core/ext/filters/client_channel/client_channel_plugin.c | 1
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c | 56
-rw-r--r--  src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c | 6
-rw-r--r--  src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c | 3
-rw-r--r--  src/core/ext/filters/deadline/deadline_filter.c | 26
-rw-r--r--  src/core/ext/filters/max_age/max_age_filter.c | 18
-rw-r--r--  src/core/ext/filters/message_size/message_size_filter.c | 22
-rw-r--r--  src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c | 8
-rw-r--r--  src/core/ext/filters/workarounds/workaround_utils.c | 3
-rw-r--r--  src/core/ext/transport/chttp2/transport/chttp2_transport.c | 46
-rw-r--r--  src/core/ext/transport/chttp2/transport/internal.h | 7
-rw-r--r--  src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c | 13
-rw-r--r--  src/core/lib/iomgr/ev_epollex_linux.c | 56
-rw-r--r--  src/core/lib/iomgr/ev_epollsig_linux.c | 13
-rw-r--r--  src/core/lib/iomgr/resolve_address_uv.c | 4
-rw-r--r--  src/core/lib/iomgr/tcp_client_uv.c | 3
-rw-r--r--  src/core/lib/iomgr/tcp_server_uv.c | 1
-rw-r--r--  src/core/lib/iomgr/tcp_uv.c | 15
-rw-r--r--  src/core/lib/iomgr/timer_generic.c | 68
-rw-r--r--  src/core/lib/iomgr/timer_manager.c | 101
-rw-r--r--  src/core/lib/security/credentials/jwt/jwt_verifier.c | 45
-rw-r--r--  src/core/lib/security/transport/client_auth_filter.c | 100
-rw-r--r--  src/core/lib/security/transport/security_connector.c | 6
-rw-r--r--  src/core/lib/security/transport/server_auth_filter.c | 143
-rw-r--r--  src/core/lib/support/arena.c | 4
-rw-r--r--  src/core/lib/support/atm.c | 14
-rw-r--r--  src/core/lib/support/avl.c | 2
-rw-r--r--  src/core/lib/support/log_linux.c | 4
-rw-r--r--  src/core/lib/support/mpscq.c | 25
-rw-r--r--  src/core/lib/support/mpscq.h | 26
-rw-r--r--  src/core/lib/support/stack_lockfree.c | 137
-rw-r--r--  src/core/lib/support/stack_lockfree.h | 38
-rw-r--r--  src/core/lib/surface/call.c | 4
-rw-r--r--  src/core/lib/surface/completion_queue.c | 633
-rw-r--r--  src/core/lib/surface/completion_queue.h | 3
-rw-r--r--  src/core/lib/surface/server.c | 117
-rw-r--r--  src/core/lib/transport/static_metadata.c | 3
-rw-r--r--  src/core/tsi/fake_transport_security.c | 256
-rw-r--r--  src/core/tsi/fake_transport_security.h | 2
-rw-r--r--  src/core/tsi/ssl_transport_security.c | 10
-rw-r--r--  src/cpp/README.md | 10
-rw-r--r--  src/cpp/client/channel_cc.cc | 12
-rw-r--r--  src/cpp/client/generic_stub.cc | 6
-rw-r--r--  src/cpp/common/completion_queue_cc.cc | 2
-rw-r--r--  src/cpp/server/health/default_health_check_service.cc | 9
-rw-r--r--  src/cpp/server/health/default_health_check_service.h | 2
-rw-r--r--  src/cpp/server/server_cc.cc | 69
-rw-r--r--  src/cpp/server/server_context.cc | 14
-rw-r--r--  src/csharp/README.md | 4
-rw-r--r--  src/node/ext/call.cc | 5
-rw-r--r--  src/node/src/client.js | 2
-rw-r--r--  src/node/src/protobuf_js_6_common.js | 2
-rw-r--r--  src/node/test/surface_test.js | 24
-rw-r--r--  src/node/tools/package.json | 2
-rw-r--r--  src/objective-c/!ProtoCompiler-gRPCPlugin.podspec | 2
-rw-r--r--  src/objective-c/RxLibrary/GRXBufferedPipe.h | 4
-rw-r--r--  src/objective-c/RxLibrary/GRXBufferedPipe.m | 112
-rw-r--r--  src/objective-c/examples/RemoteTestClient/RemoteTest.podspec | 2
-rw-r--r--  src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj | 4
-rw-r--r--  src/objective-c/tests/RemoteTestClient/RemoteTest.podspec | 2
-rw-r--r--  src/objective-c/tests/RxLibraryUnitTests.m | 26
-rw-r--r--  src/php/README.md | 4
-rw-r--r--  src/proto/grpc/testing/control.proto | 1
-rw-r--r--  src/python/grpcio/README.rst | 2
-rw-r--r--  src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi | 1
-rw-r--r--  src/python/grpcio/grpc_core_dependencies.py | 1
-rw-r--r--  src/python/grpcio_health_checking/setup.py | 2
-rw-r--r--  src/python/grpcio_reflection/setup.py | 2
-rw-r--r--  src/ruby/README.md | 2
72 files changed, 1847 insertions(+), 1091 deletions(-)
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index b09bf99677..f8c6fda340 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -140,7 +140,6 @@ grpc::string GetHeaderIncludes(grpc_generator::File *file,
printer->Print(vars, "namespace grpc {\n");
printer->Print(vars, "class CompletionQueue;\n");
printer->Print(vars, "class Channel;\n");
- printer->Print(vars, "class RpcService;\n");
printer->Print(vars, "class ServerCompletionQueue;\n");
printer->Print(vars, "class ServerContext;\n");
printer->Print(vars, "} // namespace grpc\n\n");
@@ -187,19 +186,21 @@ void PrintHeaderClientMethodInterfaces(
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
- "std::unique_ptr< ::grpc::ClientWriterInterface< $Request$>>"
+ "std::unique_ptr< ::grpc::ClientWriterInterface< "
+ "$Request$>>"
" $Method$("
"::grpc::ClientContext* context, $Response$* response) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< ::grpc::ClientWriterInterface< $Request$>>"
- "($Method$Raw(context, response));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientWriterInterface< $Request$>>"
+ "($Method$Raw(context, response));\n");
printer->Outdent();
printer->Print("}\n");
printer->Print(
*vars,
- "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>"
+ "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< "
+ "$Request$>>"
" Async$Method$(::grpc::ClientContext* context, $Response$* "
"response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
@@ -213,19 +214,21 @@ void PrintHeaderClientMethodInterfaces(
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "std::unique_ptr< ::grpc::ClientReaderInterface< $Response$>>"
+ "std::unique_ptr< ::grpc::ClientReaderInterface< "
+ "$Response$>>"
" $Method$(::grpc::ClientContext* context, const $Request$& request)"
" {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< ::grpc::ClientReaderInterface< $Response$>>"
- "($Method$Raw(context, request));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientReaderInterface< $Response$>>"
+ "($Method$Raw(context, request));\n");
printer->Outdent();
printer->Print("}\n");
printer->Print(
*vars,
- "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< $Response$>> "
+ "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< "
+ "$Response$>> "
"Async$Method$("
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
@@ -242,36 +245,37 @@ void PrintHeaderClientMethodInterfaces(
"$Request$, $Response$>> "
"$Method$(::grpc::ClientContext* context) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientReaderWriterInterface< $Request$, $Response$>>("
- "$Method$Raw(context));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientReaderWriterInterface< "
+ "$Request$, $Response$>>("
+ "$Method$Raw(context));\n");
printer->Outdent();
printer->Print("}\n");
- printer->Print(
- *vars,
- "std::unique_ptr< "
- "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>> "
- "Async$Method$(::grpc::ClientContext* context, "
- "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print(*vars,
+ "std::unique_ptr< "
+ "::grpc::ClientAsyncReaderWriterInterface< "
+ "$Request$, $Response$>> "
+ "Async$Method$(::grpc::ClientContext* context, "
+ "::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>>("
- "Async$Method$Raw(context, cq, tag));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncReaderWriterInterface< "
+ "$Request$, $Response$>>("
+ "Async$Method$Raw(context, cq, tag));\n");
printer->Outdent();
printer->Print("}\n");
}
} else {
if (method->NoStreaming()) {
- printer->Print(
- *vars,
- "virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* "
- "Async$Method$Raw(::grpc::ClientContext* context, "
- "const $Request$& request, "
- "::grpc::CompletionQueue* cq) = 0;\n");
+ printer->Print(*vars,
+ "virtual "
+ "::grpc::ClientAsyncResponseReaderInterface< "
+ "$Response$>* "
+ "Async$Method$Raw(::grpc::ClientContext* context, "
+ "const $Request$& request, "
+ "::grpc::CompletionQueue* cq) = 0;\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -286,7 +290,8 @@ void PrintHeaderClientMethodInterfaces(
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw("
+ "virtual ::grpc::ClientReaderInterface< $Response$>* "
+ "$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request) = 0;\n");
printer->Print(
*vars,
@@ -451,7 +456,8 @@ void PrintHeaderClientMethodData(grpc_generator::Printer *printer,
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
- printer->Print(*vars, "const ::grpc::RpcMethod rpcmethod_$Method$_;\n");
+ printer->Print(*vars,
+ "const ::grpc::internal::RpcMethod rpcmethod_$Method$_;\n");
}
void PrintHeaderServerMethodSync(grpc_generator::Printer *printer,
@@ -623,7 +629,7 @@ void PrintHeaderServerMethodStreamedUnary(
printer->Print(*vars,
"WithStreamedUnaryMethod_$Method$() {\n"
" ::grpc::Service::MarkMethodStreamed($Idx$,\n"
- " new ::grpc::StreamedUnaryHandler< $Request$, "
+ " new ::grpc::internal::StreamedUnaryHandler< $Request$, "
"$Response$>(std::bind"
"(&WithStreamedUnaryMethod_$Method$<BaseClass>::"
"Streamed$Method$, this, std::placeholders::_1, "
@@ -671,15 +677,16 @@ void PrintHeaderServerMethodSplitStreaming(
"{}\n");
printer->Print(" public:\n");
printer->Indent();
- printer->Print(*vars,
- "WithSplitStreamingMethod_$Method$() {\n"
- " ::grpc::Service::MarkMethodStreamed($Idx$,\n"
- " new ::grpc::SplitServerStreamingHandler< $Request$, "
- "$Response$>(std::bind"
- "(&WithSplitStreamingMethod_$Method$<BaseClass>::"
- "Streamed$Method$, this, std::placeholders::_1, "
- "std::placeholders::_2)));\n"
- "}\n");
+ printer->Print(
+ *vars,
+ "WithSplitStreamingMethod_$Method$() {\n"
+ " ::grpc::Service::MarkMethodStreamed($Idx$,\n"
+ " new ::grpc::internal::SplitServerStreamingHandler< $Request$, "
+ "$Response$>(std::bind"
+ "(&WithSplitStreamingMethod_$Method$<BaseClass>::"
+ "Streamed$Method$, this, std::placeholders::_1, "
+ "std::placeholders::_2)));\n"
+ "}\n");
printer->Print(*vars,
"~WithSplitStreamingMethod_$Method$() override {\n"
" BaseClassMustBeDerivedFromService(this);\n"
@@ -819,7 +826,8 @@ void PrintHeaderService(grpc_generator::Printer *printer,
" {\n public:\n");
printer->Indent();
printer->Print(
- "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n");
+ "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& "
+ "channel);\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i).get(), vars, true);
}
@@ -1082,11 +1090,12 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::Status $ns$$Service$::Stub::$Method$("
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) {\n");
- printer->Print(*vars,
- " return ::grpc::BlockingUnaryCall(channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, request, response);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::internal::BlockingUnaryCall(channel_.get(), "
+ "rpcmethod_$Method$_, "
+ "context, request, response);\n"
+ "}\n\n");
printer->Print(
*vars,
"::grpc::ClientAsyncResponseReader< $Response$>* "
@@ -1095,7 +1104,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::CompletionQueue* cq) {\n");
printer->Print(*vars,
" return "
- "::grpc::ClientAsyncResponseReader< $Response$>::Create("
+ "::grpc::ClientAsyncResponseReader< $Response$>::"
+ "internal::Create("
"channel_.get(), cq, "
"rpcmethod_$Method$_, "
"context, request);\n"
@@ -1105,19 +1115,21 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientWriter< $Request$>* "
"$ns$$Service$::Stub::$Method$Raw("
"::grpc::ClientContext* context, $Response$* response) {\n");
- printer->Print(*vars,
- " return new ::grpc::ClientWriter< $Request$>("
- "channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, response);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::ClientWriter< $Request$>::internal::Create("
+ "channel_.get(), "
+ "rpcmethod_$Method$_, "
+ "context, response);\n"
+ "}\n\n");
printer->Print(*vars,
"::grpc::ClientAsyncWriter< $Request$>* "
"$ns$$Service$::Stub::Async$Method$Raw("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
- " return ::grpc::ClientAsyncWriter< $Request$>::Create("
+ " return ::grpc::ClientAsyncWriter< $Request$>::"
+ "internal::Create("
"channel_.get(), cq, "
"rpcmethod_$Method$_, "
"context, response, tag);\n"
@@ -1128,19 +1140,21 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientReader< $Response$>* "
"$ns$$Service$::Stub::$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request) {\n");
- printer->Print(*vars,
- " return new ::grpc::ClientReader< $Response$>("
- "channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, request);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::ClientReader< $Response$>::internal::Create("
+ "channel_.get(), "
+ "rpcmethod_$Method$_, "
+ "context, request);\n"
+ "}\n\n");
printer->Print(*vars,
"::grpc::ClientAsyncReader< $Response$>* "
"$ns$$Service$::Stub::Async$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
- " return ::grpc::ClientAsyncReader< $Response$>::Create("
+ " return ::grpc::ClientAsyncReader< $Response$>::"
+ "internal::Create("
"channel_.get(), cq, "
"rpcmethod_$Method$_, "
"context, request, tag);\n"
@@ -1151,8 +1165,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$ns$$Service$::Stub::$Method$Raw(::grpc::ClientContext* context) {\n");
printer->Print(*vars,
- " return new ::grpc::ClientReaderWriter< "
- "$Request$, $Response$>("
+ " return ::grpc::ClientReaderWriter< "
+ "$Request$, $Response$>::internal::Create("
"channel_.get(), "
"rpcmethod_$Method$_, "
"context);\n"
@@ -1162,14 +1176,14 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* "
"$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
- printer->Print(
- *vars,
- " return "
- "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create("
- "channel_.get(), cq, "
- "rpcmethod_$Method$_, "
- "context, tag);\n"
- "}\n\n");
+ printer->Print(*vars,
+ " return "
+ "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::"
+ "internal::Create("
+ "channel_.get(), cq, "
+ "rpcmethod_$Method$_, "
+ "context, tag);\n"
+ "}\n\n");
}
}
@@ -1279,7 +1293,7 @@ void PrintSourceService(grpc_generator::Printer *printer,
printer->Print(*vars,
", rpcmethod_$Method$_("
"$prefix$$Service$_method_names[$Idx$], "
- "::grpc::RpcMethod::$StreamingType$, "
+ "::grpc::internal::RpcMethod::$StreamingType$, "
"channel"
")\n");
}
@@ -1302,38 +1316,38 @@ void PrintSourceService(grpc_generator::Printer *printer,
if (method->NoStreaming()) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::NORMAL_RPC,\n"
- " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
+ " ::grpc::internal::RpcMethod::NORMAL_RPC,\n"
+ " new ::grpc::internal::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, "
"$Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ClientOnlyStreaming(method.get())) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::CLIENT_STREAMING,\n"
- " new ::grpc::ClientStreamingHandler< "
+ " ::grpc::internal::RpcMethod::CLIENT_STREAMING,\n"
+ " new ::grpc::internal::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ServerOnlyStreaming(method.get())) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::SERVER_STREAMING,\n"
- " new ::grpc::ServerStreamingHandler< "
+ " ::grpc::internal::RpcMethod::SERVER_STREAMING,\n"
+ " new ::grpc::internal::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (method->BidiStreaming()) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::BIDI_STREAMING,\n"
- " new ::grpc::BidiStreamingHandler< "
+ " ::grpc::internal::RpcMethod::BIDI_STREAMING,\n"
+ " new ::grpc::internal::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
@@ -1501,7 +1515,8 @@ void PrintMockClientMethods(grpc_generator::Printer *printer,
printer->Print(
*vars,
"MOCK_METHOD3(Async$Method$Raw, "
- "::grpc::ClientAsyncReaderWriterInterface<$Request$, $Response$>*"
+ "::grpc::ClientAsyncReaderWriterInterface<$Request$, "
+ "$Response$>*"
"(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, "
"void* tag));\n");
}
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index de516ab4c9..7add432589 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -52,6 +52,8 @@
/* Client channel implementation */
+grpc_tracer_flag grpc_client_channel_trace = GRPC_TRACER_INITIALIZER(false);
+
/*************************************************************************
* METHOD-CONFIG TABLE
*/
@@ -241,6 +243,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_REF(error));
}
}
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
+ grpc_connectivity_state_name(state));
+ }
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
}
@@ -251,6 +257,10 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state publish_state = w->state;
/* check if the notification is for the latest policy */
if (w->lb_policy == w->chand->lb_policy) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
+ w->lb_policy, grpc_connectivity_state_name(w->state));
+ }
if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
@@ -263,7 +273,6 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
}
}
-
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
gpr_free(w);
}
@@ -273,7 +282,6 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
-
w->chand = chand;
GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
grpc_combiner_scheduler(chand->combiner));
@@ -283,6 +291,18 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
&w->on_changed);
}
+static void start_resolving_locked(grpc_exec_ctx *exec_ctx,
+ channel_data *chand) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
+ }
+ GPR_ASSERT(!chand->started_resolving);
+ chand->started_resolving = true;
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result,
+ &chand->on_resolver_result_changed);
+}
+
typedef struct {
char *server_name;
grpc_server_retry_throttle_data *retry_throttle_data;
@@ -345,8 +365,13 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
+ grpc_error_string(error));
+ }
// Extract the following fields from the resolver result, if non-NULL.
char *lb_policy_name = NULL;
+ bool lb_policy_name_changed = false;
grpc_lb_policy *new_lb_policy = NULL;
char *service_config_json = NULL;
grpc_server_retry_throttle_data *retry_throttle_data = NULL;
@@ -394,10 +419,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked
// once at any given time.
- const bool lb_policy_type_changed =
+ lb_policy_name_changed =
chand->info_lb_policy_name == NULL ||
strcmp(chand->info_lb_policy_name, lb_policy_name) != 0;
- if (chand->lb_policy != NULL && !lb_policy_type_changed) {
+ if (chand->lb_policy != NULL && !lb_policy_name_changed) {
// Continue using the same LB policy. Update with new addresses.
grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
} else {
@@ -445,6 +470,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
chand->resolver_result = NULL;
}
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
+ "service_config=\"%s\"",
+ chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "",
+ service_config_json);
+ }
// Now swap out fields in chand. Note that the new values may still
// be NULL if (e.g.) the resolver failed to return results or the
// results did not contain the necessary data.
@@ -479,6 +511,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
chand->resolver == NULL) {
if (chand->lb_policy != NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
+ chand->lb_policy);
+ }
grpc_pollset_set_del_pollset_set(exec_ctx,
chand->lb_policy->interested_parties,
chand->interested_parties);
@@ -489,7 +525,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
// Now that we've swapped out the relevant fields of chand, check for
// error or shutdown.
if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand);
+ }
if (chand->resolver != NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
+ }
grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
@@ -510,6 +552,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_error *state_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
if (new_lb_policy != NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
+ }
GRPC_ERROR_UNREF(state_error);
state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
&state_error);
@@ -772,7 +817,9 @@ typedef struct client_channel_call_data {
gpr_atm subchannel_call_or_error;
gpr_arena *arena;
- bool pick_pending;
+ grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending.
+ grpc_closure lb_pick_closure;
+
grpc_connected_subchannel *connected_subchannel;
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
grpc_polling_entity *pollent;
@@ -837,8 +884,15 @@ static void waiting_for_pick_batches_add_locked(
}
static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
- call_data *calld,
+ grpc_call_element *elem,
grpc_error *error) {
+ call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s",
+ elem->channel_data, calld, calld->waiting_for_pick_batches_count,
+ grpc_error_string(error));
+ }
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
@@ -848,14 +902,21 @@ static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
}
static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
- call_data *calld) {
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
if (calld->waiting_for_pick_batches_count == 0) return;
call_or_error coe = get_call_or_error(calld);
if (coe.error != GRPC_ERROR_NONE) {
- waiting_for_pick_batches_fail_locked(exec_ctx, calld,
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem,
GRPC_ERROR_REF(coe.error));
return;
}
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
+ " pending batches to subchannel_call=%p",
+ elem->channel_data, calld, calld->waiting_for_pick_batches_count,
+ coe.subchannel_call);
+ }
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
calld->waiting_for_pick_batches[i]);
@@ -869,6 +930,10 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
+ chand, calld);
+ }
if (chand->retry_throttle_data != NULL) {
calld->retry_throttle_data =
grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
@@ -895,7 +960,9 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
}
static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
- call_data *calld, grpc_error *error) {
+ grpc_call_element *elem,
+ grpc_error *error) {
+ call_data *calld = elem->call_data;
grpc_subchannel_call *subchannel_call = NULL;
const grpc_connected_subchannel_call_args call_args = {
.pollent = calld->pollent,
@@ -906,13 +973,18 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
.context = calld->subchannel_call_context};
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
+ elem->channel_data, calld, subchannel_call,
+ grpc_error_string(new_error));
+ }
GPR_ASSERT(set_call_or_error(
calld, (call_or_error){.subchannel_call = subchannel_call}));
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error);
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error);
} else {
- waiting_for_pick_batches_resume_locked(exec_ctx, calld);
+ waiting_for_pick_batches_resume_locked(exec_ctx, elem);
}
GRPC_ERROR_UNREF(error);
}
@@ -922,8 +994,6 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GPR_ASSERT(calld->pick_pending);
- calld->pick_pending = false;
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
call_or_error coe = get_call_or_error(calld);
@@ -935,8 +1005,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
"Call dropped by load balancing policy")
: GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to create subchannel", &error, 1);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: failed to create subchannel: error=%s", chand,
+ calld, grpc_error_string(failure));
+ }
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure);
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure);
} else if (coe.error != GRPC_ERROR_NONE) {
/* already cancelled before subchannel became ready */
grpc_error *child_errors[] = {error, coe.error};
@@ -950,10 +1025,15 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_DEADLINE_EXCEEDED);
}
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: cancelled before subchannel became ready: %s",
+ chand, calld, grpc_error_string(cancellation_error));
+ }
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error);
} else {
/* Create call on subchannel. */
- create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
+ create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
}
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
GRPC_ERROR_UNREF(error);
@@ -983,41 +1063,77 @@ typedef struct {
grpc_closure closure;
} pick_after_resolver_result_args;
-static void continue_picking_after_resolver_result_locked(
- grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error) {
pick_after_resolver_result_args *args = arg;
if (args->cancelled) {
/* cancelled, do nothing */
- } else if (error != GRPC_ERROR_NONE) {
- subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "call cancelled before resolver result");
+ }
} else {
- if (pick_subchannel_locked(exec_ctx, args->elem)) {
- subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
+ channel_data *chand = args->elem->channel_data;
+ call_data *calld = args->elem->call_data;
+ if (error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
+ chand, calld);
+ }
+ subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
+ } else {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
+ chand, calld);
+ }
+ if (pick_subchannel_locked(exec_ctx, args->elem)) {
+ subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
+ }
}
}
gpr_free(args);
}
-static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_error *error) {
+static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: deferring pick pending resolver result", chand,
+ calld);
+ }
+ pick_after_resolver_result_args *args =
+ (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
+ args->elem = elem;
+ GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
+ args, grpc_combiner_scheduler(chand->combiner));
+ grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
+ &args->closure, GRPC_ERROR_NONE);
+}
+
+static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_error *error) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- if (chand->lb_policy != NULL) {
- grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
- &calld->connected_subchannel,
- GRPC_ERROR_REF(error));
- }
// If we don't yet have a resolver result, then a closure for
- // continue_picking_after_resolver_result_locked() will have been added to
+ // pick_after_resolver_result_done_locked() will have been added to
// chand->waiting_for_resolver_result_closures, and it may not be invoked
// until after this call has been destroyed. We mark the operation as
- // cancelled, so that when continue_picking_after_resolver_result_locked()
+ // cancelled, so that when pick_after_resolver_result_done_locked()
// is called, it will be a no-op. We also immediately invoke
// subchannel_ready_locked() to propagate the error back to the caller.
for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
closure != NULL; closure = closure->next_data.next) {
pick_after_resolver_result_args *args = closure->cb_arg;
if (!args->cancelled && args->elem == elem) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: "
+ "cancelling pick waiting for resolver result",
+ chand, calld);
+ }
args->cancelled = true;
subchannel_ready_locked(exec_ctx, elem,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
@@ -1027,24 +1143,21 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
GRPC_ERROR_UNREF(error);
}
-// State for pick callback that holds a reference to the LB policy
-// from which the pick was requested.
-typedef struct {
- grpc_lb_policy *lb_policy;
- grpc_call_element *elem;
- grpc_closure closure;
-} pick_callback_args;
-
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
// Unrefs the LB policy after invoking subchannel_ready_locked().
static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- pick_callback_args *args = arg;
- GPR_ASSERT(args != NULL);
- GPR_ASSERT(args->lb_policy != NULL);
- subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
- GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel");
- gpr_free(args);
+ grpc_call_element *elem = arg;
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
+ chand, calld);
+ }
+ GPR_ASSERT(calld->lb_policy != NULL);
+ GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
+ calld->lb_policy = NULL;
+ subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
}
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
@@ -1055,23 +1168,44 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
const grpc_lb_policy_pick_args *inputs) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args));
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
+ chand, calld, chand->lb_policy);
+ }
+ // Keep a ref to the LB policy in calld while the pick is pending.
GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
- pick_args->lb_policy = chand->lb_policy;
- pick_args->elem = elem;
- GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args,
+ calld->lb_policy = chand->lb_policy;
+ GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
grpc_combiner_scheduler(chand->combiner));
const bool pick_done = grpc_lb_policy_pick_locked(
exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel,
- calld->subchannel_call_context, NULL, &pick_args->closure);
+ calld->subchannel_call_context, NULL, &calld->lb_pick_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
- GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel");
- gpr_free(pick_args);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
+ chand, calld);
+ }
+ GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
+ calld->lb_policy = NULL;
}
return pick_done;
}
+static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_error *error) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ GPR_ASSERT(calld->lb_policy != NULL);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
+ chand, calld, calld->lb_policy);
+ }
+ grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
+ &calld->connected_subchannel, error);
+}
+
static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
@@ -1107,20 +1241,9 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
} else if (chand->resolver != NULL) {
if (!chand->started_resolving) {
- chand->started_resolving = true;
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next_locked(exec_ctx, chand->resolver,
- &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ start_resolving_locked(exec_ctx, chand);
}
- pick_after_resolver_result_args *args =
- (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
- args->elem = elem;
- GRPC_CLOSURE_INIT(&args->closure,
- continue_picking_after_resolver_result_locked, args,
- grpc_combiner_scheduler(chand->combiner));
- grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
- &args->closure, GRPC_ERROR_NONE);
+ pick_after_resolver_result_start_locked(exec_ctx, elem);
} else {
subchannel_ready_locked(
exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
@@ -1133,63 +1256,77 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
void *arg,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
- grpc_transport_stream_op_batch *op = arg;
- grpc_call_element *elem = op->handler_private.extra_arg;
+ grpc_transport_stream_op_batch *batch = arg;
+ grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
/* need to recheck that another thread hasn't set the call */
call_or_error coe = get_call_or_error(calld);
if (coe.error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
+ chand, calld, grpc_error_string(coe.error));
+ }
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(coe.error));
+ exec_ctx, batch, GRPC_ERROR_REF(coe.error));
goto done;
}
if (coe.subchannel_call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
+ calld, coe.subchannel_call);
+ }
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
goto done;
}
// Add to waiting-for-pick list. If we succeed in getting a
// subchannel call below, we'll handle this batch (along with any
// other waiting batches) in waiting_for_pick_batches_resume_locked().
- waiting_for_pick_batches_add_locked(calld, op);
- /* if this is a cancellation, then we can raise our cancelled flag */
- if (op->cancel_stream) {
- grpc_error *error = op->payload->cancel_stream.cancel_error;
+ waiting_for_pick_batches_add_locked(calld, batch);
+ // If this is a cancellation, cancel the pending pick (if any) and
+ // fail any pending batches.
+ if (batch->cancel_stream) {
+ grpc_error *error = batch->payload->cancel_stream.cancel_error;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
+ calld, grpc_error_string(error));
+ }
/* Stash a copy of cancel_error in our call data, so that we can use
it for subsequent operations. This ensures that if the call is
- cancelled before any ops are passed down (e.g., if the deadline
+ cancelled before any batches are passed down (e.g., if the deadline
is in the past when the call starts), we can return the right
- error to the caller when the first op does get passed down. */
+ error to the caller when the first batch does get passed down. */
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
- if (calld->pick_pending) {
- cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+ if (calld->lb_policy != NULL) {
+ pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+ } else {
+ pick_after_resolver_result_cancel_locked(exec_ctx, elem,
+ GRPC_ERROR_REF(error));
}
- waiting_for_pick_batches_fail_locked(exec_ctx, calld,
- GRPC_ERROR_REF(error));
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
goto done;
}
/* if we don't have a subchannel, try to get one */
- if (!calld->pick_pending && calld->connected_subchannel == NULL &&
- op->send_initial_metadata) {
- calld->initial_metadata_payload = op->payload;
- calld->pick_pending = true;
+ if (batch->send_initial_metadata) {
+ GPR_ASSERT(calld->connected_subchannel == NULL);
+ calld->initial_metadata_payload = batch->payload;
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
that IO of the lb_policy and resolver could be done under it. */
if (pick_subchannel_locked(exec_ctx, elem)) {
// Pick was returned synchronously.
- calld->pick_pending = false;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
if (calld->connected_subchannel == NULL) {
grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy");
set_call_or_error(calld,
(call_or_error){.error = GRPC_ERROR_REF(error)});
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, error);
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, error);
} else {
// Create subchannel call.
- create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE);
+ create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
}
} else {
grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
@@ -1232,47 +1369,59 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
If it has, we proceed on the fast path. */
static void cc_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace) ||
+ GRPC_TRACER_ON(grpc_trace_channel)) {
+ grpc_call_log_op(GPR_INFO, elem, batch);
+ }
if (chand->deadline_checking_enabled) {
grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
- op);
+ batch);
}
// Intercept on_complete for recv_trailing_metadata so that we can
// check retry throttle status.
- if (op->recv_trailing_metadata) {
- GPR_ASSERT(op->on_complete != NULL);
- calld->original_on_complete = op->on_complete;
+ if (batch->recv_trailing_metadata) {
+ GPR_ASSERT(batch->on_complete != NULL);
+ calld->original_on_complete = batch->on_complete;
GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
grpc_schedule_on_exec_ctx);
- op->on_complete = &calld->on_complete;
+ batch->on_complete = &calld->on_complete;
}
/* try to (atomically) get the call */
call_or_error coe = get_call_or_error(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
if (coe.error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
+ chand, calld, grpc_error_string(coe.error));
+ }
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(coe.error));
- GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
- /* early out */
- return;
+ exec_ctx, batch, GRPC_ERROR_REF(coe.error));
+ goto done;
}
if (coe.subchannel_call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
- GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
- /* early out */
- return;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
+ calld, coe.subchannel_call);
+ }
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
+ goto done;
}
/* we failed; lock and figure out what to do */
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
+ }
GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
- op->handler_private.extra_arg = elem;
+ batch->handler_private.extra_arg = elem;
GRPC_CLOSURE_SCHED(
- exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure,
- start_transport_stream_op_batch_locked, op,
+ exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ start_transport_stream_op_batch_locked, batch,
grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
+done:
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
@@ -1317,7 +1466,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
"client_channel_destroy_call");
}
- GPR_ASSERT(!calld->pick_pending);
+ GPR_ASSERT(calld->lb_policy == NULL);
GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
if (calld->connected_subchannel != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
@@ -1366,11 +1515,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
} else {
chand->exit_idle_when_lb_policy_arrives = true;
if (!chand->started_resolving && chand->resolver != NULL) {
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- chand->started_resolving = true;
- grpc_resolver_next_locked(exec_ctx, chand->resolver,
- &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ start_resolving_locked(exec_ctx, chand);
}
}
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index 63f7c29940..c99f0092e9 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -23,6 +23,8 @@
#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
+extern grpc_tracer_flag grpc_client_channel_trace;
+
// Channel arg key for server URI string.
#define GRPC_ARG_SERVER_URI "grpc.server_uri"
diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c
index 60e77d6268..6f133a648b 100644
--- a/src/core/ext/filters/client_channel/client_channel_plugin.c
+++ b/src/core/ext/filters/client_channel/client_channel_plugin.c
@@ -78,6 +78,7 @@ void grpc_client_channel_init(void) {
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter,
(void *)&grpc_client_channel_filter);
grpc_http_connect_register_handshaker_factory();
+ grpc_register_tracer("client_channel", &grpc_client_channel_trace);
#ifndef NDEBUG
grpc_register_tracer("resolver_refcount", &grpc_trace_resolver_refcount);
#endif
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 8e9d6b0f47..fbef79ec31 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -158,6 +158,7 @@ static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ sd->user_data = NULL;
}
}
gpr_free(subchannel_list->subchannels);
@@ -195,11 +196,28 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
rr_subchannel_list *subchannel_list,
const char *reason) {
+ if (subchannel_list->shutting_down) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "Subchannel list %p already shutting down",
+ (void *)subchannel_list);
+ }
+ return;
+ };
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p",
+ (void *)subchannel_list);
+ }
GPR_ASSERT(!subchannel_list->shutting_down);
subchannel_list->shutting_down = true;
for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
subchannel_data *sd = &subchannel_list->subchannels[i];
if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe.
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "Unsubscribing from subchannel %p as part of shutting down "
+ "subchannel_list %p",
+ (void *)sd->subchannel, (void *)subchannel_list);
+ }
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
NULL,
&sd->connectivity_changed_closure);
@@ -228,13 +246,14 @@ static size_t get_next_ready_subchannel_index_locked(
const size_t index = (i + p->last_ready_subchannel_index + 1) %
p->subchannel_list->num_subchannels;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
- "state=%d",
- (void *)p,
- (void *)p->subchannel_list->subchannels[index].subchannel,
- (void *)p->subchannel_list, (unsigned long)index,
- p->subchannel_list->subchannels[index].curr_connectivity_state);
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
+ "state=%s",
+ (void *)p, (void *)p->subchannel_list->subchannels[index].subchannel,
+ (void *)p->subchannel_list, (unsigned long)index,
+ grpc_connectivity_state_name(
+ p->subchannel_list->subchannels[index].curr_connectivity_state));
}
if (p->subchannel_list->subchannels[index].curr_connectivity_state ==
GRPC_CHANNEL_READY) {
@@ -511,16 +530,27 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->subchannel_list->policy;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: "
+ "prev_state=%s new_state=%s p->shutdown=%d "
+ "sd->subchannel_list->shutting_down=%d error=%s",
+ (void *)p, (void *)sd->subchannel, (void *)sd->subchannel_list,
+ grpc_connectivity_state_name(sd->prev_connectivity_state),
+ grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
+ p->shutdown, sd->subchannel_list->shutting_down,
+ grpc_error_string(error));
+ }
// If the policy is shutting down, unref and return.
if (p->shutdown) {
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown");
return;
}
- if (sd->subchannel_list->shutting_down) {
+ if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) {
// the subchannel list associated with sd has been discarded. This callback
// corresponds to the unsubscription.
- GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown");
return;
@@ -536,13 +566,6 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR %p] connectivity changed for subchannel %p: "
- "prev_state=%d new_state=%d",
- (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state,
- sd->curr_connectivity_state);
- }
// Update state counters and determine new overall state.
update_state_counters_locked(sd);
sd->prev_connectivity_state = sd->curr_connectivity_state;
@@ -556,6 +579,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ sd->user_data = NULL;
}
if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* the policy is shutting down. Flush all the pending picks... */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
index af3391a731..5ea75f0554 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
@@ -132,7 +132,7 @@ static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- dns_resolver *r = arg;
+ dns_resolver *r = (dns_resolver *)arg;
r->have_retry_timer = false;
if (error == GRPC_ERROR_NONE) {
@@ -146,7 +146,7 @@ static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- dns_resolver *r = arg;
+ dns_resolver *r = (dns_resolver *)arg;
grpc_channel_args *result = NULL;
GPR_ASSERT(r->resolving);
r->resolving = false;
@@ -241,7 +241,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx,
char *path = args->uri->path;
if (path[0] == '/') ++path;
// Create resolver.
- dns_resolver *r = gpr_zalloc(sizeof(dns_resolver));
+ dns_resolver *r = (dns_resolver *)gpr_zalloc(sizeof(dns_resolver));
grpc_resolver_init(&r->base, &dns_resolver_vtable, args->combiner);
r->name_to_resolve = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
index 7b4fe38272..7ceb8f40a1 100644
--- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
@@ -177,7 +177,8 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx,
return NULL;
}
/* Instantiate resolver. */
- sockaddr_resolver *r = gpr_zalloc(sizeof(sockaddr_resolver));
+ sockaddr_resolver *r =
+ (sockaddr_resolver *)gpr_zalloc(sizeof(sockaddr_resolver));
r->addresses = addresses;
r->channel_args = grpc_channel_args_copy(args->args);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable, args->combiner);
diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c
index ced025e2e2..6789903c95 100644
--- a/src/core/ext/filters/deadline/deadline_filter.c
+++ b/src/core/ext/filters/deadline/deadline_filter.c
@@ -37,8 +37,8 @@
// Timer callback.
static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- grpc_call_element* elem = arg;
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_call_element* elem = (grpc_call_element*)arg;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
if (error != GRPC_ERROR_CANCELLED) {
grpc_call_element_signal_error(
exec_ctx, elem,
@@ -57,7 +57,7 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) {
return;
}
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
grpc_deadline_timer_state cur_state;
grpc_closure* closure = NULL;
retry:
@@ -112,7 +112,7 @@ static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
// Callback run when the call is complete.
static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
- grpc_deadline_state* deadline_state = arg;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)arg;
cancel_timer_if_needed(exec_ctx, deadline_state);
// Invoke the next callback.
GRPC_CLOSURE_RUN(exec_ctx, deadline_state->next_on_complete,
@@ -145,7 +145,7 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_stack* call_stack,
gpr_timespec deadline) {
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
deadline_state->call_stack = call_stack;
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
@@ -169,13 +169,13 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem) {
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
cancel_timer_if_needed(exec_ctx, deadline_state);
}
void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
gpr_timespec new_deadline) {
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
cancel_timer_if_needed(exec_ctx, deadline_state);
start_timer_if_needed(exec_ctx, elem, new_deadline);
}
@@ -183,7 +183,7 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
if (op->cancel_stream) {
cancel_timer_if_needed(exec_ctx, deadline_state);
} else {
@@ -256,8 +256,8 @@ static void client_start_transport_stream_op_batch(
// Callback for receiving initial metadata on the server.
static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- grpc_call_element* elem = arg;
- server_call_data* calld = elem->call_data;
+ grpc_call_element* elem = (grpc_call_element*)arg;
+ server_call_data* calld = (server_call_data*)elem->call_data;
// Get deadline from metadata and start the timer if needed.
start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline);
// Invoke the next callback.
@@ -269,7 +269,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
static void server_start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
- server_call_data* calld = elem->call_data;
+ server_call_data* calld = (server_call_data*)elem->call_data;
if (op->cancel_stream) {
cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
} else {
@@ -341,8 +341,8 @@ static bool maybe_add_deadline_filter(grpc_exec_ctx* exec_ctx,
void* arg) {
return grpc_deadline_checking_enabled(
grpc_channel_stack_builder_get_channel_arguments(builder))
- ? grpc_channel_stack_builder_prepend_filter(builder, arg, NULL,
- NULL)
+ ? grpc_channel_stack_builder_prepend_filter(
+ builder, (const grpc_channel_filter*)arg, NULL, NULL)
: true;
}
diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c
index 35304f8150..7d748b9c32 100644
--- a/src/core/ext/filters/max_age/max_age_filter.c
+++ b/src/core/ext/filters/max_age/max_age_filter.c
@@ -108,7 +108,7 @@ static void decrease_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) {
static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
/* Decrease call_count. If there are no active calls at this time,
max_idle_timer will start here. If the number of active calls is not 0,
max_idle_timer will start after all the active calls end. */
@@ -119,7 +119,7 @@ static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer");
@@ -140,7 +140,7 @@ static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx,
void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer");
@@ -156,7 +156,7 @@ static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx,
static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
if (error == GRPC_ERROR_NONE) {
/* Prevent the max idle timer from being set again */
gpr_atm_no_barrier_fetch_add(&chand->call_count, 1);
@@ -176,7 +176,7 @@ static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg,
static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = false;
gpr_mu_unlock(&chand->max_age_timer_mu);
@@ -200,7 +200,7 @@ static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = false;
gpr_mu_unlock(&chand->max_age_timer_mu);
@@ -220,7 +220,7 @@ static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
grpc_transport_op* op = grpc_make_transport_op(NULL);
op->on_connectivity_state_change = &chand->channel_connectivity_changed,
@@ -264,7 +264,7 @@ static int add_random_max_connection_age_jitter(int value) {
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
const grpc_call_element_args* args) {
- channel_data* chand = elem->channel_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
increase_call_count(exec_ctx, chand);
return GRPC_ERROR_NONE;
}
@@ -281,7 +281,7 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem,
grpc_channel_element_args* args) {
- channel_data* chand = elem->channel_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
gpr_mu_init(&chand->max_age_timer_mu);
chand->max_age_timer_pending = false;
chand->max_age_grace_timer_pending = false;
diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c
index 9bb565ed6d..846c7df69a 100644
--- a/src/core/ext/filters/message_size/message_size_filter.c
+++ b/src/core/ext/filters/message_size/message_size_filter.c
@@ -60,7 +60,8 @@ static void* message_size_limits_create_from_json(const grpc_json* json) {
if (max_response_message_bytes == -1) return NULL;
}
}
- message_size_limits* value = gpr_malloc(sizeof(message_size_limits));
+ message_size_limits* value =
+ (message_size_limits*)gpr_malloc(sizeof(message_size_limits));
value->max_send_size = max_request_message_bytes;
value->max_recv_size = max_response_message_bytes;
return value;
@@ -88,8 +89,8 @@ typedef struct channel_data {
// receive message size.
static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
grpc_error* error) {
- grpc_call_element* elem = user_data;
- call_data* calld = elem->call_data;
+ grpc_call_element* elem = (grpc_call_element*)user_data;
+ call_data* calld = (call_data*)elem->call_data;
if (*calld->recv_message != NULL && calld->limits.max_recv_size >= 0 &&
(*calld->recv_message)->length > (size_t)calld->limits.max_recv_size) {
char* message_string;
@@ -117,7 +118,7 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
static void start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
- call_data* calld = elem->call_data;
+ call_data* calld = (call_data*)elem->call_data;
// Check max send message size.
if (op->send_message && calld->limits.max_send_size >= 0 &&
op->payload->send_message.send_message->length >
@@ -149,8 +150,8 @@ static void start_transport_stream_op_batch(
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
const grpc_call_element_args* args) {
- channel_data* chand = elem->channel_data;
- call_data* calld = elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
calld->next_recv_message_ready = NULL;
GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -160,8 +161,9 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
// size to the receive limit.
calld->limits = chand->limits;
if (chand->method_limit_table != NULL) {
- message_size_limits* limits = grpc_method_config_table_get(
- exec_ctx, chand->method_limit_table, args->path);
+ message_size_limits* limits =
+ (message_size_limits*)grpc_method_config_table_get(
+ exec_ctx, chand->method_limit_table, args->path);
if (limits != NULL) {
if (limits->max_send_size >= 0 &&
(limits->max_send_size < calld->limits.max_send_size ||
@@ -220,7 +222,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
- channel_data* chand = elem->channel_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
chand->limits = get_message_size_limits(args->channel_args);
// Get method config table from channel args.
const grpc_arg* channel_arg =
@@ -243,7 +245,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
// Destructor for channel_data.
static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem) {
- channel_data* chand = elem->channel_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
grpc_slice_hash_table_unref(exec_ctx, chand->method_limit_table);
}
diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
index 8b3fff5fa3..b4d2cb4b8c 100644
--- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
+++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
@@ -52,8 +52,8 @@ static bool get_user_agent_mdelem(const grpc_metadata_batch* batch,
// Callback invoked when we receive an initial metadata.
static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx,
void* user_data, grpc_error* error) {
- grpc_call_element* elem = user_data;
- call_data* calld = elem->call_data;
+ grpc_call_element* elem = (grpc_call_element*)user_data;
+ call_data* calld = (call_data*)elem->call_data;
if (GRPC_ERROR_NONE == error) {
grpc_mdelem md;
@@ -75,7 +75,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx,
static void start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
- call_data* calld = elem->call_data;
+ call_data* calld = (call_data*)elem->call_data;
// Inject callback for receiving initial metadata
if (op->recv_initial_metadata) {
@@ -103,7 +103,7 @@ static void start_transport_stream_op_batch(
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
const grpc_call_element_args* args) {
- call_data* calld = elem->call_data;
+ call_data* calld = (call_data*)elem->call_data;
calld->next_recv_initial_metadata_ready = NULL;
calld->workaround_active = false;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
diff --git a/src/core/ext/filters/workarounds/workaround_utils.c b/src/core/ext/filters/workarounds/workaround_utils.c
index bc76753a8a..e600fbee67 100644
--- a/src/core/ext/filters/workarounds/workaround_utils.c
+++ b/src/core/ext/filters/workarounds/workaround_utils.c
@@ -33,7 +33,8 @@ grpc_workaround_user_agent_md *grpc_parse_user_agent(grpc_mdelem md) {
if (NULL != user_agent_md) {
return user_agent_md;
}
- user_agent_md = gpr_malloc(sizeof(grpc_workaround_user_agent_md));
+ user_agent_md = (grpc_workaround_user_agent_md *)gpr_malloc(
+ sizeof(grpc_workaround_user_agent_md));
for (int i = 0; i < GRPC_MAX_WORKAROUND_ID; i++) {
if (ua_parser[i]) {
user_agent_md->workaround_active[i] = ua_parser[i](md);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 6e8eadf7a1..f790267944 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -34,6 +34,7 @@
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -280,8 +281,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_slice_buffer_init(&t->outbuf);
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
- GRPC_CLOSURE_INIT(&t->write_action, write_action, t,
- grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
@@ -399,6 +398,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls;
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key,
@@ -487,6 +488,23 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_permit_without_calls =
(uint32_t)grpc_channel_arg_get_integer(
&channel_args->args[i], (grpc_integer_options){0, 0, 1});
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_OPTIMIZATION_TARGET)) {
+ if (channel_args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "%s should be a string",
+ GRPC_ARG_OPTIMIZATION_TARGET);
+ } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+ } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+ } else if (0 ==
+ strcmp(channel_args->args[i].value.string, "throughput")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT;
+ } else {
+ gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'",
+ GRPC_ARG_OPTIMIZATION_TARGET,
+ channel_args->args[i].value.string);
+ }
} else {
static const struct {
const char *channel_arg_name;
@@ -540,6 +558,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
+ GRPC_CLOSURE_INIT(&t->write_action, write_action, t,
+ t->opt_target == GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT
+ ? grpc_executor_scheduler
+ : grpc_schedule_on_exec_ctx);
+
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
t->ping_state.is_delayed_ping_timer_set = false;
@@ -1464,6 +1487,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
+ if (!t->is_client) {
+ if (op->send_initial_metadata) {
+ gpr_timespec deadline =
+ op->payload->send_initial_metadata.send_initial_metadata->deadline;
+ GPR_ASSERT(0 ==
+ gpr_time_cmp(gpr_inf_future(deadline.clock_type), deadline));
+ }
+ if (op->send_trailing_metadata) {
+ gpr_timespec deadline =
+ op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
+ GPR_ASSERT(0 ==
+ gpr_time_cmp(gpr_inf_future(deadline.clock_type), deadline));
+ }
+ }
+
if (GRPC_TRACER_ON(grpc_http_trace)) {
char *str = grpc_transport_stream_op_batch_string(op);
gpr_log(GPR_DEBUG, "perform_stream_op[s=%p]: %s", s, str);
@@ -2129,8 +2167,8 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
double bw_dbl, double bdp_dbl) {
- int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX);
- int32_t target = GPR_MAX((int32_t)bw_dbl / 1000, bdp);
+ int32_t bdp = (int32_t)GPR_CLAMP(bdp_dbl, 128.0, INT32_MAX);
+ int32_t target = (int32_t)GPR_MAX(bw_dbl / 1000, bdp);
// frame size is bounded [2^14,2^24-1]
int32_t frame_size = GPR_CLAMP(target, 16384, 16777215);
int64_t delta = (int64_t)frame_size -
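With the hunks above, setting the channel arg GRPC_ARG_OPTIMIZATION_TARGET to "throughput" makes the transport schedule write_action on the executor rather than inline; "latency" and "blend" both select the inline latency path, and unknown values log an error and fall back to "blend". A hedged sketch of how a caller might set the arg through the public C API (the target address is illustrative):

    #include <grpc/grpc.h>

    grpc_channel *create_throughput_channel(void) {
      grpc_arg arg;
      arg.type = GRPC_ARG_STRING;
      arg.key = GRPC_ARG_OPTIMIZATION_TARGET; /* "grpc.optimization_target" */
      arg.value.string = "throughput";
      grpc_channel_args args = {1, &arg};
      /* insecure channel purely for illustration */
      return grpc_insecure_channel_create("localhost:50051", &args, NULL);
    }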
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 9fa72ddbdf..a5b67e5aba 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -67,6 +67,11 @@ typedef enum {
} grpc_chttp2_ping_type;
typedef enum {
+ GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY,
+ GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT,
+} grpc_chttp2_optimization_target;
+
+typedef enum {
GRPC_CHTTP2_PCL_INITIATE = 0,
GRPC_CHTTP2_PCL_NEXT,
GRPC_CHTTP2_PCL_INFLIGHT,
@@ -229,6 +234,8 @@ struct grpc_chttp2_transport {
/** should we probe bdp? */
bool enable_bdp_probe;
+ grpc_chttp2_optimization_target opt_target;
+
/** various lists of streams */
grpc_chttp2_stream_list lists[STREAM_LIST_COUNT];
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index 2c91ad357c..9f82c480bc 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -57,9 +57,6 @@
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
-/* Uncomment the following to enable extra checks on poll_object operations */
-/* #define PO_DEBUG */
-
/* The maximum number of polling threads per polling island. By default no
limit */
static int g_max_pollers_per_pi = INT_MAX;
@@ -92,7 +89,7 @@ typedef enum {
} poll_obj_type;
typedef struct poll_obj {
-#ifdef PO_DEBUG
+#ifndef NDEBUG
poll_obj_type obj_type;
#endif
gpr_mu mu;
@@ -893,7 +890,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
* would be holding a lock to it anyway. */
gpr_mu_lock(&new_fd->po.mu);
new_fd->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
new_fd->po.obj_type = POLL_OBJ_FD;
#endif
@@ -1171,7 +1168,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->po.mu);
*mu = &pollset->po.mu;
pollset->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
pollset->po.obj_type = POLL_OBJ_POLLSET;
#endif
@@ -1625,7 +1622,7 @@ static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
poll_obj_type item_type) {
GPR_TIMER_BEGIN("add_poll_object", 0);
-#ifdef PO_DEBUG
+#ifndef NDEBUG
GPR_ASSERT(item->obj_type == item_type);
GPR_ASSERT(bag->obj_type == bag_type);
#endif
@@ -1784,7 +1781,7 @@ static grpc_pollset_set *pollset_set_create(void) {
grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
gpr_mu_init(&pss->po.mu);
pss->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
pss->po.obj_type = POLL_OBJ_POLLSET_SET;
#endif
return pss;
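Replacing the hand-toggled PO_DEBUG macro with #ifndef NDEBUG ties the poll-object type checks to standard debug builds: the tag and its assertions are compiled in unless NDEBUG is defined, as it normally is for release builds. The pattern in isolation:

    #include <assert.h>

    typedef enum { OBJ_FD, OBJ_POLLSET, OBJ_POLLSET_SET } obj_type;

    typedef struct {
    #ifndef NDEBUG
      obj_type type; /* debug-only tag; absent from release builds */
    #endif
      int fd;
    } poll_obj;

    static void check_is_pollset(const poll_obj *o) {
    #ifndef NDEBUG
      assert(o->type == OBJ_POLLSET);
    #endif
      (void)o; /* silence unused-parameter warning when NDEBUG is defined */
    }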
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 5574838187..5690431759 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -103,6 +103,32 @@ typedef struct pollable {
grpc_pollset_worker *root_worker;
} pollable;
+static const char *polling_obj_type_string(polling_obj_type t) {
+ switch (t) {
+ case PO_POLLING_GROUP:
+ return "polling_group";
+ case PO_POLLSET_SET:
+ return "pollset_set";
+ case PO_POLLSET:
+ return "pollset";
+ case PO_FD:
+ return "fd";
+ case PO_EMPTY_POLLABLE:
+ return "empty_pollable";
+ case PO_COUNT:
+ return "<invalid:count>";
+ }
+ return "<invalid>";
+}
+
+static char *pollable_desc(pollable *p) {
+ char *out;
+ gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d",
+ polling_obj_type_string(p->po.type), p->po.group, p->epfd,
+ p->wakeup.read_fd);
+ return out;
+}
+
static pollable g_empty_pollable;
static void pollable_init(pollable *p, polling_obj_type type);
@@ -472,7 +498,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
GPR_ASSERT(epfd != -1);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "add fd %p to pollable %p", fd, p);
+ gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
}
gpr_mu_lock(&fd->orphaned_mu);
@@ -537,10 +563,18 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
if (worker->pollable != &pollset->pollable) {
gpr_mu_lock(&worker->pollable->po.mu);
}
- if (worker->initialized_cv) {
+ if (worker->initialized_cv && worker != pollset->root_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)",
+ pollset, worker, &pollset->pollable, worker->pollable);
+ }
worker->kicked = true;
gpr_cv_signal(&worker->cv);
} else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)",
+ pollset, worker, &pollset->pollable, worker->pollable);
+ }
append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup),
"pollset_shutdown");
}
@@ -770,7 +804,9 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int timeout = poll_deadline_to_millis_timeout(deadline, now);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout);
+ char *desc = pollable_desc(p);
+ gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout);
+ gpr_free(desc);
}
if (timeout != 0) {
@@ -985,10 +1021,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
static const char *err_desc = "pollset_add_fd";
grpc_error *error = GRPC_ERROR_NONE;
if (pollset->current_pollable == &g_empty_pollable) {
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
"PS:%p add fd %p; transition pollable from empty to fd", pollset,
fd);
+ }
/* empty pollable --> single fd pollable */
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &fd->pollable;
@@ -997,16 +1034,23 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu);
REF_BY(fd, 2, "pollset_pollable");
} else if (pollset->current_pollable == &pollset->pollable) {
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd);
+ }
append_error(&error, pollable_add_fd(pollset->current_pollable, fd),
err_desc);
} else if (pollset->current_pollable != &fd->pollable) {
grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable;
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
"PS:%p add fd %p; transition pollable from fd %p to multipoller",
pollset, fd, had_fd);
+ }
+ /* Introduce a spurious completion.
+ If we do not, then it may be that the fd-specific epoll set consumed
+ a completion without being polled, leading to a missed edge going up. */
+ grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure);
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &pollset->pollable;
if (append_error(&error, pollable_materialize(&pollset->pollable),
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index 255e07010b..3c4ca9c7c5 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -54,9 +54,6 @@
gpr_log(GPR_INFO, __VA_ARGS__); \
}
-/* Uncomment the following to enable extra checks on poll_object operations */
-/* #define PO_DEBUG */
-
static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;
@@ -85,7 +82,7 @@ typedef enum {
} poll_obj_type;
typedef struct poll_obj {
-#ifdef PO_DEBUG
+#ifndef NDEBUG
poll_obj_type obj_type;
#endif
gpr_mu mu;
@@ -821,7 +818,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
* would be holding a lock to it anyway. */
gpr_mu_lock(&new_fd->po.mu);
new_fd->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
new_fd->po.obj_type = POLL_OBJ_FD;
#endif
@@ -1079,7 +1076,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->po.mu);
*mu = &pollset->po.mu;
pollset->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
pollset->po.obj_type = POLL_OBJ_POLLSET;
#endif
@@ -1416,7 +1413,7 @@ static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
poll_obj_type item_type) {
GPR_TIMER_BEGIN("add_poll_object", 0);
-#ifdef PO_DEBUG
+#ifndef NDEBUG
GPR_ASSERT(item->obj_type == item_type);
GPR_ASSERT(bag->obj_type == bag_type);
#endif
@@ -1575,7 +1572,7 @@ static grpc_pollset_set *pollset_set_create(void) {
grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
gpr_mu_init(&pss->po.mu);
pss->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
pss->po.obj_type = POLL_OBJ_POLLSET_SET;
#endif
return pss;
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index 45de289e45..a98b8e62db 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -54,7 +54,7 @@ static int retry_named_port_failure(int status, request *r,
int retry_status;
uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t));
req->data = r;
- r->port = svc[i][1];
+ r->port = gpr_strdup(svc[i][1]);
retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb,
r->host, r->port, r->hints);
if (retry_status < 0 || getaddrinfo_cb == NULL) {
@@ -127,6 +127,8 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
GRPC_CLOSURE_SCHED(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(r->hints);
+ gpr_free(r->host);
+ gpr_free(r->port);
gpr_free(r);
uv_freeaddrinfo(res);
}
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index ab6832932f..2f1d237c07 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -48,6 +48,7 @@ typedef struct grpc_uv_tcp_connect {
static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx,
grpc_uv_tcp_connect *connect) {
grpc_resource_quota_unref_internal(exec_ctx, connect->resource_quota);
+ gpr_free(connect->addr_name);
gpr_free(connect);
}
@@ -105,6 +106,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
}
done = (--connect->refs == 0);
if (done) {
+ grpc_exec_ctx_flush(&exec_ctx);
uv_tcp_connect_cleanup(&exec_ctx, connect);
}
GRPC_CLOSURE_SCHED(&exec_ctx, closure, error);
@@ -140,6 +142,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
connect->resource_quota = resource_quota;
uv_tcp_init(uv_default_loop(), connect->tcp_handle);
connect->connect_req.data = connect;
+ connect->refs = 1;
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index 2de0ea90e7..2ab836cc34 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -234,6 +234,7 @@ static void on_connect(uv_stream_t *server, int status) {
sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
acceptor);
grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(peer_name_string);
}
}
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 7c21b44e76..ff5fd3edc8 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -65,7 +65,10 @@ typedef struct {
} grpc_tcp;
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
+ gpr_free(tcp->handle);
+ gpr_free(tcp->peer_string);
gpr_free(tcp);
}
@@ -115,13 +118,17 @@ static void uv_close_callback(uv_handle_t *handle) {
grpc_exec_ctx_finish(&exec_ctx);
}
+static grpc_slice alloc_read_slice(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user) {
+ return grpc_resource_user_slice_malloc(exec_ctx, resource_user,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+}
+
static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
uv_buf_t *buf) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp *tcp = handle->data;
(void)suggested_size;
- tcp->read_slice = grpc_resource_user_slice_malloc(
- &exec_ctx, tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice);
buf->len = GRPC_SLICE_LENGTH(tcp->read_slice);
grpc_exec_ctx_finish(&exec_ctx);
@@ -148,6 +155,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
// Successful read
sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread);
grpc_slice_buffer_add(tcp->read_slices, sub);
+ tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user);
error = GRPC_ERROR_NONE;
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
size_t i;
@@ -334,6 +342,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
grpc_resource_quota *resource_quota,
char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp);
@@ -350,6 +359,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
tcp->peer_string = gpr_strdup(peer_string);
tcp->shutting_down = false;
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
+ tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user);
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
@@ -357,6 +367,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
uv_unref((uv_handle_t *)handle);
#endif
+ grpc_exec_ctx_finish(&exec_ctx);
return &tcp->base;
}
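The tcp_uv changes move slice allocation out of the libuv alloc callback: a read slice is pre-allocated when the endpoint is created, handed to libuv from alloc_uv_buf, and replenished in read_callback once the filled slice has been appended to the output buffer (tcp_free releases the final unused slice). A simplified libuv-only sketch of that replenish pattern, with plain malloc standing in for the resource-quota slice allocator:

    #include <stdlib.h>
    #include <uv.h>

    #define READ_BUF_SIZE 8192

    typedef struct {
      char *read_buf; /* pre-allocated; stands in for tcp->read_slice */
    } conn_state;

    static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) {
      conn_state *c = (conn_state *)handle->data;
      (void)suggested;
      /* No allocation here: hand out the buffer prepared earlier. */
      buf->base = c->read_buf;
      buf->len = READ_BUF_SIZE;
    }

    static void read_cb(uv_stream_t *stream, ssize_t nread,
                        const uv_buf_t *buf) {
      conn_state *c = (conn_state *)stream->data;
      if (nread > 0) {
        /* ... hand buf->base (c->read_buf) off to the consumer ... */
        /* Replenish for the next read, mirroring alloc_read_slice(). */
        c->read_buf = (char *)malloc(READ_BUF_SIZE);
      }
    }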
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index bf73d2c685..e6a9eb0e86 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -44,41 +44,63 @@
grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false);
grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false);
+/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
+ * deadlines earlier than 'queue_deadline_cap' are maintained in the heap and
+ * others are maintained in the list (unordered). This helps to keep the number
+ * of elements in the heap low.
+ *
+ * The 'queue_deadline_cap' gets recomputed periodically based on the timer
+ * stats maintained in 'stats', and the relevant timers are then moved from
+ * the 'list' to the 'heap'.
+ */
typedef struct {
gpr_mu mu;
grpc_time_averaged_stats stats;
/* All and only timers with deadlines <= this will be in the heap. */
gpr_atm queue_deadline_cap;
+ /* The deadline of the next timer due in this shard */
gpr_atm min_deadline;
- /* Index in the g_shard_queue */
+ /* Index of this timer_shard in the g_shard_queue */
uint32_t shard_queue_index;
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
list have the top bit of their deadline set to 0. */
grpc_timer_heap heap;
/* This holds timers whose deadline is >= queue_deadline_cap. */
grpc_timer list;
-} shard_type;
+} timer_shard;
+
+/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
+ * is hashed to select the timer shard to add the timer to */
+static timer_shard g_shards[NUM_SHARDS];
+
+/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e.
+ * the deadline of the next timer in each shard).
+ * Access to this is protected by g_shared_mutables.mu */
+static timer_shard *g_shard_queue[NUM_SHARDS];
+
+/* Thread-local variable that stores the deadline of the next timer as last
+ * seen by this thread. This is an optimization that spares the thread from
+ * checking shared_mutables.min_timer (which requires acquiring the
+ * shared_mutables.mu lock, an expensive operation) */
+GPR_TLS_DECL(g_last_seen_min_timer);
struct shared_mutables {
+ /* The deadline of the next timer due across all timer shards */
gpr_atm min_timer;
/* Allow only one run_some_expired_timers at once */
gpr_spinlock checker_mu;
bool initialized;
- /* Protects g_shard_queue */
+ /* Protects g_shard_queue (and the shared_mutables struct itself) */
gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
static struct shared_mutables g_shared_mutables = {
.checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false,
};
+
static gpr_clock_type g_clock_type;
-static shard_type g_shards[NUM_SHARDS];
-/* Protected by g_shared_mutables.mu */
-static shard_type *g_shard_queue[NUM_SHARDS];
static gpr_timespec g_start_time;
-GPR_TLS_DECL(g_last_seen_min_timer);
-
static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
if (a > GPR_ATM_MAX - b) {
return GPR_ATM_MAX;
@@ -122,7 +144,7 @@ static gpr_timespec atm_to_timespec(gpr_atm x) {
return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0));
}
-static gpr_atm compute_min_deadline(shard_type *shard) {
+static gpr_atm compute_min_deadline(timer_shard *shard) {
return grpc_timer_heap_is_empty(&shard->heap)
? saturating_add(shard->queue_deadline_cap, 1)
: grpc_timer_heap_top(&shard->heap)->deadline;
@@ -142,7 +164,7 @@ void grpc_timer_list_init(gpr_timespec now) {
grpc_register_tracer("timer_check", &grpc_timer_check_trace);
for (i = 0; i < NUM_SHARDS; i++) {
- shard_type *shard = &g_shards[i];
+ timer_shard *shard = &g_shards[i];
gpr_mu_init(&shard->mu);
grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
0.5);
@@ -161,7 +183,7 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
exec_ctx, GPR_ATM_MAX, NULL,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
for (i = 0; i < NUM_SHARDS; i++) {
- shard_type *shard = &g_shards[i];
+ timer_shard *shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
grpc_timer_heap_destroy(&shard->heap);
}
@@ -187,7 +209,7 @@ static void list_remove(grpc_timer *timer) {
}
static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
- shard_type *temp;
+ timer_shard *temp;
temp = g_shard_queue[first_shard_queue_index];
g_shard_queue[first_shard_queue_index] =
g_shard_queue[first_shard_queue_index + 1];
@@ -198,7 +220,7 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) {
first_shard_queue_index + 1;
}
-static void note_deadline_change(shard_type *shard) {
+static void note_deadline_change(timer_shard *shard) {
while (shard->shard_queue_index > 0 &&
shard->min_deadline <
g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
@@ -215,7 +237,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now) {
int is_first_timer = 0;
- shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+ timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
timer->closure = closure;
@@ -303,7 +325,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
return;
}
- shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+ timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
gpr_mu_lock(&shard->mu);
if (GRPC_TRACER_ON(grpc_timer_trace)) {
gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
@@ -321,12 +343,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
gpr_mu_unlock(&shard->mu);
}
-/* This is called when the queue is empty and "now" has reached the
- queue_deadline_cap. We compute a new queue deadline and then scan the map
- for timers that fall at or under it. Returns true if the queue is no
- longer empty.
+/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and
+   moving all timers in shard->list with deadlines earlier than
+   'queue_deadline_cap' into shard->heap.
+   Returns 'true' if shard->heap has at least one element.
REQUIRES: shard->mu locked */
-static int refill_queue(shard_type *shard, gpr_atm now) {
+static int refill_heap(timer_shard *shard, gpr_atm now) {
/* Compute the new queue window width and bound by the limits: */
double computed_deadline_delta =
grpc_time_averaged_stats_update_average(&shard->stats) *
@@ -363,7 +385,7 @@ static int refill_queue(shard_type *shard, gpr_atm now) {
/* This pops the next non-cancelled timer with deadline <= now from the
queue, or returns NULL if there isn't one.
REQUIRES: shard->mu locked */
-static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
+static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) {
grpc_timer *timer;
for (;;) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@@ -373,7 +395,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
}
if (grpc_timer_heap_is_empty(&shard->heap)) {
if (now < shard->queue_deadline_cap) return NULL;
- if (!refill_queue(shard, now)) return NULL;
+ if (!refill_heap(shard, now)) return NULL;
}
timer = grpc_timer_heap_top(&shard->heap);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
@@ -393,7 +415,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) {
}
/* REQUIRES: shard->mu unlocked */
-static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
+static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard,
gpr_atm now, gpr_atm *new_min_deadline,
grpc_error *error) {
size_t n = 0;
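Timers are spread across the shards by hashing the timer's address (GPR_HASH_POINTER), so a given grpc_timer always maps to the same timer_shard for init and cancel while contention on any single shard mutex stays low. A standalone sketch of address-based shard selection (the hash and shard count here are illustrative, not gpr's):

    #include <stdint.h>

    #define NUM_SHARDS 32 /* illustrative; a power of two keeps masking cheap */

    typedef struct { int dummy; } timer_shard;
    static timer_shard g_shards[NUM_SHARDS];

    static timer_shard *shard_for(const void *timer) {
      uintptr_t p = (uintptr_t)timer;
      p ^= p >> 4; /* heap pointers have poorly distributed low bits */
      return &g_shards[p & (NUM_SHARDS - 1)];
    }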
diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c
index 520d4a3252..cb7998db97 100644
--- a/src/core/lib/iomgr/timer_manager.c
+++ b/src/core/lib/iomgr/timer_manager.c
@@ -50,6 +50,9 @@ static completed_thread *g_completed_threads;
static bool g_kicked;
// is there a thread waiting until the next timer should fire?
static bool g_has_timed_waiter;
+// the deadline of the current timed waiter thread (only relevant if
+// g_has_timed_waiter is true)
+static gpr_timespec g_timed_waiter_deadline;
// generation counter to track which thread is waiting for the next timer
static uint64_t g_timed_waiter_generation;
@@ -101,8 +104,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) {
start_timer_thread_and_unlock();
} else {
// if there's no thread waiting with a timeout, kick an existing
- // waiter
- // so that the next deadline is not missed
+ // waiter so that the next deadline is not missed
if (!g_has_timed_waiter) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "kick untimed waiter");
@@ -132,44 +134,79 @@ static bool wait_until(gpr_timespec next) {
gpr_mu_unlock(&g_mu);
return false;
}
- // if there's no timed waiter, we should become one: that waiter waits
- // only until the next timer should expire
- // all other timers wait forever
- uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
- if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) {
- g_has_timed_waiter = true;
- // we use a generation counter to track the timed waiter so we can
- // cancel an existing one quickly (and when it actually times out it'll
- // figure stuff out instead of incurring a wakeup)
- my_timed_waiter_generation = ++g_timed_waiter_generation;
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC));
- gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds",
- wait_time.tv_sec, wait_time.tv_nsec);
+
+  // If g_kicked is true at this point, a kick from the timer system arrived
+  // that this timer-manager thread has not yet consumed. We can no longer
+  // trust 'next' (there might be an earlier deadline), so we should exit
+  // quickly and get the next deadline from the timer system.
+
+ if (!g_kicked) {
+    // if there's no timed waiter, we should become one: that waiter waits
+    // only until the next timer should expire. All other timer threads wait
+    // forever
+ //
+ // 'g_timed_waiter_generation' is a global generation counter. The idea here
+ // is that the thread becoming a timed-waiter increments and stores this
+ // global counter locally in 'my_timed_waiter_generation' before going to
+ // sleep. After waking up, if my_timed_waiter_generation ==
+    // g_timed_waiter_generation, it can be sure that it was the timed-waiter
+    // thread (and that no other thread took over while it was asleep)
+ //
+ // Initialize my_timed_waiter_generation to some value that is NOT equal to
+ // g_timed_waiter_generation
+ uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
+
+    /* This thread becomes the timed waiter if there is none, or takes over
+       from the current one if its own 'next' is earlier than the current
+       timed-waiter's deadline (all other timer threads keep waiting forever,
+       i.e. until kicked) */
+ if (gpr_time_cmp(next, inf_future) != 0) {
+ if (!g_has_timed_waiter ||
+ (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) {
+ my_timed_waiter_generation = ++g_timed_waiter_generation;
+ g_has_timed_waiter = true;
+ g_timed_waiter_deadline = next;
+
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_timespec wait_time =
+ gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds",
+ wait_time.tv_sec, wait_time.tv_nsec);
+ }
+      } else { // g_has_timed_waiter && next >= g_timed_waiter_deadline
+ next = inf_future;
+ }
}
- } else {
- next = inf_future;
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+
+ if (GRPC_TRACER_ON(grpc_timer_check_trace) &&
+ gpr_time_cmp(next, inf_future) == 0) {
gpr_log(GPR_DEBUG, "sleep until kicked");
}
+
+ gpr_cv_wait(&g_cv_wait, &g_mu, next);
+
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
+ my_timed_waiter_generation == g_timed_waiter_generation,
+ g_kicked);
+ }
+ // if this was the timed waiter, then we need to check timers, and flag
+ // that there's now no timed waiter... we'll look for a replacement if
+ // there's work to do after checking timers (code above)
+ if (my_timed_waiter_generation == g_timed_waiter_generation) {
+ g_has_timed_waiter = false;
+ g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ }
}
- gpr_cv_wait(&g_cv_wait, &g_mu, next);
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
- my_timed_waiter_generation == g_timed_waiter_generation, g_kicked);
- }
- // if this was the timed waiter, then we need to check timers, and flag
- // that there's now no timed waiter... we'll look for a replacement if
- // there's work to do after checking timers (code above)
- if (my_timed_waiter_generation == g_timed_waiter_generation) {
- g_has_timed_waiter = false;
- }
+
// if this was a kick from the timer system, consume it (and don't stop
// this thread yet)
if (g_kicked) {
grpc_timer_consume_kick();
g_kicked = false;
}
+
gpr_mu_unlock(&g_mu);
return true;
}
@@ -257,6 +294,9 @@ void grpc_timer_manager_init(void) {
g_waiter_count = 0;
g_completed_threads = NULL;
+ g_has_timed_waiter = false;
+ g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+
start_threads();
}
@@ -302,6 +342,7 @@ void grpc_kick_poller(void) {
gpr_mu_lock(&g_mu);
g_kicked = true;
g_has_timed_waiter = false;
+ g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
++g_timed_waiter_generation;
gpr_cv_signal(&g_cv_wait);
gpr_mu_unlock(&g_mu);
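The generation counter and the new g_timed_waiter_deadline cooperate: a thread only takes over as the timed waiter when it carries a strictly earlier deadline, and after waking it compares its saved generation against the global counter to learn whether it was still the designated waiter. A condensed pthreads sketch of that handoff (hypothetical names; error handling omitted):

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <time.h>

    static pthread_mutex_t g_mu = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t g_cv = PTHREAD_COND_INITIALIZER;
    static bool g_has_timed_waiter = false;
    static struct timespec g_waiter_deadline;
    static uint64_t g_waiter_generation = 0;

    static bool before(const struct timespec *a, const struct timespec *b) {
      return a->tv_sec < b->tv_sec ||
             (a->tv_sec == b->tv_sec && a->tv_nsec < b->tv_nsec);
    }

    /* Wait until 'next'; returns true if we were the timed waiter. */
    static bool wait_until(struct timespec next) {
      pthread_mutex_lock(&g_mu);
      uint64_t my_generation = g_waiter_generation - 1; /* != current */
      if (!g_has_timed_waiter || before(&next, &g_waiter_deadline)) {
        /* Become (or displace) the timed waiter: ours is the earliest
           deadline seen so far. */
        my_generation = ++g_waiter_generation;
        g_has_timed_waiter = true;
        g_waiter_deadline = next;
        pthread_cond_timedwait(&g_cv, &g_mu, &next);
      } else {
        pthread_cond_wait(&g_cv, &g_mu); /* someone else watches the clock */
      }
      bool was_timed = (my_generation == g_waiter_generation);
      if (was_timed) g_has_timed_waiter = false; /* vacate the role */
      pthread_mutex_unlock(&g_mu);
      return was_timed;
    }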
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c
index 8c747085bb..6cd558d123 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.c
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c
@@ -462,6 +462,35 @@ static BIGNUM *bignum_from_base64(grpc_exec_ctx *exec_ctx, const char *b64) {
return result;
}
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+
+// Provide compatibility across OpenSSL 1.0.2 and 1.1.0.
+static int RSA_set0_key(RSA *r, BIGNUM *n, BIGNUM *e, BIGNUM *d) {
+ /* If the fields n and e in r are NULL, the corresponding input
+ * parameters MUST be non-NULL for n and e. d may be
+ * left NULL (in case only the public key is used).
+ */
+ if ((r->n == NULL && n == NULL) || (r->e == NULL && e == NULL)) {
+ return 0;
+ }
+
+ if (n != NULL) {
+ BN_free(r->n);
+ r->n = n;
+ }
+ if (e != NULL) {
+ BN_free(r->e);
+ r->e = e;
+ }
+ if (d != NULL) {
+ BN_free(r->d);
+ r->d = d;
+ }
+
+ return 1;
+}
+#endif // OPENSSL_VERSION_NUMBER < 0x10100000L
+
static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json,
const char *kty) {
const grpc_json *key_prop;
@@ -478,21 +507,27 @@ static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json,
gpr_log(GPR_ERROR, "Could not create rsa key.");
goto end;
}
+ BIGNUM *tmp_n = NULL;
+ BIGNUM *tmp_e = NULL;
for (key_prop = json->child; key_prop != NULL; key_prop = key_prop->next) {
if (strcmp(key_prop->key, "n") == 0) {
- rsa->n =
+ tmp_n =
bignum_from_base64(exec_ctx, validate_string_field(key_prop, "n"));
- if (rsa->n == NULL) goto end;
+ if (tmp_n == NULL) goto end;
} else if (strcmp(key_prop->key, "e") == 0) {
- rsa->e =
+ tmp_e =
bignum_from_base64(exec_ctx, validate_string_field(key_prop, "e"));
- if (rsa->e == NULL) goto end;
+ if (tmp_e == NULL) goto end;
}
}
- if (rsa->e == NULL || rsa->n == NULL) {
+ if (tmp_e == NULL || tmp_n == NULL) {
gpr_log(GPR_ERROR, "Missing RSA public key field.");
goto end;
}
+ if (!RSA_set0_key(rsa, tmp_n, tmp_e, NULL)) {
+ gpr_log(GPR_ERROR, "Cannot set RSA key from inputs.");
+ goto end;
+ }
result = EVP_PKEY_new();
EVP_PKEY_set1_RSA(result, rsa); /* uprefs rsa. */
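On OpenSSL 1.1.0 and later the RSA struct is opaque, so rsa->n and rsa->e can no longer be assigned directly; RSA_set0_key transfers ownership of the BIGNUMs to the RSA object, and the shim above back-fills the same function for 1.0.2. A hedged sketch of building a public key this way (the input byte buffers are illustrative):

    #include <openssl/bn.h>
    #include <openssl/rsa.h>

    /* Build an RSA public key from big-endian modulus/exponent bytes. */
    RSA *rsa_public_from_bytes(const unsigned char *n_bytes, int n_len,
                               const unsigned char *e_bytes, int e_len) {
      RSA *rsa = RSA_new();
      BIGNUM *n = BN_bin2bn(n_bytes, n_len, NULL);
      BIGNUM *e = BN_bin2bn(e_bytes, e_len, NULL);
      if (rsa == NULL || n == NULL || e == NULL ||
          !RSA_set0_key(rsa, n, e, NULL /* no private exponent */)) {
        BN_free(n); /* on failure nothing was transferred; free it all */
        BN_free(e);
        RSA_free(rsa);
        return NULL;
      }
      return rsa; /* rsa now owns n and e */
    }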
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 58112b04b4..50a51b31cd 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -49,7 +49,6 @@ typedef struct {
pollset_set so that work can progress when this call wants work to progress
*/
grpc_polling_entity *pollent;
- grpc_transport_stream_op_batch op;
gpr_atm security_context_set;
gpr_mu security_context_mu;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
@@ -92,11 +91,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
size_t num_md,
grpc_credentials_status status,
const char *error_details) {
- grpc_call_element *elem = (grpc_call_element *)user_data;
+ grpc_transport_stream_op_batch *batch =
+ (grpc_transport_stream_op_batch *)user_data;
+ grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
- grpc_transport_stream_op_batch *op = &calld->op;
- grpc_metadata_batch *mdb;
- size_t i;
reset_auth_metadata_context(&calld->auth_md_context);
grpc_error *error = GRPC_ERROR_NONE;
if (status != GRPC_CREDENTIALS_OK) {
@@ -108,9 +106,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED);
} else {
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
- GPR_ASSERT(op->send_initial_metadata);
- mdb = op->payload->send_initial_metadata.send_initial_metadata;
- for (i = 0; i < num_md; i++) {
+ GPR_ASSERT(batch->send_initial_metadata);
+ grpc_metadata_batch *mdb =
+ batch->payload->send_initial_metadata.send_initial_metadata;
+ for (size_t i = 0; i < num_md; i++) {
add_error(&error,
grpc_metadata_batch_add_tail(
exec_ctx, mdb, &calld->md_links[i],
@@ -120,9 +119,9 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
}
}
if (error == GRPC_ERROR_NONE) {
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
} else {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error);
}
}
@@ -158,11 +157,11 @@ void build_auth_metadata_context(grpc_security_connector *sc,
static void send_security_metadata(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_client_security_context *ctx =
- (grpc_client_security_context *)op->payload
+ (grpc_client_security_context *)batch->payload
->context[GRPC_CONTEXT_SECURITY]
.value;
grpc_call_credentials *channel_call_creds =
@@ -171,7 +170,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
if (channel_call_creds == NULL && !call_creds_has_md) {
/* Skip sending metadata altogether. */
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
return;
}
@@ -180,7 +179,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
ctx->creds, NULL);
if (calld->creds == NULL) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
+ exec_ctx, batch,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Incompatible credentials set on channel and call."),
@@ -194,28 +193,29 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
build_auth_metadata_context(&chand->security_connector->base,
chand->auth_context, calld);
- calld->op = *op; /* Copy op (originates from the caller's stack). */
GPR_ASSERT(calld->pollent != NULL);
grpc_call_credentials_get_request_metadata(
exec_ctx, calld->creds, calld->pollent, calld->auth_md_context,
- on_credentials_metadata, elem);
+ on_credentials_metadata, batch);
}
static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_security_status status) {
- grpc_call_element *elem = (grpc_call_element *)user_data;
+ grpc_transport_stream_op_batch *batch =
+ (grpc_transport_stream_op_batch *)user_data;
+ grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
if (status == GRPC_SECURITY_OK) {
- send_security_metadata(exec_ctx, elem, &calld->op);
+ send_security_metadata(exec_ctx, elem, batch);
} else {
char *error_msg;
char *host = grpc_slice_to_c_string(calld->host);
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
host);
gpr_free(host);
- grpc_call_element_signal_error(
- exec_ctx, elem,
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAUTHENTICATED));
@@ -223,35 +223,29 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data,
}
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- GPR_TIMER_BEGIN("auth_start_transport_op", 0);
+static void auth_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch) {
+ GPR_TIMER_BEGIN("auth_start_transport_stream_op_batch", 0);
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- grpc_linked_mdelem *l;
- grpc_client_security_context *sec_ctx = NULL;
- if (!op->cancel_stream) {
+ if (!batch->cancel_stream) {
/* double checked lock over security context to ensure it's set once */
if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
gpr_mu_lock(&calld->security_context_mu);
if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
- GPR_ASSERT(op->payload->context != NULL);
- if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
- op->payload->context[GRPC_CONTEXT_SECURITY].value =
+ GPR_ASSERT(batch->payload->context != NULL);
+ if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value =
grpc_client_security_context_create();
- op->payload->context[GRPC_CONTEXT_SECURITY].destroy =
+ batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
grpc_client_security_context_destroy;
}
- sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value;
+ grpc_client_security_context *sec_ctx =
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value;
GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
sec_ctx->auth_context =
GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
@@ -261,9 +255,9 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
}
- if (op->send_initial_metadata) {
- for (l = op->payload->send_initial_metadata.send_initial_metadata->list
- .head;
+ if (batch->send_initial_metadata) {
+ for (grpc_linked_mdelem *l = batch->payload->send_initial_metadata
+ .send_initial_metadata->list.head;
l != NULL; l = l->next) {
grpc_mdelem md = l->md;
/* Pointer comparison is OK for md_elems created from the same context.
@@ -284,19 +278,19 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
if (calld->have_host) {
char *call_host = grpc_slice_to_c_string(calld->host);
- calld->op = *op; /* Copy op (originates from the caller's stack). */
+ batch->handler_private.extra_arg = elem;
grpc_channel_security_connector_check_call_host(
exec_ctx, chand->security_connector, call_host, chand->auth_context,
- on_host_checked, elem);
+ on_host_checked, batch);
gpr_free(call_host);
- GPR_TIMER_END("auth_start_transport_op", 0);
+ GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
return; /* early exit */
}
}
/* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
- GPR_TIMER_END("auth_start_transport_op", 0);
+ grpc_call_next_op(exec_ctx, elem, batch);
+ GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
@@ -379,7 +373,15 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
const grpc_channel_filter grpc_client_auth_filter = {
- auth_start_transport_op, grpc_channel_next_op, sizeof(call_data),
- init_call_elem, set_pollset_or_pollset_set, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- grpc_call_next_get_peer, grpc_channel_next_get_info, "client-auth"};
+ auth_start_transport_stream_op_batch,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ set_pollset_or_pollset_set,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
+ "client-auth"};
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index 3c0c24254b..29327107e5 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -383,8 +383,7 @@ static void fake_channel_add_handshakers(
grpc_handshake_manager_add(
handshake_mgr,
grpc_security_handshaker_create(
- exec_ctx, tsi_create_adapter_handshaker(
- tsi_create_fake_handshaker(true /* is_client */)),
+ exec_ctx, tsi_create_fake_handshaker(true /* is_client */),
&sc->base));
}
@@ -394,8 +393,7 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx,
grpc_handshake_manager_add(
handshake_mgr,
grpc_security_handshaker_create(
- exec_ctx, tsi_create_adapter_handshaker(
- tsi_create_fake_handshaker(false /* is_client */)),
+ exec_ctx, tsi_create_fake_handshaker(false /* is_client */),
&sc->base));
}
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 4e6914be7b..9bf3f0ca0f 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -27,14 +27,9 @@
#include "src/core/lib/slice/slice_internal.h"
typedef struct call_data {
- grpc_metadata_batch *recv_initial_metadata;
- /* Closure to call when finished with the auth_on_recv hook. */
- grpc_closure *on_done_recv;
- /* Receive closures are chained: we inject this closure as the on_done_recv
- up-call on transport_op, and remember to call our on_done_recv member after
- handling it. */
- grpc_closure auth_on_recv;
- grpc_transport_stream_op_batch *transport_op;
+ grpc_transport_stream_op_batch *recv_initial_metadata_batch;
+ grpc_closure *original_recv_initial_metadata_ready;
+ grpc_closure recv_initial_metadata_ready;
grpc_metadata_array md;
const grpc_metadata *consumed_md;
size_t num_consumed_md;
@@ -90,125 +85,96 @@ static void on_md_processing_done(
grpc_status_code status, const char *error_details) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
+ grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
/* TODO(jboeuf): Implement support for response_md. */
if (response_md != NULL && num_response_md > 0) {
gpr_log(GPR_INFO,
"response_md in auth metadata processing not supported for now. "
"Ignoring...");
}
-
+ grpc_error *error = GRPC_ERROR_NONE;
if (status == GRPC_STATUS_OK) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
- /* TODO(ctiller): propagate error */
- GRPC_LOG_IF_ERROR(
- "grpc_metadata_batch_filter",
- grpc_metadata_batch_filter(&exec_ctx, calld->recv_initial_metadata,
- remove_consumed_md, elem,
- "Response metadata filtering error"));
- for (size_t i = 0; i < calld->md.count; i++) {
- grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
- grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
- }
- grpc_metadata_array_destroy(&calld->md);
- GRPC_CLOSURE_SCHED(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE);
+ error = grpc_metadata_batch_filter(
+ &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
+ remove_consumed_md, elem, "Response metadata filtering error");
} else {
- for (size_t i = 0; i < calld->md.count; i++) {
- grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
- grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
- }
- grpc_metadata_array_destroy(&calld->md);
- error_details = error_details != NULL
- ? error_details
- : "Authentication metadata processing failed.";
- if (calld->transport_op->send_message) {
- grpc_byte_stream_destroy(
- &exec_ctx, calld->transport_op->payload->send_message.send_message);
- calld->transport_op->payload->send_message.send_message = NULL;
+ if (error_details == NULL) {
+ error_details = "Authentication metadata processing failed.";
}
- GRPC_CLOSURE_SCHED(
- &exec_ctx, calld->on_done_recv,
+ error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
- GRPC_ERROR_INT_GRPC_STATUS, status));
+ GRPC_ERROR_INT_GRPC_STATUS, status);
}
-
+ for (size_t i = 0; i < calld->md.count; i++) {
+ grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
+ grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
+ }
+ grpc_metadata_array_destroy(&calld->md);
+ GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready,
+ error);
grpc_exec_ctx_finish(&exec_ctx);
}
-static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *error) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
+static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = arg;
channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
if (error == GRPC_ERROR_NONE) {
if (chand->creds != NULL && chand->creds->processor.process != NULL) {
- calld->md = metadata_batch_to_md_array(calld->recv_initial_metadata);
+ calld->md = metadata_batch_to_md_array(
+ batch->payload->recv_initial_metadata.recv_initial_metadata);
chand->creds->processor.process(
chand->creds->processor.state, calld->auth_context,
calld->md.metadata, calld->md.count, on_md_processing_done, elem);
return;
}
}
- GRPC_CLOSURE_SCHED(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready,
+ GRPC_ERROR_REF(error));
}
-static void set_recv_ops_md_callbacks(grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+static void auth_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
-
- if (op->recv_initial_metadata) {
- /* substitute our callback for the higher callback */
- calld->recv_initial_metadata =
- op->payload->recv_initial_metadata.recv_initial_metadata;
- calld->on_done_recv =
- op->payload->recv_initial_metadata.recv_initial_metadata_ready;
- op->payload->recv_initial_metadata.recv_initial_metadata_ready =
- &calld->auth_on_recv;
- calld->transport_op = op;
+ if (batch->recv_initial_metadata) {
+ // Inject our callback.
+ calld->recv_initial_metadata_batch = batch;
+ calld->original_recv_initial_metadata_ready =
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->recv_initial_metadata_ready;
}
-}
-
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- set_recv_ops_md_callbacks(elem, op);
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
}
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- /* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- grpc_server_security_context *server_ctx = NULL;
-
- /* initialize members */
- memset(calld, 0, sizeof(*calld));
- GRPC_CLOSURE_INIT(&calld->auth_on_recv, auth_on_recv, elem,
+ GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
+ recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
-
+ // Create server security context. Set its auth context from channel
+ // data and save it in the call context.
+ grpc_server_security_context *server_ctx =
+ grpc_server_security_context_create();
+ server_ctx->auth_context = grpc_auth_context_create(chand->auth_context);
+ calld->auth_context = server_ctx->auth_context;
if (args->context[GRPC_CONTEXT_SECURITY].value != NULL) {
args->context[GRPC_CONTEXT_SECURITY].destroy(
args->context[GRPC_CONTEXT_SECURITY].value);
}
-
- server_ctx = grpc_server_security_context_create();
- server_ctx->auth_context = grpc_auth_context_create(chand->auth_context);
- calld->auth_context = server_ctx->auth_context;
-
args->context[GRPC_CONTEXT_SECURITY].value = server_ctx;
args->context[GRPC_CONTEXT_SECURITY].destroy =
grpc_server_security_context_destroy;
-
return GRPC_ERROR_NONE;
}
@@ -221,19 +187,15 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
+ GPR_ASSERT(!args->is_last);
+ channel_data *chand = elem->channel_data;
grpc_auth_context *auth_context =
grpc_find_auth_context_in_args(args->channel_args);
- grpc_server_credentials *creds =
- grpc_find_server_credentials_in_args(args->channel_args);
- /* grab pointers to our data from the channel element */
- channel_data *chand = elem->channel_data;
-
- GPR_ASSERT(!args->is_last);
GPR_ASSERT(auth_context != NULL);
-
- /* initialize members */
chand->auth_context =
GRPC_AUTH_CONTEXT_REF(auth_context, "server_auth_filter");
+ grpc_server_credentials *creds =
+ grpc_find_server_credentials_in_args(args->channel_args);
chand->creds = grpc_server_credentials_ref(creds);
return GRPC_ERROR_NONE;
}
@@ -241,14 +203,13 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for channel data */
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
- /* grab pointers to our data from the channel element */
channel_data *chand = elem->channel_data;
GRPC_AUTH_CONTEXT_UNREF(chand->auth_context, "server_auth_filter");
grpc_server_credentials_unref(exec_ctx, chand->creds);
}
const grpc_channel_filter grpc_server_auth_filter = {
- auth_start_transport_op,
+ auth_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c
index b433c61b4c..9e0f73ae3d 100644
--- a/src/core/lib/support/arena.c
+++ b/src/core/lib/support/arena.c
@@ -38,7 +38,7 @@ struct gpr_arena {
gpr_arena *gpr_arena_create(size_t initial_size) {
initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size);
- gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size);
+ gpr_arena *a = (gpr_arena *)gpr_zalloc(sizeof(gpr_arena) + initial_size);
a->initial_zone.size_end = initial_size;
return a;
}
@@ -64,7 +64,7 @@ void *gpr_arena_alloc(gpr_arena *arena, size_t size) {
zone *next_z = (zone *)gpr_atm_acq_load(&z->next_atm);
if (next_z == NULL) {
size_t next_z_size = (size_t)gpr_atm_no_barrier_load(&arena->size_so_far);
- next_z = gpr_zalloc(sizeof(zone) + next_z_size);
+ next_z = (zone *)gpr_zalloc(sizeof(zone) + next_z_size);
next_z->size_begin = z->size_end;
next_z->size_end = z->size_end + next_z_size;
if (!gpr_atm_rel_cas(&z->next_atm, (gpr_atm)NULL, (gpr_atm)next_z)) {
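[editor's note] The casts added here are redundant in plain C but matter if the file is ever compiled as C++, where void * does not convert implicitly to other object pointer types. Assuming that is the motivation (the diff itself does not say), a minimal illustration:

    #include <stdlib.h>

    typedef struct { int x; } arena_like;

    int main(void) {
      /* Implicit void* conversion: valid C, ill-formed C++.
         arena_like *a = malloc(sizeof(*a)); */
      arena_like *b = (arena_like *)malloc(sizeof(*b)); /* accepted by both */
      free(b);
      return 0;
    }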
diff --git a/src/core/lib/support/atm.c b/src/core/lib/support/atm.c
index caa0bafe33..2f37d62f76 100644
--- a/src/core/lib/support/atm.c
+++ b/src/core/lib/support/atm.c
@@ -21,12 +21,12 @@
gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta,
gpr_atm min, gpr_atm max) {
- gpr_atm current;
- gpr_atm new;
+ gpr_atm current_value;
+ gpr_atm new_value;
do {
- current = gpr_atm_no_barrier_load(value);
- new = GPR_CLAMP(current + delta, min, max);
- if (new == current) break;
- } while (!gpr_atm_no_barrier_cas(value, current, new));
- return new;
+ current_value = gpr_atm_no_barrier_load(value);
+ new_value = GPR_CLAMP(current_value + delta, min, max);
+ if (new_value == current_value) break;
+ } while (!gpr_atm_no_barrier_cas(value, current_value, new_value));
+ return new_value;
}
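[editor's note] The rename makes the retry loop read cleanly: load, clamp, publish via compare-and-swap, and retry whenever another thread changed the value in between. A stand-alone sketch of the same pattern in C11 atomics (names are illustrative; gpr_atm_no_barrier_cas plays the role of the compare-exchange below):

    #include <stdatomic.h>

    /* Clamp-and-CAS retry loop: recompute from the freshest value each
       iteration; stop early if the clamped result equals what is stored. */
    static long clamped_add(_Atomic long *value, long delta, long min, long max) {
      long current_value, new_value;
      do {
        current_value = atomic_load_explicit(value, memory_order_relaxed);
        new_value = current_value + delta;
        if (new_value < min) new_value = min;
        if (new_value > max) new_value = max;
        if (new_value == current_value) break; /* nothing to store */
      } while (!atomic_compare_exchange_weak_explicit(
          value, &current_value, new_value, memory_order_relaxed,
          memory_order_relaxed));
      return new_value;
    }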
diff --git a/src/core/lib/support/avl.c b/src/core/lib/support/avl.c
index aa0f665272..a6178fdbce 100644
--- a/src/core/lib/support/avl.c
+++ b/src/core/lib/support/avl.c
@@ -76,7 +76,7 @@ static gpr_avl_node *assert_invariants(gpr_avl_node *n) { return n; }
gpr_avl_node *new_node(void *key, void *value, gpr_avl_node *left,
gpr_avl_node *right) {
- gpr_avl_node *node = gpr_malloc(sizeof(*node));
+ gpr_avl_node *node = (gpr_avl_node *)gpr_malloc(sizeof(*node));
gpr_ref_init(&node->refs, 1);
node->key = key;
node->value = value;
diff --git a/src/core/lib/support/log_linux.c b/src/core/lib/support/log_linux.c
index 5c512661a3..61d2346427 100644
--- a/src/core/lib/support/log_linux.c
+++ b/src/core/lib/support/log_linux.c
@@ -64,6 +64,8 @@ void gpr_default_log(gpr_log_func_args *args) {
time_t timer;
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
struct tm tm;
+ static __thread long tid = 0;
+ if (tid == 0) tid = gettid();
timer = (time_t)now.tv_sec;
final_slash = strrchr(args->file, '/');
@@ -81,7 +83,7 @@ void gpr_default_log(gpr_log_func_args *args) {
gpr_asprintf(&prefix, "%s%s.%09" PRId32 " %7ld %s:%d]",
gpr_log_severity_string(args->severity), time_buffer,
- now.tv_nsec, gettid(), display_file, args->line);
+ now.tv_nsec, tid, display_file, args->line);
fprintf(stderr, "%-60s %s\n", prefix, args->message);
gpr_free(prefix);
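[editor's note] Caching the thread id in a __thread variable means each thread pays the gettid() syscall once, on its first log line, instead of on every line. A sketch of the pattern, assuming gettid() in this file wraps SYS_gettid:

    #define _GNU_SOURCE
    #include <sys/syscall.h>
    #include <unistd.h>

    /* One syscall per thread, then a cheap thread-local read. */
    static __thread long cached_tid = 0;

    static long get_cached_tid(void) {
      if (cached_tid == 0) cached_tid = (long)syscall(SYS_gettid);
      return cached_tid;
    }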
diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c
index 58c4c435d3..e9f893988d 100644
--- a/src/core/lib/support/mpscq.c
+++ b/src/core/lib/support/mpscq.c
@@ -31,12 +31,11 @@ void gpr_mpscq_destroy(gpr_mpscq *q) {
GPR_ASSERT(q->tail == &q->stub);
}
-bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
+void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
gpr_mpscq_node *prev =
(gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
gpr_atm_rel_store(&prev->next, (gpr_atm)n);
- return prev == &q->stub;
}
gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
@@ -78,25 +77,3 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) {
*empty = false;
return NULL;
}
-
-void gpr_locked_mpscq_init(gpr_locked_mpscq *q) {
- gpr_mpscq_init(&q->queue);
- q->read_lock = GPR_SPINLOCK_INITIALIZER;
-}
-
-void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) {
- gpr_mpscq_destroy(&q->queue);
-}
-
-bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) {
- return gpr_mpscq_push(&q->queue, n);
-}
-
-gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) {
- if (gpr_spinlock_trylock(&q->read_lock)) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue);
- gpr_spinlock_unlock(&q->read_lock);
- return n;
- }
- return NULL;
-}
diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h
index 2f4739d7f8..daa51768f7 100644
--- a/src/core/lib/support/mpscq.h
+++ b/src/core/lib/support/mpscq.h
@@ -22,7 +22,6 @@
#include <grpc/support/atm.h>
#include <stdbool.h>
#include <stddef.h>
-#include "src/core/lib/support/spinlock.h"
// Multiple-producer single-consumer lock free queue, based upon the
// implementation from Dmitry Vyukov here:
@@ -44,34 +43,11 @@ typedef struct gpr_mpscq {
void gpr_mpscq_init(gpr_mpscq *q);
void gpr_mpscq_destroy(gpr_mpscq *q);
// Push a node
-// Thread safe - can be called from multiple threads concurrently
-// Returns true if this was possibly the first node (may return true
-// sporadically, will not return false sporadically)
-bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
+void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!)
-// Thread compatible - can only be called from one thread at a time
gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q);
// Pop a node; sets *empty to true if the queue is empty, or false if it is not
gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty);
-// An mpscq with a spinlock: it's safe to pop from multiple threads, but doing
-// only one thread will succeed concurrently
-typedef struct gpr_locked_mpscq {
- gpr_mpscq queue;
- gpr_spinlock read_lock;
-} gpr_locked_mpscq;
-
-void gpr_locked_mpscq_init(gpr_locked_mpscq *q);
-void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q);
-// Push a node
-// Thread safe - can be called from multiple threads concurrently
-// Returns true if this was possibly the first node (may return true
-// sporadically, will not return false sporadically)
-bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n);
-// Pop a node (returns NULL if no node is ready - which doesn't indicate that
-// the queue is empty!!)
-// Thread safe - can be called from multiple threads concurrently
-gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q);
-
#endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */
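[editor's note] gpr_mpscq_push no longer reports whether it added the first node; cq_event_queue_push in completion_queue.c (below) now derives that from its own item counter instead. For reference, a stand-alone condensation of the Vyukov-style intrusive push this queue uses, in C11 atomics with illustrative types:

    #include <stdatomic.h>
    #include <stddef.h>

    typedef struct mpscq_node {
      _Atomic(struct mpscq_node *) next;
    } mpscq_node;

    typedef struct {
      _Atomic(mpscq_node *) head; /* most recently pushed node */
      mpscq_node *tail;           /* single consumer's cursor */
      mpscq_node stub;            /* sentinel so head is never NULL */
    } mpscq;

    static void mpscq_push(mpscq *q, mpscq_node *n) {
      atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
      /* Producers contend only on this one exchange of the head. */
      mpscq_node *prev =
          atomic_exchange_explicit(&q->head, n, memory_order_acq_rel);
      /* Release-publish the link so the consumer can follow prev->next. */
      atomic_store_explicit(&prev->next, n, memory_order_release);
    }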
diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c
new file mode 100644
index 0000000000..0fb64ed001
--- /dev/null
+++ b/src/core/lib/support/stack_lockfree.c
@@ -0,0 +1,137 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/support/stack_lockfree.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+
+/* The lockfree node structure is a single architecture-level
+ word that allows for an atomic CAS to set it up. */
+struct lockfree_node_contents {
+ /* next thing to look at. Actual index for head, next index otherwise */
+ uint16_t index;
+#ifdef GPR_ARCH_64
+ uint16_t pad;
+ uint32_t aba_ctr;
+#else
+#ifdef GPR_ARCH_32
+ uint16_t aba_ctr;
+#else
+#error Unsupported bit width architecture
+#endif
+#endif
+};
+
+/* Use a union to make sure that these are in the same bits as an atm word */
+typedef union lockfree_node {
+ gpr_atm atm;
+ struct lockfree_node_contents contents;
+} lockfree_node;
+
+/* make sure that entries are aligned to 8 bytes */
+#define ENTRY_ALIGNMENT_BITS 3
+/* reserve this entry as invalid */
+#define INVALID_ENTRY_INDEX ((1 << 16) - 1)
+
+struct gpr_stack_lockfree {
+ lockfree_node *entries;
+ lockfree_node head; /* An atomic entry describing curr head */
+};
+
+gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) {
+ gpr_stack_lockfree *stack;
+ stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack));
+ /* Since we only allocate 16 bits to represent an entry number,
+ * make sure that we are within the desired range */
+ /* Reserve the highest entry number as a dummy */
+ GPR_ASSERT(entries < INVALID_ENTRY_INDEX);
+ stack->entries = (lockfree_node *)gpr_malloc_aligned(
+ entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS);
+ /* Clear out all entries */
+ memset(stack->entries, 0, entries * sizeof(stack->entries[0]));
+ memset(&stack->head, 0, sizeof(stack->head));
+
+ GPR_ASSERT(sizeof(stack->entries->atm) == sizeof(stack->entries->contents));
+
+ /* Point the head at reserved dummy entry */
+ stack->head.contents.index = INVALID_ENTRY_INDEX;
+/* Fill in the pad and aba_ctr to avoid confusing memcheck tools */
+#ifdef GPR_ARCH_64
+ stack->head.contents.pad = 0;
+#endif
+ stack->head.contents.aba_ctr = 0;
+ return stack;
+}
+
+void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) {
+ gpr_free_aligned(stack->entries);
+ gpr_free(stack);
+}
+
+int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) {
+ lockfree_node head;
+ lockfree_node newhead;
+ lockfree_node curent;
+ lockfree_node newent;
+
+ /* First fill in the entry's index and aba ctr for new head */
+ newhead.contents.index = (uint16_t)entry;
+#ifdef GPR_ARCH_64
+ /* Fill in the pad to avoid confusing memcheck tools */
+ newhead.contents.pad = 0;
+#endif
+
+ /* Also increment the entry's aba_ctr (the bumped value is published) */
+ curent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm);
+ newhead.contents.aba_ctr = ++curent.contents.aba_ctr;
+ gpr_atm_no_barrier_store(&stack->entries[entry].atm, curent.atm);
+
+ do {
+ /* Atomically get the existing head value for use */
+ head.atm = gpr_atm_no_barrier_load(&(stack->head.atm));
+ /* Point to it */
+ newent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm);
+ newent.contents.index = head.contents.index;
+ gpr_atm_no_barrier_store(&stack->entries[entry].atm, newent.atm);
+ } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm));
+ /* Use rel_cas above to make sure that entry index is set properly */
+ return head.contents.index == INVALID_ENTRY_INDEX;
+}
+
+int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) {
+ lockfree_node head;
+ lockfree_node newhead;
+
+ do {
+ head.atm = gpr_atm_acq_load(&(stack->head.atm));
+ if (head.contents.index == INVALID_ENTRY_INDEX) {
+ return -1;
+ }
+ newhead.atm =
+ gpr_atm_no_barrier_load(&(stack->entries[head.contents.index].atm));
+
+ } while (!gpr_atm_no_barrier_cas(&(stack->head.atm), head.atm, newhead.atm));
+
+ return head.contents.index;
+}
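[editor's note] The aba_ctr half of each node word is what keeps the pop CAS safe: if an entry is popped and pushed back between a reader's load of head and its CAS, the index matches but the counter does not, so the stale CAS fails and the loop retries. A stand-alone sketch of that packed-word comparison (64-bit layout, illustrative names):

    #include <stdint.h>

    /* Two head values naming the same entry index but written by
       different pushes: the bumped aba_ctr changes the bit pattern,
       so a CAS still holding the stale word cannot succeed. */
    typedef union {
      uint64_t atm;
      struct {
        uint16_t index;
        uint16_t pad;
        uint32_t aba_ctr;
      } contents;
    } node;

    int main(void) {
      node before = {0}, after = {0};
      before.contents.index = 5; before.contents.aba_ctr = 1;
      after.contents.index = 5;  after.contents.aba_ctr = 2;
      return before.atm == after.atm; /* 0: same index, different word */
    }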
diff --git a/src/core/lib/support/stack_lockfree.h b/src/core/lib/support/stack_lockfree.h
new file mode 100644
index 0000000000..6324211b72
--- /dev/null
+++ b/src/core/lib/support/stack_lockfree.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H
+#define GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H
+
+#include <stddef.h>
+
+typedef struct gpr_stack_lockfree gpr_stack_lockfree;
+
+/* This stack must specify the maximum number of entries to track.
+ The current implementation only allows up to 65534 entries */
+gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries);
+void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack);
+
+/* Pass in a valid entry number for the next stack entry */
+/* Returns 1 if this is the first element on the stack, 0 otherwise */
+int gpr_stack_lockfree_push(gpr_stack_lockfree *, int entry);
+
+/* Returns -1 on empty or the actual entry number */
+int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack);
+
+#endif /* GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H */
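[editor's note] A hypothetical usage sketch of this API (it builds only with the tree's include paths): the stack stores small integer handles, such as indices into a caller-owned array, which is what lets each node fit in 16 bits:

    #include <grpc/support/log.h>
    #include "src/core/lib/support/stack_lockfree.h"

    int main(void) {
      gpr_stack_lockfree *s = gpr_stack_lockfree_create(128);
      GPR_ASSERT(gpr_stack_lockfree_push(s, 7) == 1);  /* stack was empty */
      GPR_ASSERT(gpr_stack_lockfree_push(s, 42) == 0); /* not the first */
      GPR_ASSERT(gpr_stack_lockfree_pop(s) == 42);     /* LIFO order */
      GPR_ASSERT(gpr_stack_lockfree_pop(s) == 7);
      GPR_ASSERT(gpr_stack_lockfree_pop(s) == -1);     /* empty again */
      gpr_stack_lockfree_destroy(s);
      return 0;
    }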
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index c769866ceb..aac443ed06 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1504,7 +1504,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
/* TODO(ctiller): just make these the same variable? */
- call->metadata_batch[0][0].deadline = call->send_deadline;
+ if (call->is_client) {
+ call->metadata_batch[0][0].deadline = call->send_deadline;
+ }
stream_op_payload->send_initial_metadata.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index b04aee6c73..14e55eda85 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -189,16 +189,19 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
- size_t (*size)();
- void (*begin_op)(grpc_completion_queue *cc, void *tag);
- void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag,
+ size_t data_size;
+ void (*init)(void *data);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
+ void (*destroy)(void *data);
+ void (*begin_op)(grpc_completion_queue *cq, void *tag);
+ void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
- grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline,
+ grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved);
- grpc_event (*pluck)(grpc_completion_queue *cc, void *tag,
+ grpc_event (*pluck)(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved);
} cq_vtable;
@@ -218,25 +221,28 @@ typedef struct grpc_cq_event_queue {
gpr_atm num_queue_items;
} grpc_cq_event_queue;
-/* TODO: sreek Refactor this based on the completion_type. Put completion-type
- * specific data in a different structure (and co-allocate memory for it along
- * with completion queue + pollset )*/
-typedef struct cq_data {
- gpr_mu *mu;
+typedef struct cq_next_data {
+ /** Completed events for completion-queues of type GRPC_CQ_NEXT */
+ grpc_cq_event_queue queue;
+ /** Counter of how many things have ever been queued on this completion
+ queue; useful for avoiding locks when checking the queue */
+ gpr_atm things_queued_ever;
+
+ /* Number of outstanding events (+1 if not shut down) */
+ gpr_atm pending_events;
+
+ int shutdown_called;
+} cq_next_data;
+
+typedef struct cq_pluck_data {
/** Completed events for completion-queues of type GRPC_CQ_PLUCK */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
- /** Completed events for completion-queues of type GRPC_CQ_NEXT */
- grpc_cq_event_queue queue;
-
/** Number of pending events (+1 if we're not shutdown) */
gpr_refcount pending_events;
- /** Once owning_refs drops to zero, we will destroy the cq */
- gpr_refcount owning_refs;
-
/** Counter of how many things have ever been queued on this completion
queue; useful for avoiding locks when checking the queue */
gpr_atm things_queued_ever;
@@ -245,37 +251,45 @@ typedef struct cq_data {
gpr_atm shutdown;
int shutdown_called;
- int is_server_cq;
-
int num_pluckers;
- int num_polls;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
- grpc_closure pollset_shutdown_done;
+} cq_pluck_data;
+
+/* Completion queue structure */
+struct grpc_completion_queue {
+ /** Once owning_refs drops to zero, we will destroy the cq */
+ gpr_refcount owning_refs;
+
+ gpr_mu *mu;
+
+ const cq_vtable *vtable;
+ const cq_poller_vtable *poller_vtable;
#ifndef NDEBUG
void **outstanding_tags;
size_t outstanding_tag_count;
size_t outstanding_tag_capacity;
#endif
-} cq_data;
-/* Completion queue structure */
-struct grpc_completion_queue {
- cq_data data;
- const cq_vtable *vtable;
- const cq_poller_vtable *poller_vtable;
+ grpc_closure pollset_shutdown_done;
+ int num_polls;
};
/* Forward declarations */
-static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc);
-
-static size_t cq_size(grpc_completion_queue *cc);
-
-static void cq_begin_op(grpc_completion_queue *cc, void *tag);
+static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
+static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
+static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
+static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq);
+
+static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
@@ -283,39 +297,51 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
void *done_arg, grpc_cq_completion *storage);
static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
-static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved);
-static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved);
+static void cq_init_next(void *data);
+static void cq_init_pluck(void *data);
+static void cq_destroy_next(void *data);
+static void cq_destroy_pluck(void *data);
+
/* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = {
/* GRPC_CQ_NEXT */
- {.cq_completion_type = GRPC_CQ_NEXT,
- .size = cq_size,
- .begin_op = cq_begin_op,
+ {.data_size = sizeof(cq_next_data),
+ .cq_completion_type = GRPC_CQ_NEXT,
+ .init = cq_init_next,
+ .shutdown = cq_shutdown_next,
+ .destroy = cq_destroy_next,
+ .begin_op = cq_begin_op_for_next,
.end_op = cq_end_op_for_next,
.next = cq_next,
.pluck = NULL},
/* GRPC_CQ_PLUCK */
- {.cq_completion_type = GRPC_CQ_PLUCK,
- .size = cq_size,
- .begin_op = cq_begin_op,
+ {.data_size = sizeof(cq_pluck_data),
+ .cq_completion_type = GRPC_CQ_PLUCK,
+ .init = cq_init_pluck,
+ .shutdown = cq_shutdown_pluck,
+ .destroy = cq_destroy_pluck,
+ .begin_op = cq_begin_op_for_pluck,
.end_op = cq_end_op_for_pluck,
.next = NULL,
.pluck = cq_pluck},
};
-#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
-#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1)
+#define DATA_FROM_CQ(cq) ((void *)(cq + 1))
+#define POLLSET_FROM_CQ(cq) \
+ ((grpc_pollset *)(cq->vtable->data_size + (char *)DATA_FROM_CQ(cq)))
grpc_tracer_flag grpc_cq_pluck_trace = GRPC_TRACER_INITIALIZER(true);
grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true);
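[editor's note] The two macros above decode a single allocation that now carries three back-to-back regions; a sketch of the layout they assume, with region sizes taken from the vtables chosen at creation time:

    /* [ grpc_completion_queue ][ per-type data ][ grpc_pollset ]
       ^ cq                     ^ DATA_FROM_CQ(cq)                ^ POLLSET_FROM_CQ(cq)
                                vtable->data_size bytes           poller_vtable->size() bytes

       DATA_FROM_CQ(cq) is (void *)(cq + 1), the first byte after the
       fixed-size struct; POLLSET_FROM_CQ(cq) advances data_size more. */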
@@ -329,7 +355,7 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true);
gpr_free(_ev); \
}
-static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
+static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq,
grpc_error *error);
static void cq_event_queue_init(grpc_cq_event_queue *q) {
@@ -342,9 +368,9 @@ static void cq_event_queue_destroy(grpc_cq_event_queue *q) {
gpr_mpscq_destroy(&q->queue);
}
-static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
+static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c);
- gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1);
+ return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
}
static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
@@ -367,16 +393,10 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
}
-static size_t cq_size(grpc_completion_queue *cc) {
- /* Size of the completion queue and the size of the pollset whose memory is
- allocated right after that of completion queue */
- return sizeof(grpc_completion_queue) + cc->poller_vtable->size();
-}
-
grpc_completion_queue *grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type) {
- grpc_completion_queue *cc;
+ grpc_completion_queue *cq;
GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);
@@ -389,158 +409,173 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
const cq_poller_vtable *poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
- cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
- cq_data *cqd = &cc->data;
+ cq = gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
+ poller_vtable->size());
- cc->vtable = vtable;
- cc->poller_vtable = poller_vtable;
+ cq->vtable = vtable;
+ cq->poller_vtable = poller_vtable;
- poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu);
+ /* One for destroy(), one for pollset_shutdown */
+ gpr_ref_init(&cq->owning_refs, 2);
-#ifndef NDEBUG
- cqd->outstanding_tags = NULL;
- cqd->outstanding_tag_capacity = 0;
-#endif
+ poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
+ vtable->init(DATA_FROM_CQ(cq));
+
+ GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
+ grpc_schedule_on_exec_ctx);
+
+ GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
+
+ return cq;
+}
+static void cq_init_next(void *ptr) {
+ cq_next_data *cqd = ptr;
+ /* Initial ref is dropped by grpc_completion_queue_shutdown */
+ gpr_atm_no_barrier_store(&cqd->pending_events, 1);
+ cqd->shutdown_called = false;
+ gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
+ cq_event_queue_init(&cqd->queue);
+}
+
+static void cq_destroy_next(void *ptr) {
+ cq_next_data *cqd = ptr;
+ GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
+ cq_event_queue_destroy(&cqd->queue);
+}
+
+static void cq_init_pluck(void *ptr) {
+ cq_pluck_data *cqd = ptr;
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cqd->pending_events, 1);
- /* One for destroy(), one for pollset_shutdown */
- gpr_ref_init(&cqd->owning_refs, 2);
cqd->completed_tail = &cqd->completed_head;
cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
gpr_atm_no_barrier_store(&cqd->shutdown, 0);
cqd->shutdown_called = 0;
- cqd->is_server_cq = 0;
cqd->num_pluckers = 0;
- cqd->num_polls = 0;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
-#ifndef NDEBUG
- cqd->outstanding_tag_count = 0;
-#endif
- cq_event_queue_init(&cqd->queue);
- GRPC_CLOSURE_INIT(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc,
- grpc_schedule_on_exec_ctx);
-
- GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
+}
- return cc;
+static void cq_destroy_pluck(void *ptr) {
+ cq_pluck_data *cqd = ptr;
+ GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
}
-grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
- return cc->vtable->cq_completion_type;
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) {
+ return cq->vtable->cq_completion_type;
}
-int grpc_get_cq_poll_num(grpc_completion_queue *cc) {
+int grpc_get_cq_poll_num(grpc_completion_queue *cq) {
int cur_num_polls;
- gpr_mu_lock(cc->data.mu);
- cur_num_polls = cc->data.num_polls;
- gpr_mu_unlock(cc->data.mu);
+ gpr_mu_lock(cq->mu);
+ cur_num_polls = cq->num_polls;
+ gpr_mu_unlock(cq->mu);
return cur_num_polls;
}
#ifndef NDEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
+void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason,
const char *file, int line) {
- cq_data *cqd = &cc->data;
if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
- gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
+ gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val + 1,
+ "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1,
reason);
}
#else
-void grpc_cq_internal_ref(grpc_completion_queue *cc) {
- cq_data *cqd = &cc->data;
+void grpc_cq_internal_ref(grpc_completion_queue *cq) {
#endif
- gpr_ref(&cqd->owning_refs);
+ gpr_ref(&cq->owning_refs);
}
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_completion_queue *cc = arg;
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cc, "pollset_destroy");
+ grpc_completion_queue *cq = arg;
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
}
#ifndef NDEBUG
-void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
const char *reason, const char *file, int line) {
- cq_data *cqd = &cc->data;
if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
- gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
+ gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val - 1,
+ "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1,
reason);
}
#else
void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc) {
- cq_data *cqd = &cc->data;
+ grpc_completion_queue *cq) {
#endif
- if (gpr_unref(&cqd->owning_refs)) {
- GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
- cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc));
- cq_event_queue_destroy(&cqd->queue);
+ if (gpr_unref(&cq->owning_refs)) {
+ cq->vtable->destroy(DATA_FROM_CQ(cq));
+ cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
- gpr_free(cqd->outstanding_tags);
+ gpr_free(cq->outstanding_tags);
#endif
- gpr_free(cc);
+ gpr_free(cq);
}
}
-static void cq_begin_op(grpc_completion_queue *cc, void *tag) {
- cq_data *cqd = &cc->data;
-#ifndef NDEBUG
- gpr_mu_lock(cqd->mu);
+static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+ GPR_ASSERT(!cqd->shutdown_called);
+ gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
+}
+
+static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(!cqd->shutdown_called);
- if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) {
- cqd->outstanding_tag_capacity =
- GPR_MAX(4, 2 * cqd->outstanding_tag_capacity);
- cqd->outstanding_tags =
- gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) *
- cqd->outstanding_tag_capacity);
- }
- cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag;
- gpr_mu_unlock(cqd->mu);
-#endif
gpr_ref(&cqd->pending_events);
}
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
- cc->vtable->begin_op(cc, tag);
+void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+ gpr_mu_lock(cq->mu);
+ if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
+ cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
+ cq->outstanding_tags =
+ gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
+ cq->outstanding_tag_capacity);
+ }
+ cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
+ gpr_mu_unlock(cq->mu);
+#endif
+ cq->vtable->begin_op(cq, tag);
}
#ifndef NDEBUG
-static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {
- cq_data *cqd = &cc->data;
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
if (lock_cq) {
- gpr_mu_lock(cqd->mu);
+ gpr_mu_lock(cq->mu);
}
- for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) {
- if (cqd->outstanding_tags[i] == tag) {
- cqd->outstanding_tag_count--;
- GPR_SWAP(void *, cqd->outstanding_tags[i],
- cqd->outstanding_tags[cqd->outstanding_tag_count]);
+ for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
+ if (cq->outstanding_tags[i] == tag) {
+ cq->outstanding_tag_count--;
+ GPR_SWAP(void *, cq->outstanding_tags[i],
+ cq->outstanding_tags[cq->outstanding_tag_count]);
found = 1;
break;
}
}
if (lock_cq) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
}
GPR_ASSERT(found);
}
#else
-static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {}
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
-/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion
* type of GRPC_CQ_NEXT) */
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
@@ -553,16 +588,16 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
error != GRPC_ERROR_NONE)) {
const char *errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+ "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+ 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
}
- cq_data *cqd = &cc->data;
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
storage->tag = tag;
@@ -570,28 +605,42 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
storage->done_arg = done_arg;
storage->next = (uintptr_t)(is_success);
- cq_check_tag(cc, tag, true); /* Used in debug builds only */
+ cq_check_tag(cq, tag, true); /* Used in debug builds only */
/* Add the completion to the queue */
- cq_event_queue_push(&cqd->queue, storage);
+ bool is_first = cq_event_queue_push(&cqd->queue, storage);
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
-
- gpr_mu_lock(cqd->mu);
-
- int shutdown = gpr_unref(&cqd->pending_events);
- if (!shutdown) {
- grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL);
- gpr_mu_unlock(cqd->mu);
-
- if (kick_error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(kick_error);
- gpr_log(GPR_ERROR, "Kick failed: %s", msg);
-
- GRPC_ERROR_UNREF(kick_error);
+ bool will_definitely_shutdown =
+ gpr_atm_no_barrier_load(&cqd->pending_events) == 1;
+
+ if (!will_definitely_shutdown) {
+ /* Only kick if this is the first item queued */
+ if (is_first) {
+ gpr_mu_lock(cq->mu);
+ grpc_error *kick_error =
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+ gpr_mu_unlock(cq->mu);
+
+ if (kick_error != GRPC_ERROR_NONE) {
+ const char *msg = grpc_error_string(kick_error);
+ gpr_log(GPR_ERROR, "Kick failed: %s", msg);
+ GRPC_ERROR_UNREF(kick_error);
+ }
+ }
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ cq_finish_shutdown_next(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
} else {
- cq_finish_shutdown(exec_ctx, cc);
- gpr_mu_unlock(cqd->mu);
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_atm_rel_store(&cqd->pending_events, 0);
+ gpr_mu_lock(cq->mu);
+ cq_finish_shutdown_next(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
GPR_TIMER_END("cq_end_op_for_next", 0);
@@ -599,16 +648,17 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion
* type of GRPC_CQ_PLUCK) */
static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc, void *tag,
+ grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
@@ -618,9 +668,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
error != GRPC_ERROR_NONE)) {
const char *errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+ "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+ 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
@@ -632,8 +682,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
storage->done_arg = done_arg;
storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success));
- gpr_mu_lock(cqd->mu);
- cq_check_tag(cc, tag, false); /* Used in debug builds only */
+ gpr_mu_lock(cq->mu);
+ cq_check_tag(cq, tag, false); /* Used in debug builds only */
/* Add to the list of completions */
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
@@ -652,9 +702,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
}
grpc_error *kick_error =
- cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(kick_error);
@@ -663,8 +713,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(kick_error);
}
} else {
- cq_finish_shutdown(exec_ctx, cc);
- gpr_mu_unlock(cqd->mu);
+ cq_finish_shutdown_pluck(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
}
GPR_TIMER_END("cq_end_op_for_pluck", 0);
@@ -672,12 +722,12 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
void *tag, grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage);
+ cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage);
}
typedef struct {
@@ -692,7 +742,7 @@ typedef struct {
static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
- cq_data *cqd = &cq->data;
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
@@ -703,7 +753,8 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
/* Pop a cq_completion from the queue. Returns NULL if the queue is empty
- * might return NULL in some cases even if the queue is not empty; but that
+ * might return NULL in some cases even if the queue is not empty; but
+ * that
* is ok and doesn't affect correctness. Might affect the tail latencies a
* bit) */
a->stolen_completion = cq_event_queue_pop(&cqd->queue);
@@ -716,58 +767,56 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
#ifndef NDEBUG
-static void dump_pending_tags(grpc_completion_queue *cc) {
+static void dump_pending_tags(grpc_completion_queue *cq) {
if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return;
- cq_data *cqd = &cc->data;
-
gpr_strvec v;
gpr_strvec_init(&v);
gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
- gpr_mu_lock(cqd->mu);
- for (size_t i = 0; i < cqd->outstanding_tag_count; i++) {
+ gpr_mu_lock(cq->mu);
+ for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
char *s;
- gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]);
+ gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
gpr_strvec_add(&v, s);
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
char *out = gpr_strvec_flatten(&v, NULL);
gpr_strvec_destroy(&v);
gpr_log(GPR_DEBUG, "%s", out);
gpr_free(out);
}
#else
-static void dump_pending_tags(grpc_completion_queue *cc) {}
+static void dump_pending_tags(grpc_completion_queue *cq) {}
#endif
-static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved) {
grpc_event ret;
gpr_timespec now;
- cq_data *cqd = &cc->data;
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
GRPC_API_TRACE(
"grpc_completion_queue_next("
- "cc=%p, "
+ "cq=%p, "
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+ 5, (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
reserved));
GPR_ASSERT(!reserved);
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- GRPC_CQ_INTERNAL_REF(cc, "next");
+ GRPC_CQ_INTERNAL_REF(cq, "next");
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
- .cq = cc,
+ .cq = cq,
.deadline = deadline,
.stolen_completion = NULL,
.tag = NULL,
@@ -800,21 +849,24 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
/* If c == NULL it means either the queue is empty OR in a transient
inconsistent state. If it is the latter, we should do a 0-timeout poll
so that the thread comes back quickly from poll to make a second
- attempt at popping. Not doing this can potentially deadlock this thread
+ attempt at popping. Not doing this can potentially deadlock this
+ thread
forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
}
}
- if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
+ if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) {
/* Before returning, check if the queue has any items left over (since
gpr_mpscq_pop() can sometimes return NULL even if the queue is not
empty). If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
/* Go to the beginning of the loop. No point doing a poll because
- (cc->shutdown == true) is only possible when there is no pending work
- (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion
+ (cq->shutdown == true) is only possible when there is no pending
+ work
+ (i.e. cq->pending_events == 0) and any outstanding
+ grpc_cq_completion
events are already queued on this cq */
continue;
}
@@ -828,16 +880,16 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
/* The main polling work happens in grpc_pollset_work */
- gpr_mu_lock(cqd->mu);
- cqd->num_polls++;
- grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+ gpr_mu_lock(cq->mu);
+ cq->num_polls++;
+ grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
NULL, now, iteration_deadline);
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(err);
@@ -846,30 +898,74 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
is_finished_arg.first_loop = false;
}
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next");
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
+ if (cq_event_queue_num_items(&cqd->queue) > 0 &&
+ gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
+ gpr_mu_lock(cq->mu);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+ gpr_mu_unlock(cq->mu);
+ }
+
GPR_TIMER_END("grpc_completion_queue_next", 0);
return ret;
}
-grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
+/* Finishes the completion queue shutdown. This means that there are no more
+ completion events / tags expected from the completion queue
+ - Must be called under completion queue lock
+ - Must be called only once in completion queue's lifetime
+ - grpc_completion_queue_shutdown() MUST have been called before calling
+ this function */
+static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+ GPR_ASSERT(cqd->shutdown_called);
+ GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
+
+ cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+ &cq->pollset_shutdown_done);
+}
+
+static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ if (cqd->shutdown_called) {
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
+ return;
+ }
+ cqd->shutdown_called = 1;
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ cq_finish_shutdown_next(exec_ctx, cq);
+ }
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+}
+
+grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline, void *reserved) {
- return cc->vtable->next(cc, deadline, reserved);
+ return cq->vtable->next(cq, deadline, reserved);
}
-static int add_plucker(grpc_completion_queue *cc, void *tag,
+static int add_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) {
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
return 0;
}
@@ -879,9 +975,9 @@ static int add_plucker(grpc_completion_queue *cc, void *tag,
return 1;
}
-static void del_plucker(grpc_completion_queue *cc, void *tag,
+static void del_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) {
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
cqd->num_pluckers--;
@@ -895,13 +991,13 @@ static void del_plucker(grpc_completion_queue *cc, void *tag,
static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
- cq_data *cqd = &cq->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
- gpr_mu_lock(cqd->mu);
+ gpr_mu_lock(cq->mu);
a->last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
grpc_cq_completion *c;
@@ -913,51 +1009,51 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
if (c == cqd->completed_tail) {
cqd->completed_tail = prev;
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
a->stolen_completion = c;
return true;
}
prev = c;
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
}
return !a->first_loop &&
gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
-static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
- cq_data *cqd = &cc->data;
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) {
GRPC_API_TRACE(
"grpc_completion_queue_pluck("
- "cc=%p, tag=%p, "
+ "cq=%p, tag=%p, "
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 6, (cc, tag, deadline.tv_sec, deadline.tv_nsec,
+ 6, (cq, tag, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, reserved));
}
GPR_ASSERT(!reserved);
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- GRPC_CQ_INTERNAL_REF(cc, "pluck");
- gpr_mu_lock(cqd->mu);
+ GRPC_CQ_INTERNAL_REF(cq, "pluck");
+ gpr_mu_lock(cq->mu);
cq_is_finished_arg is_finished_arg = {
.last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever),
- .cq = cc,
+ .cq = cq,
.deadline = deadline,
.stolen_completion = NULL,
.tag = tag,
@@ -966,7 +1062,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
c = is_finished_arg.stolen_completion;
is_finished_arg.stolen_completion = NULL;
ret.type = GRPC_OP_COMPLETE;
@@ -983,7 +1079,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
if (c == cqd->completed_tail) {
cqd->completed_tail = prev;
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
@@ -993,54 +1089,54 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
prev = c;
}
if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
- if (!add_plucker(cc, tag, &worker)) {
+ if (!add_plucker(cq, tag, &worker)) {
gpr_log(GPR_DEBUG,
"Too many outstanding grpc_completion_queue_pluck calls: maximum "
"is %d",
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
/* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
- del_plucker(cc, tag, &worker);
- gpr_mu_unlock(cqd->mu);
+ del_plucker(cq, tag, &worker);
+ gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
- cqd->num_polls++;
- grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+ cq->num_polls++;
+ grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
&worker, now, deadline);
if (err != GRPC_ERROR_NONE) {
- del_plucker(cc, tag, &worker);
- gpr_mu_unlock(cqd->mu);
+ del_plucker(cq, tag, &worker);
+ gpr_mu_unlock(cq->mu);
const char *msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- dump_pending_tags(cc);
+ dump_pending_tags(cq);
break;
}
is_finished_arg.first_loop = false;
- del_plucker(cc, tag, &worker);
+ del_plucker(cq, tag, &worker);
}
done:
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck");
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck");
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
@@ -1049,85 +1145,66 @@ done:
return ret;
}
-grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
+grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline, void *reserved) {
- return cc->vtable->pluck(cc, tag, deadline, reserved);
+ return cq->vtable->pluck(cq, tag, deadline, reserved);
}
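
[Editor's note: for orientation, this is roughly how a pluck-style queue is consumed from the public API. A minimal sketch, assuming a GRPC_CQ_PLUCK queue `cq` created elsewhere; the tag value is hypothetical.]

    /* Pluck exactly the event that was tagged when the op was started. */
    void *tag = (void *)0x1; /* hypothetical tag supplied when starting the op */
    grpc_event ev = grpc_completion_queue_pluck(
        cq, tag, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL /* reserved */);
    if (ev.type == GRPC_OP_COMPLETE && ev.success) {
      /* the operation associated with tag completed successfully */
    }
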
-/* Finishes the completion queue shutdown. This means that there are no more
- completion events / tags expected from the completion queue
- - Must be called under completion queue lock
- - Must be called only once in completion queue's lifetime
- - grpc_completion_queue_shutdown() MUST have been called before calling
- this function */
-static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cc) {
- cq_data *cqd = &cc->data;
+static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
gpr_atm_no_barrier_store(&cqd->shutdown, 1);
- cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
- &cqd->pollset_shutdown_done);
+ cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+ &cq->pollset_shutdown_done);
}
-/* Shutdown simply drops a ref that we reserved at creation time; if we drop
- to zero here, then enter shutdown mode and wake up any waiters */
-void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
- GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
- cq_data *cqd = &cc->data;
+static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+ grpc_completion_queue *cq) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
- gpr_mu_lock(cqd->mu);
+ gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
cqd->shutdown_called = 1;
if (gpr_unref(&cqd->pending_events)) {
- cq_finish_shutdown(&exec_ctx, cc);
+ cq_finish_shutdown_pluck(exec_ctx, cq);
}
- gpr_mu_unlock(cqd->mu);
+ gpr_mu_unlock(cq->mu);
+}
+
+/* Shutdown simply drops a ref that we reserved at creation time; if we drop
+   to zero here, we enter shutdown mode and wake up any waiters */
+void grpc_completion_queue_shutdown(grpc_completion_queue *cq) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
+ GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
+ cq->vtable->shutdown(&exec_ctx, cq);
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
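
[Editor's note: shutdown now dispatches through a per-completion-type vtable instead of touching cq state directly. A sketch of the shape this diff implies; only the pluck and shutdown hooks are confirmed by these hunks, the rest of the layout is an assumption.]

    /* Assumed vtable layout: one ops table per cq completion type; per-type
       state (e.g. cq_pluck_data) is reached through DATA_FROM_CQ(cq). */
    typedef struct cq_vtable {
      grpc_cq_completion_type cq_completion_type;
      void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
      grpc_event (*pluck)(grpc_completion_queue *cq, void *tag,
                          gpr_timespec deadline, void *reserved);
      /* next/begin_op/end_op hooks elided in this sketch */
    } cq_vtable;
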
-void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
- GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
+void grpc_completion_queue_destroy(grpc_completion_queue *cq) {
+ GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
- grpc_completion_queue_shutdown(cc);
-
- /* TODO (sreek): This should not ideally be here. Refactor it into the
- * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */
- if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) {
- GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0);
- }
+ grpc_completion_queue_shutdown(cq);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy");
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy");
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_completion_queue_destroy", 0);
}
-grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
- return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
-}
-
-grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
- return CQ_FROM_POLLSET(ps);
-}
-
-void grpc_cq_mark_server_cq(grpc_completion_queue *cc) {
- cc->data.is_server_cq = 1;
-}
-
-bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
- return cc->data.is_server_cq;
+grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) {
+ return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : NULL;
}
-bool grpc_cq_can_listen(grpc_completion_queue *cc) {
- return cc->poller_vtable->can_listen;
+bool grpc_cq_can_listen(grpc_completion_queue *cq) {
+ return cq->poller_vtable->can_listen;
}
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 97ea9cae20..af44482513 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -84,10 +84,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
void *done_arg, grpc_cq_completion *storage);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
-grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps);
-void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
-bool grpc_cq_is_server_cq(grpc_completion_queue *cc);
bool grpc_cq_can_listen(grpc_completion_queue *cc);
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 84ddf74ab9..de0a91e2b2 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -32,8 +32,7 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/support/mpscq.h"
-#include "src/core/lib/support/spinlock.h"
+#include "src/core/lib/support/stack_lockfree.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
@@ -62,7 +61,6 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false);
typedef struct requested_call {
- gpr_mpscq_node request_link; /* must be first */
requested_call_type type;
size_t cq_idx;
void *tag;
@@ -162,7 +160,7 @@ struct request_matcher {
grpc_server *server;
call_data *pending_head;
call_data *pending_tail;
- gpr_locked_mpscq *requests_per_cq;
+ gpr_stack_lockfree **requests_per_cq;
};
struct registered_method {
@@ -207,6 +205,11 @@ struct grpc_server {
registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
+ /** free list of available requested_calls_per_cq indices */
+ gpr_stack_lockfree **request_freelist_per_cq;
+ /** requested call backing data */
+ requested_call **requested_calls_per_cq;
+ int max_requested_calls_per_cq;
gpr_atm shutdown_flag;
uint8_t shutdown_published;
@@ -306,20 +309,21 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher
*/
-static void request_matcher_init(request_matcher *rm, grpc_server *server) {
+static void request_matcher_init(request_matcher *rm, size_t entries,
+ grpc_server *server) {
memset(rm, 0, sizeof(*rm));
rm->server = server;
rm->requests_per_cq =
gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count);
for (size_t i = 0; i < server->cq_count; i++) {
- gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
+ rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
}
}
static void request_matcher_destroy(request_matcher *rm) {
for (size_t i = 0; i < rm->server->cq_count; i++) {
- GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL);
- gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
+ GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
+ gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
}
gpr_free(rm->requests_per_cq);
}
@@ -349,17 +353,13 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
grpc_server *server,
request_matcher *rm,
grpc_error *error) {
- requested_call *rc;
+ int request_id;
for (size_t i = 0; i < server->cq_count; i++) {
- /* Here we know:
- 1. no requests are being added (since the server is shut down)
- 2. no other threads are pulling (since the shut down process is single
- threaded)
- So, we can ignore the queue lock and just pop, with the guarantee that a
- NULL returned here truly means that the queue is empty */
- while ((rc = (requested_call *)gpr_mpscq_pop(
- &rm->requests_per_cq[i].queue)) != NULL) {
- fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error));
+ while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
+ -1) {
+ fail_call(exec_ctx, server, i,
+ &server->requested_calls_per_cq[i][request_id],
+ GRPC_ERROR_REF(error));
}
}
GRPC_ERROR_UNREF(error);
@@ -394,7 +394,13 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server");
+ if (server->started) {
+ gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
+ gpr_free(server->requested_calls_per_cq[i]);
+ }
}
+ gpr_free(server->request_freelist_per_cq);
+ gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@@ -452,7 +458,21 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
grpc_cq_completion *c) {
- gpr_free(req);
+ requested_call *rc = req;
+ grpc_server *server = rc->server;
+
+ if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
+ rc < server->requested_calls_per_cq[rc->cq_idx] +
+ server->max_requested_calls_per_cq) {
+ GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
+ gpr_stack_lockfree_push(
+ server->request_freelist_per_cq[rc->cq_idx],
+ (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
+ } else {
+ gpr_free(req);
+ }
+
+ server_unref(exec_ctx, server);
}
static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
@@ -482,6 +502,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
GPR_UNREACHABLE_CODE(return );
}
+ grpc_call_element *elem =
+ grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ channel_data *chand = elem->channel_data;
+ server_ref(chand->server);
grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
done_request_event, rc, &rc->completion);
}
@@ -509,15 +533,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
for (size_t i = 0; i < server->cq_count; i++) {
size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
- requested_call *rc =
- (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
- if (rc == NULL) {
+ int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
+ if (request_id == -1) {
continue;
} else {
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld, cq_idx, rc);
+ publish_call(exec_ctx, server, calld, cq_idx,
+ &server->requested_calls_per_cq[cq_idx][request_id]);
return; /* early out */
}
}
@@ -951,8 +975,6 @@ static void register_completion_queue(grpc_server *server,
if (server->cqs[i] == cq) return;
}
- grpc_cq_mark_server_cq(cq);
-
GRPC_CQ_INTERNAL_REF(cq, "server");
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
@@ -992,6 +1014,8 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
+ /* TODO(ctiller): expose a channel_arg for this */
+ server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args);
return server;
@@ -1064,15 +1088,29 @@ void grpc_server_start(grpc_server *server) {
server->started = true;
server->pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+ server->request_freelist_per_cq =
+ gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
+ server->requested_calls_per_cq =
+ gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]);
}
+ server->request_freelist_per_cq[i] =
+ gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
+ for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
+ gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
+ }
+ server->requested_calls_per_cq[i] =
+ gpr_malloc((size_t)server->max_requested_calls_per_cq *
+ sizeof(*server->requested_calls_per_cq[i]));
}
- request_matcher_init(&server->unregistered_request_matcher, server);
+ request_matcher_init(&server->unregistered_request_matcher,
+ (size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_init(&rm->request_matcher, server);
+ request_matcher_init(&rm->request_matcher,
+ (size_t)server->max_requested_calls_per_cq, server);
}
server_ref(server);
@@ -1116,9 +1154,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
chand->channel = channel;
size_t cq_idx;
- grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset);
for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
- if (s->cqs[cq_idx] == accepting_cq) break;
+ if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
}
if (cq_idx == s->cq_count) {
/* completion queue not found: pick a random one to publish new calls to */
@@ -1326,11 +1363,21 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
requested_call *rc) {
call_data *calld = NULL;
request_matcher *rm = NULL;
+ int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
fail_call(exec_ctx, server, cq_idx, rc,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
return GRPC_CALL_OK;
}
+ request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
+ if (request_id == -1) {
+ /* out of request ids: just fail this one */
+ fail_call(exec_ctx, server, cq_idx, rc,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"),
+ GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq));
+ return GRPC_CALL_OK;
+ }
switch (rc->type) {
case BATCH_CALL:
rm = &server->unregistered_request_matcher;
@@ -1339,13 +1386,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &rc->data.registered.registered_method->request_matcher;
break;
}
- if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
+ server->requested_calls_per_cq[cq_idx][request_id] = *rc;
+ gpr_free(rc);
+ if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != NULL) {
- rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
- if (rc == NULL) break;
+ request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
+ if (request_id == -1) break;
rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
@@ -1361,7 +1410,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld, cq_idx, rc);
+ publish_call(exec_ctx, server, calld, cq_idx,
+ &server->requested_calls_per_cq[cq_idx][request_id]);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1468,6 +1518,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
rc->initial_metadata->count = 0;
GPR_ASSERT(error != GRPC_ERROR_NONE);
+ server_ref(server);
grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error,
done_request_event, rc, &rc->completion);
}
diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c
index 404c240589..2388f19f81 100644
--- a/src/core/lib/transport/static_metadata.c
+++ b/src/core/lib/transport/static_metadata.c
@@ -464,7 +464,8 @@ grpc_mdelem grpc_static_mdelem_for_static_strings(int a, int b) {
if (a == -1 || b == -1) return GRPC_MDNULL;
uint32_t k = (uint32_t)(a * 99 + b);
uint32_t h = elems_phash(k);
- return h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k
+ return h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k &&
+ elem_idxs[h] != 255
? GRPC_MAKE_MDELEM(&grpc_static_mdelem_table[elem_idxs[h]],
GRPC_MDELEM_STORAGE_STATIC)
: GRPC_MDNULL;
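
[Editor's note: the new elem_idxs[h] != 255 guard exists because 255 marks a never-populated slot. Such a slot still has elem_keys[h] == 0, which a legitimate key of k == 0 (a == 0, b == 0) would otherwise match, indexing the table with a garbage idx. A sketch of the guarded probe, restated outside the diff.]

    /* Sketch: perfect-hash probe where idx 255 means "slot never filled". */
    grpc_mdelem lookup_static_mdelem(uint32_t k) {
      uint32_t h = elems_phash(k);
      if (h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k &&
          elem_idxs[h] != 255) {
        return GRPC_MAKE_MDELEM(&grpc_static_mdelem_table[elem_idxs[h]],
                                GRPC_MDELEM_STORAGE_STATIC);
      }
      return GRPC_MDNULL; /* absent key, or an empty slot whose zero key matched */
    }
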
diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c
index 1e919c4cce..1280680663 100644
--- a/src/core/tsi/fake_transport_security.c
+++ b/src/core/tsi/fake_transport_security.c
@@ -31,6 +31,7 @@
#define TSI_FAKE_FRAME_HEADER_SIZE 4
#define TSI_FAKE_FRAME_INITIAL_ALLOCATED_SIZE 64
#define TSI_FAKE_DEFAULT_FRAME_SIZE 16384
+#define TSI_FAKE_HANDSHAKER_OUTGOING_BUFFER_INITIAL_SIZE 256
/* --- Structure definitions. ---*/
@@ -59,8 +60,10 @@ typedef struct {
int is_client;
tsi_fake_handshake_message next_message_to_send;
int needs_incoming_message;
- tsi_fake_frame incoming;
- tsi_fake_frame outgoing;
+ tsi_fake_frame incoming_frame;
+ tsi_fake_frame outgoing_frame;
+ unsigned char *outgoing_bytes_buffer;
+ size_t outgoing_bytes_buffer_size;
tsi_result result;
} tsi_fake_handshaker;
@@ -116,27 +119,23 @@ static void tsi_fake_frame_reset(tsi_fake_frame *frame, int needs_draining) {
if (!needs_draining) frame->size = 0;
}
-/* Returns 1 if successful, 0 otherwise. */
-static int tsi_fake_frame_ensure_size(tsi_fake_frame *frame) {
+/* Ensures that the frame's allocated size is at least frame->size,
+ * reallocating more memory if necessary. */
+static void tsi_fake_frame_ensure_size(tsi_fake_frame *frame) {
if (frame->data == NULL) {
frame->allocated_size = frame->size;
frame->data = gpr_malloc(frame->allocated_size);
- if (frame->data == NULL) return 0;
} else if (frame->size > frame->allocated_size) {
unsigned char *new_data = gpr_realloc(frame->data, frame->size);
- if (new_data == NULL) {
- gpr_free(frame->data);
- frame->data = NULL;
- return 0;
- }
frame->data = new_data;
frame->allocated_size = frame->size;
}
- return 1;
}
-/* This method should not be called if frame->needs_framing is not 0. */
-static tsi_result fill_frame_from_bytes(const unsigned char *incoming_bytes,
+/* Decodes the serialized fake frame contained in incoming_bytes, and fills
+ * frame with the contents of the decoded frame.
+ * This method should not be called if frame->needs_draining is not 0. */
+static tsi_result tsi_fake_frame_decode(const unsigned char *incoming_bytes,
size_t *incoming_bytes_size,
tsi_fake_frame *frame) {
size_t available_size = *incoming_bytes_size;
@@ -147,7 +146,6 @@ static tsi_result fill_frame_from_bytes(const unsigned char *incoming_bytes,
if (frame->data == NULL) {
frame->allocated_size = TSI_FAKE_FRAME_INITIAL_ALLOCATED_SIZE;
frame->data = gpr_malloc(frame->allocated_size);
- if (frame->data == NULL) return TSI_OUT_OF_RESOURCES;
}
if (frame->offset < TSI_FAKE_FRAME_HEADER_SIZE) {
@@ -165,7 +163,7 @@ static tsi_result fill_frame_from_bytes(const unsigned char *incoming_bytes,
frame->offset += to_read_size;
available_size -= to_read_size;
frame->size = load32_little_endian(frame->data);
- if (!tsi_fake_frame_ensure_size(frame)) return TSI_OUT_OF_RESOURCES;
+ tsi_fake_frame_ensure_size(frame);
}
to_read_size = frame->size - frame->offset;
@@ -183,10 +181,12 @@ static tsi_result fill_frame_from_bytes(const unsigned char *incoming_bytes,
return TSI_OK;
}
-/* This method should not be called if frame->needs_framing is 0. */
-static tsi_result drain_frame_to_bytes(unsigned char *outgoing_bytes,
- size_t *outgoing_bytes_size,
- tsi_fake_frame *frame) {
+/* Encodes a fake frame into its wire format and places the result in
+ * outgoing_bytes. outgoing_bytes_size indicates the size of the encoded frame.
+ * This method should not be called if frame->needs_draining is 0. */
+static tsi_result tsi_fake_frame_encode(unsigned char *outgoing_bytes,
+ size_t *outgoing_bytes_size,
+ tsi_fake_frame *frame) {
size_t to_write_size = frame->size - frame->offset;
if (!frame->needs_draining) return TSI_INTERNAL_ERROR;
if (*outgoing_bytes_size < to_write_size) {
@@ -200,17 +200,20 @@ static tsi_result drain_frame_to_bytes(unsigned char *outgoing_bytes,
return TSI_OK;
}
-static tsi_result bytes_to_frame(unsigned char *bytes, size_t bytes_size,
- tsi_fake_frame *frame) {
+/* Sets the payload of a fake frame to contain the given data blob, where
+ * data_size indicates the size of data. */
+static tsi_result tsi_fake_frame_set_data(unsigned char *data, size_t data_size,
+ tsi_fake_frame *frame) {
frame->offset = 0;
- frame->size = bytes_size + TSI_FAKE_FRAME_HEADER_SIZE;
- if (!tsi_fake_frame_ensure_size(frame)) return TSI_OUT_OF_RESOURCES;
+ frame->size = data_size + TSI_FAKE_FRAME_HEADER_SIZE;
+ tsi_fake_frame_ensure_size(frame);
store32_little_endian((uint32_t)frame->size, frame->data);
- memcpy(frame->data + TSI_FAKE_FRAME_HEADER_SIZE, bytes, bytes_size);
+ memcpy(frame->data + TSI_FAKE_FRAME_HEADER_SIZE, data, data_size);
tsi_fake_frame_reset(frame, 1 /* needs draining */);
return TSI_OK;
}
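
[Editor's note: the encode/decode/set_data helpers above agree on one wire format: a 4-byte little-endian length prefix that counts the header itself, followed by the payload. A self-contained sketch of that layout, assuming store32_little_endian writes bytes as shown below.]

    #include <string.h>

    /* Sketch: serialize one fake frame into buf; returns bytes written, or 0
       if buf is too small. Frame = u32 LE total size (incl. header) + payload. */
    size_t write_fake_frame(unsigned char *buf, size_t buf_size,
                            const unsigned char *payload, size_t payload_size) {
      size_t frame_size = payload_size + 4; /* TSI_FAKE_FRAME_HEADER_SIZE */
      if (buf_size < frame_size) return 0;
      buf[0] = (unsigned char)(frame_size & 0xFF);
      buf[1] = (unsigned char)((frame_size >> 8) & 0xFF);
      buf[2] = (unsigned char)((frame_size >> 16) & 0xFF);
      buf[3] = (unsigned char)((frame_size >> 24) & 0xFF);
      memcpy(buf + 4, payload, payload_size);
      return frame_size;
    }
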
+/* Destroys the contents of a fake frame. */
static void tsi_fake_frame_destruct(tsi_fake_frame *frame) {
if (frame->data != NULL) gpr_free(frame->data);
}
@@ -235,7 +238,7 @@ static tsi_result fake_protector_protect(tsi_frame_protector *self,
if (frame->needs_draining) {
drained_size = saved_output_size - *num_bytes_written;
result =
- drain_frame_to_bytes(protected_output_frames, &drained_size, frame);
+ tsi_fake_frame_encode(protected_output_frames, &drained_size, frame);
*num_bytes_written += drained_size;
protected_output_frames += drained_size;
if (result != TSI_OK) {
@@ -254,15 +257,15 @@ static tsi_result fake_protector_protect(tsi_frame_protector *self,
size_t written_in_frame_size = 0;
store32_little_endian((uint32_t)impl->max_frame_size, frame_header);
written_in_frame_size = TSI_FAKE_FRAME_HEADER_SIZE;
- result = fill_frame_from_bytes(frame_header, &written_in_frame_size, frame);
+ result = tsi_fake_frame_decode(frame_header, &written_in_frame_size, frame);
if (result != TSI_INCOMPLETE_DATA) {
- gpr_log(GPR_ERROR, "fill_frame_from_bytes returned %s",
+ gpr_log(GPR_ERROR, "tsi_fake_frame_decode returned %s",
tsi_result_to_string(result));
return result;
}
}
result =
- fill_frame_from_bytes(unprotected_bytes, unprotected_bytes_size, frame);
+ tsi_fake_frame_decode(unprotected_bytes, unprotected_bytes_size, frame);
if (result != TSI_OK) {
if (result == TSI_INCOMPLETE_DATA) result = TSI_OK;
return result;
@@ -272,7 +275,7 @@ static tsi_result fake_protector_protect(tsi_frame_protector *self,
if (!frame->needs_draining) return TSI_INTERNAL_ERROR;
if (frame->offset != 0) return TSI_INTERNAL_ERROR;
drained_size = saved_output_size - *num_bytes_written;
- result = drain_frame_to_bytes(protected_output_frames, &drained_size, frame);
+ result = tsi_fake_frame_encode(protected_output_frames, &drained_size, frame);
*num_bytes_written += drained_size;
if (result == TSI_INCOMPLETE_DATA) result = TSI_OK;
return result;
@@ -292,8 +295,8 @@ static tsi_result fake_protector_protect_flush(
store32_little_endian((uint32_t)frame->size,
frame->data); /* Overwrite header. */
}
- result = drain_frame_to_bytes(protected_output_frames,
- protected_output_frames_size, frame);
+ result = tsi_fake_frame_encode(protected_output_frames,
+ protected_output_frames_size, frame);
if (result == TSI_INCOMPLETE_DATA) result = TSI_OK;
*still_pending_size = frame->size - frame->offset;
return result;
@@ -316,7 +319,7 @@ static tsi_result fake_protector_unprotect(
/* Go past the header if needed. */
if (frame->offset == 0) frame->offset = TSI_FAKE_FRAME_HEADER_SIZE;
drained_size = saved_output_size - *num_bytes_written;
- result = drain_frame_to_bytes(unprotected_bytes, &drained_size, frame);
+ result = tsi_fake_frame_encode(unprotected_bytes, &drained_size, frame);
unprotected_bytes += drained_size;
*num_bytes_written += drained_size;
if (result != TSI_OK) {
@@ -330,7 +333,7 @@ static tsi_result fake_protector_unprotect(
/* Now process the protected_bytes. */
if (frame->needs_draining) return TSI_INTERNAL_ERROR;
- result = fill_frame_from_bytes(protected_frames_bytes,
+ result = tsi_fake_frame_decode(protected_frames_bytes,
protected_frames_bytes_size, frame);
if (result != TSI_OK) {
if (result == TSI_INCOMPLETE_DATA) result = TSI_OK;
@@ -342,7 +345,7 @@ static tsi_result fake_protector_unprotect(
if (frame->offset != 0) return TSI_INTERNAL_ERROR;
frame->offset = TSI_FAKE_FRAME_HEADER_SIZE; /* Go past the header. */
drained_size = saved_output_size - *num_bytes_written;
- result = drain_frame_to_bytes(unprotected_bytes, &drained_size, frame);
+ result = tsi_fake_frame_encode(unprotected_bytes, &drained_size, frame);
*num_bytes_written += drained_size;
if (result == TSI_INCOMPLETE_DATA) result = TSI_OK;
return result;
@@ -360,6 +363,72 @@ static const tsi_frame_protector_vtable frame_protector_vtable = {
fake_protector_unprotect, fake_protector_destroy,
};
+/* --- tsi_handshaker_result methods implementation. ---*/
+
+typedef struct {
+ tsi_handshaker_result base;
+ unsigned char *unused_bytes;
+ size_t unused_bytes_size;
+} fake_handshaker_result;
+
+static tsi_result fake_handshaker_result_extract_peer(
+ const tsi_handshaker_result *self, tsi_peer *peer) {
+ /* Construct a tsi_peer with 1 property: certificate type. */
+ tsi_result result = tsi_construct_peer(1, peer);
+ if (result != TSI_OK) return result;
+ result = tsi_construct_string_peer_property_from_cstring(
+ TSI_CERTIFICATE_TYPE_PEER_PROPERTY, TSI_FAKE_CERTIFICATE_TYPE,
+ &peer->properties[0]);
+ if (result != TSI_OK) tsi_peer_destruct(peer);
+ return result;
+}
+
+static tsi_result fake_handshaker_result_create_frame_protector(
+ const tsi_handshaker_result *self, size_t *max_output_protected_frame_size,
+ tsi_frame_protector **protector) {
+ *protector = tsi_create_fake_frame_protector(max_output_protected_frame_size);
+ return TSI_OK;
+}
+
+static tsi_result fake_handshaker_result_get_unused_bytes(
+ const tsi_handshaker_result *self, unsigned char **bytes,
+ size_t *bytes_size) {
+ fake_handshaker_result *result = (fake_handshaker_result *)self;
+ *bytes_size = result->unused_bytes_size;
+ *bytes = result->unused_bytes;
+ return TSI_OK;
+}
+
+static void fake_handshaker_result_destroy(tsi_handshaker_result *self) {
+ fake_handshaker_result *result = (fake_handshaker_result *)self;
+ gpr_free(result->unused_bytes);
+ gpr_free(self);
+}
+
+static const tsi_handshaker_result_vtable handshaker_result_vtable = {
+ fake_handshaker_result_extract_peer,
+ fake_handshaker_result_create_frame_protector,
+ fake_handshaker_result_get_unused_bytes, fake_handshaker_result_destroy,
+};
+
+static tsi_result fake_handshaker_result_create(
+ const unsigned char *unused_bytes, size_t unused_bytes_size,
+ tsi_handshaker_result **handshaker_result) {
+ if ((unused_bytes_size > 0 && unused_bytes == NULL) ||
+ handshaker_result == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
+ fake_handshaker_result *result = gpr_zalloc(sizeof(*result));
+ result->base.vtable = &handshaker_result_vtable;
+ if (unused_bytes_size > 0) {
+ result->unused_bytes = gpr_malloc(unused_bytes_size);
+ memcpy(result->unused_bytes, unused_bytes, unused_bytes_size);
+ }
+ result->unused_bytes_size = unused_bytes_size;
+ *handshaker_result = &result->base;
+ return TSI_OK;
+}
+
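+
[Editor's note: once a result object exists, callers interrogate it through the result vtable rather than the handshaker. A sketch of typical consumption of a completed handshake's result `hs_result`, assuming the public tsi_handshaker_result_* wrappers dispatch to the vtable entries defined above.]

    /* Sketch: consume a completed handshake's result. */
    tsi_peer peer;
    if (tsi_handshaker_result_extract_peer(hs_result, &peer) == TSI_OK) {
      /* inspect peer.properties[0] (certificate type) here */
      tsi_peer_destruct(&peer);
    }
    size_t max_frame_size = 16384;
    tsi_frame_protector *protector = NULL;
    tsi_handshaker_result_create_frame_protector(hs_result, &max_frame_size,
                                                 &protector);
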
/* --- tsi_handshaker methods implementation. ---*/
static tsi_result fake_handshaker_get_bytes_to_send_to_peer(
@@ -370,13 +439,13 @@ static tsi_result fake_handshaker_get_bytes_to_send_to_peer(
*bytes_size = 0;
return TSI_OK;
}
- if (!impl->outgoing.needs_draining) {
+ if (!impl->outgoing_frame.needs_draining) {
tsi_fake_handshake_message next_message_to_send =
impl->next_message_to_send + 2;
const char *msg_string =
tsi_fake_handshake_message_to_string(impl->next_message_to_send);
- result = bytes_to_frame((unsigned char *)msg_string, strlen(msg_string),
- &impl->outgoing);
+ result = tsi_fake_frame_set_data((unsigned char *)msg_string,
+ strlen(msg_string), &impl->outgoing_frame);
if (result != TSI_OK) return result;
if (next_message_to_send > TSI_FAKE_HANDSHAKE_MESSAGE_MAX) {
next_message_to_send = TSI_FAKE_HANDSHAKE_MESSAGE_MAX;
@@ -388,7 +457,7 @@ static tsi_result fake_handshaker_get_bytes_to_send_to_peer(
}
impl->next_message_to_send = next_message_to_send;
}
- result = drain_frame_to_bytes(bytes, bytes_size, &impl->outgoing);
+ result = tsi_fake_frame_encode(bytes, bytes_size, &impl->outgoing_frame);
if (result != TSI_OK) return result;
if (!impl->is_client &&
impl->next_message_to_send == TSI_FAKE_HANDSHAKE_MESSAGE_MAX) {
@@ -414,12 +483,12 @@ static tsi_result fake_handshaker_process_bytes_from_peer(
*bytes_size = 0;
return TSI_OK;
}
- result = fill_frame_from_bytes(bytes, bytes_size, &impl->incoming);
+ result = tsi_fake_frame_decode(bytes, bytes_size, &impl->incoming_frame);
if (result != TSI_OK) return result;
/* We now have a complete frame. */
result = tsi_fake_handshake_message_from_string(
- (const char *)impl->incoming.data + TSI_FAKE_FRAME_HEADER_SIZE,
+ (const char *)impl->incoming_frame.data + TSI_FAKE_FRAME_HEADER_SIZE,
&received_msg);
if (result != TSI_OK) {
impl->result = result;
@@ -434,7 +503,7 @@ static tsi_result fake_handshaker_process_bytes_from_peer(
gpr_log(GPR_INFO, "%s received %s.", impl->is_client ? "Client" : "Server",
tsi_fake_handshake_message_to_string(received_msg));
}
- tsi_fake_frame_reset(&impl->incoming, 0 /* needs_draining */);
+ tsi_fake_frame_reset(&impl->incoming_frame, 0 /* needs_draining */);
impl->needs_incoming_message = 0;
if (impl->next_message_to_send == TSI_FAKE_HANDSHAKE_MESSAGE_MAX) {
/* We're done. */
@@ -451,40 +520,86 @@ static tsi_result fake_handshaker_get_result(tsi_handshaker *self) {
return impl->result;
}
-static tsi_result fake_handshaker_extract_peer(tsi_handshaker *self,
- tsi_peer *peer) {
- tsi_result result = tsi_construct_peer(1, peer);
- if (result != TSI_OK) return result;
- result = tsi_construct_string_peer_property_from_cstring(
- TSI_CERTIFICATE_TYPE_PEER_PROPERTY, TSI_FAKE_CERTIFICATE_TYPE,
- &peer->properties[0]);
- if (result != TSI_OK) tsi_peer_destruct(peer);
- return result;
-}
-
-static tsi_result fake_handshaker_create_frame_protector(
- tsi_handshaker *self, size_t *max_protected_frame_size,
- tsi_frame_protector **protector) {
- *protector = tsi_create_fake_protector(max_protected_frame_size);
- if (*protector == NULL) return TSI_OUT_OF_RESOURCES;
- return TSI_OK;
-}
-
static void fake_handshaker_destroy(tsi_handshaker *self) {
tsi_fake_handshaker *impl = (tsi_fake_handshaker *)self;
- tsi_fake_frame_destruct(&impl->incoming);
- tsi_fake_frame_destruct(&impl->outgoing);
+ tsi_fake_frame_destruct(&impl->incoming_frame);
+ tsi_fake_frame_destruct(&impl->outgoing_frame);
+ gpr_free(impl->outgoing_bytes_buffer);
gpr_free(self);
}
+static tsi_result fake_handshaker_next(
+ tsi_handshaker *self, const unsigned char *received_bytes,
+ size_t received_bytes_size, unsigned char **bytes_to_send,
+ size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result,
+ tsi_handshaker_on_next_done_cb cb, void *user_data) {
+ /* Sanity check the arguments. */
+ if ((received_bytes_size > 0 && received_bytes == NULL) ||
+ bytes_to_send == NULL || bytes_to_send_size == NULL ||
+ handshaker_result == NULL) {
+ return TSI_INVALID_ARGUMENT;
+ }
+ tsi_fake_handshaker *handshaker = (tsi_fake_handshaker *)self;
+ tsi_result result = TSI_OK;
+
+ /* Decode and process a handshake frame from the peer. */
+ size_t consumed_bytes_size = received_bytes_size;
+ if (received_bytes_size > 0) {
+ result = fake_handshaker_process_bytes_from_peer(self, received_bytes,
+ &consumed_bytes_size);
+ if (result != TSI_OK) return result;
+ }
+
+ /* Create a handshake message to send to the peer and encode it as a fake
+ * frame. */
+ size_t offset = 0;
+ do {
+ size_t sent_bytes_size = handshaker->outgoing_bytes_buffer_size - offset;
+ result = fake_handshaker_get_bytes_to_send_to_peer(
+ self, handshaker->outgoing_bytes_buffer + offset, &sent_bytes_size);
+ offset += sent_bytes_size;
+ if (result == TSI_INCOMPLETE_DATA) {
+ handshaker->outgoing_bytes_buffer_size *= 2;
+ handshaker->outgoing_bytes_buffer =
+ gpr_realloc(handshaker->outgoing_bytes_buffer,
+ handshaker->outgoing_bytes_buffer_size);
+ }
+ } while (result == TSI_INCOMPLETE_DATA);
+ if (result != TSI_OK) return result;
+ *bytes_to_send = handshaker->outgoing_bytes_buffer;
+ *bytes_to_send_size = offset;
+
+ /* Check if the handshake was completed. */
+ if (fake_handshaker_get_result(self) == TSI_HANDSHAKE_IN_PROGRESS) {
+ *handshaker_result = NULL;
+ } else {
+ /* Calculate the unused bytes. */
+ const unsigned char *unused_bytes = NULL;
+ size_t unused_bytes_size = received_bytes_size - consumed_bytes_size;
+ if (unused_bytes_size > 0) {
+ unused_bytes = received_bytes + consumed_bytes_size;
+ }
+
+ /* Create a handshaker_result containing the unused bytes. */
+ result = fake_handshaker_result_create(unused_bytes, unused_bytes_size,
+ handshaker_result);
+ if (result == TSI_OK) {
+ /* Indicate that the handshake has completed and that a handshaker_result
+ * has been created. */
+ self->handshaker_result_created = true;
+ }
+ }
+ return result;
+}
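
[Editor's note: fake_handshaker_next folds the old process/get-bytes/get-result steps into a single call. A sketch of a synchronous driver loop, assuming the public tsi_handshaker_next wrapper mirrors the vtable signature above; send_to_peer and recv_from_peer are hypothetical transport helpers.]

    extern void send_to_peer(const unsigned char *buf, size_t len); /* hypothetical */
    extern size_t recv_from_peer(unsigned char *buf, size_t cap);   /* hypothetical */

    tsi_result do_handshake(tsi_handshaker *hs) {
      unsigned char recv_buf[1024];
      size_t recv_size = 0;
      tsi_handshaker_result *hs_result = NULL;
      while (hs_result == NULL) {
        unsigned char *to_send = NULL;
        size_t to_send_size = 0;
        tsi_result r = tsi_handshaker_next(hs, recv_buf, recv_size, &to_send,
                                           &to_send_size, &hs_result,
                                           NULL /* cb: synchronous path */, NULL);
        if (r != TSI_OK) return r;
        if (to_send_size > 0) send_to_peer(to_send, to_send_size);
        if (hs_result == NULL) {
          recv_size = recv_from_peer(recv_buf, sizeof(recv_buf));
        }
      }
      return TSI_OK;
    }
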
+
static const tsi_handshaker_vtable handshaker_vtable = {
- fake_handshaker_get_bytes_to_send_to_peer,
- fake_handshaker_process_bytes_from_peer,
- fake_handshaker_get_result,
- fake_handshaker_extract_peer,
- fake_handshaker_create_frame_protector,
+ NULL, /* get_bytes_to_send_to_peer -- deprecated */
+ NULL, /* process_bytes_from_peer -- deprecated */
+ NULL, /* get_result -- deprecated */
+ NULL, /* extract_peer -- deprecated */
+ NULL, /* create_frame_protector -- deprecated */
fake_handshaker_destroy,
- NULL,
+ fake_handshaker_next,
};
tsi_handshaker *tsi_create_fake_handshaker(int is_client) {
@@ -492,6 +607,9 @@ tsi_handshaker *tsi_create_fake_handshaker(int is_client) {
impl->base.vtable = &handshaker_vtable;
impl->is_client = is_client;
impl->result = TSI_HANDSHAKE_IN_PROGRESS;
+ impl->outgoing_bytes_buffer_size =
+ TSI_FAKE_HANDSHAKER_OUTGOING_BUFFER_INITIAL_SIZE;
+ impl->outgoing_bytes_buffer = gpr_malloc(impl->outgoing_bytes_buffer_size);
if (is_client) {
impl->needs_incoming_message = 0;
impl->next_message_to_send = TSI_FAKE_CLIENT_INIT;
@@ -502,7 +620,7 @@ tsi_handshaker *tsi_create_fake_handshaker(int is_client) {
return &impl->base;
}
-tsi_frame_protector *tsi_create_fake_protector(
+tsi_frame_protector *tsi_create_fake_frame_protector(
size_t *max_protected_frame_size) {
tsi_fake_frame_protector *impl = gpr_zalloc(sizeof(*impl));
impl->max_frame_size = (max_protected_frame_size == NULL)
diff --git a/src/core/tsi/fake_transport_security.h b/src/core/tsi/fake_transport_security.h
index 3d468c477f..934b3cbeb2 100644
--- a/src/core/tsi/fake_transport_security.h
+++ b/src/core/tsi/fake_transport_security.h
@@ -36,7 +36,7 @@ extern "C" {
tsi_handshaker *tsi_create_fake_handshaker(int is_client);
/* Creates a protector directly without going through the handshake phase. */
-tsi_frame_protector *tsi_create_fake_protector(
+tsi_frame_protector *tsi_create_fake_frame_protector(
size_t *max_protected_frame_size);
#ifdef __cplusplus
diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c
index 37d4f038b7..1fd65928f9 100644
--- a/src/core/tsi/ssl_transport_security.c
+++ b/src/core/tsi/ssl_transport_security.c
@@ -411,15 +411,11 @@ static tsi_result do_ssl_read(SSL *ssl, unsigned char *unprotected_bytes,
GPR_ASSERT(*unprotected_bytes_size <= INT_MAX);
read_from_ssl =
SSL_read(ssl, unprotected_bytes, (int)*unprotected_bytes_size);
- if (read_from_ssl == 0) {
- gpr_log(GPR_ERROR, "SSL_read returned 0 unexpectedly.");
- return TSI_INTERNAL_ERROR;
- }
- if (read_from_ssl < 0) {
+ if (read_from_ssl <= 0) {
read_from_ssl = SSL_get_error(ssl, read_from_ssl);
switch (read_from_ssl) {
- case SSL_ERROR_WANT_READ:
- /* We need more data to finish the frame. */
+ case SSL_ERROR_ZERO_RETURN: /* Received a close_notify alert. */
+ case SSL_ERROR_WANT_READ: /* We need more data to finish the frame. */
*unprotected_bytes_size = 0;
return TSI_OK;
case SSL_ERROR_WANT_WRITE:
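
[Editor's note: the rewrite above stops treating a zero return from SSL_read as an internal error. With SSL_get_error, a zero read that maps to SSL_ERROR_ZERO_RETURN is a clean close_notify and should look like an empty read. A compact sketch of the pattern against the OpenSSL API; error cases beyond those shown are elided.]

    #include <openssl/ssl.h>

    /* Sketch: close_notify and want-read both surface as zero bytes read. */
    int read_plaintext(SSL *ssl, unsigned char *buf, size_t buf_cap,
                       size_t *bytes_read) {
      int n = SSL_read(ssl, buf, (int)buf_cap);
      if (n <= 0) {
        switch (SSL_get_error(ssl, n)) {
          case SSL_ERROR_ZERO_RETURN: /* peer sent close_notify */
          case SSL_ERROR_WANT_READ:   /* need more data to finish the frame */
            *bytes_read = 0;
            return 0;
          default:
            return -1; /* treat everything else as a hard failure */
        }
      }
      *bytes_read = (size_t)n;
      return 0;
    }
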
diff --git a/src/cpp/README.md b/src/cpp/README.md
index e9ef489a7c..d2896ad96f 100644
--- a/src/cpp/README.md
+++ b/src/cpp/README.md
@@ -47,7 +47,7 @@ below.
# Build from Source
```sh
- $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc
+ $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
$ cd grpc
$ git submodule update --init
$ make
@@ -60,14 +60,14 @@ You can find out how to build and run our simplest gRPC C++ example in our
[C++ quick start](../../examples/cpp).
For more detailed documentation on using gRPC in C++, see our main
-documentation site at [grpc.io](http://grpc.io), specifically:
+documentation site at [grpc.io](https://grpc.io), specifically:
-* [Overview](http://www.grpc.io/docs/): An introduction to gRPC with a simple
+* [Overview](https://grpc.io/docs/): An introduction to gRPC with a simple
Hello World example in all our supported languages, including C++.
-* [gRPC Basics - C++](http://www.grpc.io/docs/tutorials/basic/c.html):
+* [gRPC Basics - C++](https://grpc.io/docs/tutorials/basic/c.html):
A tutorial that steps you through creating a simple gRPC C++ example
application.
-* [Asynchronous Basics - C++](http://www.grpc.io/docs/tutorials/async/helloasync-cpp.html):
+* [Asynchronous Basics - C++](https://grpc.io/docs/tutorials/async/helloasync-cpp.html):
A tutorial that shows you how to use gRPC C++'s asynchronous/non-blocking
APIs.
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index f2d9bb07c9..038eb32e04 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -76,8 +76,9 @@ grpc::string Channel::GetServiceConfigJSON() const {
&channel_info.service_config_json);
}
-Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
- CompletionQueue* cq) {
+internal::Call Channel::CreateCall(const internal::RpcMethod& method,
+ ClientContext* context,
+ CompletionQueue* cq) {
const bool kRegistered = method.channel_tag() && context->authority().empty();
grpc_call* c_call = NULL;
if (kRegistered) {
@@ -109,10 +110,11 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
}
grpc_census_call_set_context(c_call, context->census_context());
context->set_call(c_call, shared_from_this());
- return Call(c_call, this, cq);
+ return internal::Call(c_call, this, cq);
}
-void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
+void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
@@ -131,7 +133,7 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
}
namespace {
-class TagSaver final : public CompletionQueueTag {
+class TagSaver final : public internal::CompletionQueueTag {
public:
explicit TagSaver(void* tag) : tag_(tag) {}
~TagSaver() override {}
diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc
index 66b1ef0e39..e65cb9903f 100644
--- a/src/cpp/client/generic_stub.cc
+++ b/src/cpp/client/generic_stub.cc
@@ -27,9 +27,11 @@ std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call(
ClientContext* context, const grpc::string& method, CompletionQueue* cq,
void* tag) {
return std::unique_ptr<GenericClientAsyncReaderWriter>(
- GenericClientAsyncReaderWriter::Create(
+ GenericClientAsyncReaderWriter::internal::Create(
channel_.get(), cq,
- RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag));
+ internal::RpcMethod(method.c_str(),
+ internal::RpcMethod::BIDI_STREAMING),
+ context, tag));
}
} // namespace grpc
diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc
index f34b0f3d58..000a03277b 100644
--- a/src/cpp/common/completion_queue_cc.cc
+++ b/src/cpp/common/completion_queue_cc.cc
@@ -60,7 +60,7 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
case GRPC_QUEUE_SHUTDOWN:
return SHUTDOWN;
case GRPC_OP_COMPLETE:
- auto cq_tag = static_cast<CompletionQueueTag*>(ev.tag);
+ auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag);
*ok = ev.success != 0;
*tag = cq_tag;
if (cq_tag->FinalizeResult(tag, ok)) {
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
index 815b607032..fc7feb79fe 100644
--- a/src/cpp/server/health/default_health_check_service.cc
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -36,11 +36,12 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
DefaultHealthCheckService* service)
: service_(service), method_(nullptr) {
- MethodHandler* handler =
- new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>(
+ internal::MethodHandler* handler =
+ new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer,
+ ByteBuffer>(
std::mem_fn(&HealthCheckServiceImpl::Check), this);
- method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC,
- handler);
+ method_ = new internal::RpcServiceMethod(
+ kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler);
AddMethod(method_);
}
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
index 09d5cebe98..99d6680c50 100644
--- a/src/cpp/server/health/default_health_check_service.h
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -41,7 +41,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
private:
const DefaultHealthCheckService* const service_;
- RpcServiceMethod* method_;
+ internal::RpcServiceMethod* method_;
};
DefaultHealthCheckService();
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 04abb6fd3e..3bff9999b9 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -86,7 +86,8 @@ class Server::UnimplementedAsyncRequest final
ServerCompletionQueue* const cq_;
};
-typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
+typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata,
+ internal::CallOpServerSendStatus>
UnimplementedAsyncResponseOp;
class Server::UnimplementedAsyncResponse final
: public UnimplementedAsyncResponseOp {
@@ -104,12 +105,12 @@ class Server::UnimplementedAsyncResponse final
UnimplementedAsyncRequest* const request_;
};
-class ShutdownTag : public CompletionQueueTag {
+class ShutdownTag : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) { return false; }
};
-class DummyTag : public CompletionQueueTag {
+class DummyTag : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
*status = true;
@@ -117,15 +118,15 @@ class DummyTag : public CompletionQueueTag {
}
};
-class Server::SyncRequest final : public CompletionQueueTag {
+class Server::SyncRequest final : public internal::CompletionQueueTag {
public:
- SyncRequest(RpcServiceMethod* method, void* tag)
+ SyncRequest(internal::RpcServiceMethod* method, void* tag)
: method_(method),
tag_(tag),
in_flight_(false),
- has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- RpcMethod::SERVER_STREAMING),
+ has_request_payload_(
+ method->method_type() == internal::RpcMethod::NORMAL_RPC ||
+ method->method_type() == internal::RpcMethod::SERVER_STREAMING),
call_details_(nullptr),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
@@ -202,14 +203,14 @@ class Server::SyncRequest final : public CompletionQueueTag {
void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) {
ctx_.BeginCompletionOp(&call_);
global_callbacks->PreSynchronousRequest(&ctx_);
- method_->handler()->RunHandler(
- MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_));
+ method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter(
+ &call_, &ctx_, request_payload_));
global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr;
cq_.Shutdown();
- CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
+ internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
/* Ensure the cq_ is shutdown */
@@ -219,15 +220,15 @@ class Server::SyncRequest final : public CompletionQueueTag {
private:
CompletionQueue cq_;
- Call call_;
+ internal::Call call_;
ServerContext ctx_;
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
- RpcServiceMethod* const method_;
+ internal::RpcServiceMethod* const method_;
};
private:
- RpcServiceMethod* const method_;
+ internal::RpcServiceMethod* const method_;
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
@@ -300,14 +301,15 @@ class Server::SyncRequestThreadManager : public ThreadManager {
// object
}
- void AddSyncMethod(RpcServiceMethod* method, void* tag) {
+ void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) {
sync_requests_.emplace_back(new SyncRequest(method, tag));
}
void AddUnknownSyncMethod() {
if (!sync_requests_.empty()) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ unknown_method_.reset(new internal::RpcServiceMethod(
+ "unknown", internal::RpcMethod::BIDI_STREAMING,
+ new internal::UnknownMethodHandler));
sync_requests_.emplace_back(
new SyncRequest(unknown_method_.get(), nullptr));
}
@@ -344,8 +346,8 @@ class Server::SyncRequestThreadManager : public ThreadManager {
CompletionQueue* server_cq_;
int cq_timeout_msec_;
std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
- std::unique_ptr<RpcServiceMethod> unknown_method_;
- std::unique_ptr<RpcServiceMethod> health_check_;
+ std::unique_ptr<internal::RpcServiceMethod> unknown_method_;
+ std::unique_ptr<internal::RpcServiceMethod> health_check_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
};
@@ -421,13 +423,13 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
grpc_server* Server::c_server() { return server_; }
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
- RpcServiceMethod* method) {
+ internal::RpcServiceMethod* method) {
switch (method->method_type()) {
- case RpcMethod::NORMAL_RPC:
- case RpcMethod::SERVER_STREAMING:
+ case internal::RpcMethod::NORMAL_RPC:
+ case internal::RpcMethod::SERVER_STREAMING:
return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
- case RpcMethod::CLIENT_STREAMING:
- case RpcMethod::BIDI_STREAMING:
+ case internal::RpcMethod::CLIENT_STREAMING:
+ case internal::RpcMethod::BIDI_STREAMING:
return GRPC_SRM_PAYLOAD_NONE;
}
GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
@@ -448,7 +450,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
continue;
}
- RpcServiceMethod* method = it->get();
+ internal::RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(
server_, method->name(), host ? host->c_str() : nullptr,
PayloadHandlingForMethod(method), 0);
@@ -588,7 +590,8 @@ void Server::Wait() {
}
}
-void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
+void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
@@ -599,8 +602,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
- bool delete_on_finalize)
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ void* tag, bool delete_on_finalize)
: server_(server),
context_(context),
stream_(stream),
@@ -622,7 +625,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
}
context_->set_call(call_);
context_->cq_ = call_cq_;
- Call call(call_, server_, call_cq_, server_->max_receive_message_size());
+ internal::Call call(call_, server_, call_cq_,
+ server_->max_receive_message_size());
if (*status && call_) {
context_->BeginCompletionOp(&call);
}
@@ -637,7 +641,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
@@ -651,7 +656,7 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest(
ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
ServerInterface* server, GenericServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: BaseAsyncRequest(server, context, stream, call_cq, tag,
delete_on_finalize) {
@@ -693,7 +698,7 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
UnimplementedAsyncRequest* request)
: request_(request) {
Status status(StatusCode::UNIMPLEMENTED, "");
- UnknownMethodHandler::FillOps(request_->context(), this);
+ internal::UnknownMethodHandler::FillOps(request_->context(), this);
request_->stream()->call_.PerformOps(this);
}
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 3a6bca13b3..2e55ffbac4 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -37,7 +37,7 @@ namespace grpc {
// CompletionOp
-class ServerContext::CompletionOp final : public CallOpSetInterface {
+class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
CompletionOp()
@@ -120,7 +120,8 @@ ServerContext::ServerContext()
call_(nullptr),
cq_(nullptr),
sent_initial_metadata_(false),
- compression_level_set_(false) {}
+ compression_level_set_(false),
+ has_pending_ops_(false) {}
ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr)
: completion_op_(nullptr),
@@ -130,7 +131,8 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr)
call_(nullptr),
cq_(nullptr),
sent_initial_metadata_(false),
- compression_level_set_(false) {
+ compression_level_set_(false),
+ has_pending_ops_(false) {
std::swap(*client_metadata_.arr(), *arr);
client_metadata_.FillMap();
}
@@ -144,7 +146,7 @@ ServerContext::~ServerContext() {
}
}
-void ServerContext::BeginCompletionOp(Call* call) {
+void ServerContext::BeginCompletionOp(internal::Call* call) {
GPR_ASSERT(!completion_op_);
completion_op_ = new CompletionOp();
if (has_notify_when_done_tag_) {
@@ -153,8 +155,8 @@ void ServerContext::BeginCompletionOp(Call* call) {
call->PerformOps(completion_op_);
}
-CompletionQueueTag* ServerContext::GetCompletionOpTag() {
- return static_cast<CompletionQueueTag*>(completion_op_);
+internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
+ return static_cast<internal::CompletionQueueTag*>(completion_op_);
}
void ServerContext::AddInitialMetadata(const grpc::string& key,
diff --git a/src/csharp/README.md b/src/csharp/README.md
index a973d2e597..6821ad225e 100644
--- a/src/csharp/README.md
+++ b/src/csharp/README.md
@@ -80,6 +80,6 @@ THE NATIVE DEPENDENCY
Internally, gRPC C# uses a native library written in C (gRPC C core) and invokes its functionality via P/Invoke. The fact that a native library is used should be fully transparent to users; installing the `Grpc.Core` NuGet package is the only step needed to use gRPC C# on all supported platforms.
-[API Reference]: http://www.grpc.io/grpc/csharp/
+[API Reference]: https://grpc.io/grpc/csharp/
[Helloworld Example]: ../../examples/csharp/helloworld
-[RouteGuide Tutorial]: http://www.grpc.io/docs/tutorials/basic/csharp.html
+[RouteGuide Tutorial]: https://grpc.io/docs/tutorials/basic/csharp.html
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index e917917359..71e6904008 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -383,7 +383,10 @@ class ClientStatusOp : public Op {
public:
ClientStatusOp() { grpc_metadata_array_init(&metadata_array); }
- ~ClientStatusOp() { grpc_metadata_array_destroy(&metadata_array); }
+ ~ClientStatusOp() {
+ grpc_metadata_array_destroy(&metadata_array);
+ grpc_slice_unref(status_details);
+ }
bool ParseOp(Local<Value> value, grpc_op *out) {
out->data.recv_status_on_client.trailing_metadata = &metadata_array;
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 6eb5f99cb7..edc51b7802 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -137,6 +137,7 @@ function _write(chunk, encoding, callback) {
/* Once a write fails, just call the callback immediately to let the caller
flush any pending writes. */
setImmediate(callback);
+ return;
}
try {
message = this.serialize(chunk);
@@ -149,6 +150,7 @@ function _write(chunk, encoding, callback) {
this.call.cancelWithStatus(constants.status.INTERNAL,
'Serialization failure');
callback(e);
+ return;
}
if (_.isFinite(encoding)) {
/* Attach the encoding if it is a finite number. This is the closest we
diff --git a/src/node/src/protobuf_js_6_common.js b/src/node/src/protobuf_js_6_common.js
index f1e4914fe2..0f07251677 100644
--- a/src/node/src/protobuf_js_6_common.js
+++ b/src/node/src/protobuf_js_6_common.js
@@ -49,7 +49,7 @@ exports.deserializeCls = function deserializeCls(cls, options) {
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
- return cls.decode(arg_buf).toObject(conversion_options);
+ return cls.toObject(cls.decode(arg_buf), conversion_options);
};
};
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index d58d18057e..0b0b393e32 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -1398,13 +1398,25 @@ describe('Client reconnect', function() {
});
server.bind('localhost:' + port, server_insecure_creds);
server.start();
- client.echo(undefined, function(error, response) {
- if (error) {
- console.log(error);
- }
+
+ /* We create a new client that will not throw an error if the server
+ * is not immediately available. Instead, it waits for the server to
+ * become available, and then the call completes. Once this happens, the
+ * original client should be able to make a new call to the restarted
+ * server without the call failing due to connection errors. */
+ var client2 = new Client('localhost:' + port,
+ grpc.credentials.createInsecure());
+ client2.echo({value: 'test', value2: 3}, function(error, response) {
assert.ifError(error);
- assert.deepEqual(response, {value: '', value2: 0});
- done();
+ client.echo(undefined, function(error, response) {
+ if (error) {
+ console.log(error);
+ }
+ assert.ifError(error);
+ assert.deepEqual(response, {value: '', value2: 0});
+ done();
+ });
});
});
});
diff --git a/src/node/tools/package.json b/src/node/tools/package.json
index 542d52d48b..0a3c32734c 100644
--- a/src/node/tools/package.json
+++ b/src/node/tools/package.json
@@ -3,7 +3,7 @@
"version": "1.5.0-dev",
"author": "Google Inc.",
"description": "Tools for developing with gRPC on Node.js",
- "homepage": "http://www.grpc.io/",
+ "homepage": "https://grpc.io/",
"repository": {
"type": "git",
"url": "https://github.com/grpc/grpc.git"
diff --git a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
index 351a45df58..22527d1572 100644
--- a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
+++ b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
@@ -51,7 +51,7 @@ Pod::Spec.new do |s|
The generated code will have a dependency on the gRPC Objective-C Proto runtime of the same
version. The runtime can be obtained as the "gRPC-ProtoRPC" pod.
DESC
- s.homepage = 'http://www.grpc.io'
+ s.homepage = 'https://grpc.io'
s.license = {
:type => 'Apache License, Version 2.0',
:text => <<-LICENSE
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index bd7d4ad691..a871ea895a 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -27,8 +27,8 @@
* immediately, unless flow control prevents it.
* If it is throttled and keeps receiving values, as well as if it receives values before being
* started, it will buffer them and propagate them in order as soon as its state becomes Started.
- * If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
- * propagate the error immediately.
+ * If it receives an end of stream (via -writesFinishedWithError:), it will queue the EOS after the
+ * last buffered value and deliver it to the writeable only after all buffered values are delivered.
*
* Beware that a pipe of this type can't prevent receiving more values when it is paused (for
* example if used to write data to a congested network connection). Because in such situations the
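
Under the revised contract, an EOS no longer discards buffered values; it is delivered strictly after them. A short usage sketch against the GRXBufferedPipe/GRXWriteable API that appears in this diff (the handler body is illustrative):

    GRXBufferedPipe *pipe = [GRXBufferedPipe pipe];
    [pipe writeValue:@"hello"];          // buffered: the pipe is not started yet
    [pipe writesFinishedWithError:nil];  // EOS is queued *after* the value
    [pipe startWithWriteable:
        [GRXWriteable writeableWithSingleHandler:^(id value, NSError *error) {
          // The buffered @"hello" arrives first; the EOS is delivered last,
          // rather than racing ahead of (or dropping) buffered values.
          NSLog(@"value = %@, error = %@", value, error);
        }]];
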
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index e4a7cc40f9..99cb0ad971 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -18,11 +18,13 @@
#import "GRXBufferedPipe.h"
+@interface GRXBufferedPipe ()
+@property(atomic) id<GRXWriteable> writeable;
+@end
+
@implementation GRXBufferedPipe {
- id<GRXWriteable> _writeable;
- NSMutableArray *_queue;
- BOOL _inputIsFinished;
NSError *_errorOrNil;
+ dispatch_queue_t _writeQueue;
}
@synthesize state = _state;
@@ -33,99 +35,79 @@
- (instancetype)init {
if (self = [super init]) {
- _queue = [NSMutableArray array];
_state = GRXWriterStateNotStarted;
+ _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ dispatch_suspend(_writeQueue);
}
return self;
}
-- (id)popValue {
- id value = _queue[0];
- [_queue removeObjectAtIndex:0];
- return value;
-}
-
-- (void)writeBufferUntilPausedOrStopped {
- while (_state == GRXWriterStateStarted && _queue.count > 0) {
- [_writeable writeValue:[self popValue]];
- }
- if (_inputIsFinished && _queue.count == 0) {
- // Our writer finished normally while we were paused or not-started-yet.
- [self finishWithError:_errorOrNil];
- }
-}
-
#pragma mark GRXWriteable implementation
-// Returns whether events can be simply propagated to the other end of the pipe.
-- (BOOL)shouldFastForward {
- return _state == GRXWriterStateStarted && _queue.count == 0;
-}
-
- (void)writeValue:(id)value {
- if (self.shouldFastForward) {
- // Skip the queue.
- [_writeable writeValue:value];
- } else {
+ if ([value respondsToSelector:@selector(copy)]) {
// Even if we're paused and have enqueued values, we can't exert back-pressure on our writer.
// So just buffer the new value.
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
- if ([value respondsToSelector:@selector(copy)]) {
- value = [value copy];
- }
- [_queue addObject:value];
+ value = [value copy];
}
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^(void) {
+ [weakSelf.writeable writeValue:value];
+ });
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _inputIsFinished = YES;
- _errorOrNil = errorOrNil;
- if (errorOrNil || self.shouldFastForward) {
- // No need to write pending values.
- [self finishWithError:_errorOrNil];
- }
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^{
+ [weakSelf finishWithError:errorOrNil];
+ });
}
#pragma mark GRXWriter implementation
- (void)setState:(GRXWriterState)newState {
- // Manual transitions are only allowed from the started or paused states.
- if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
- return;
- }
-
- switch (newState) {
- case GRXWriterStateFinished:
- _state = newState;
- _queue = nil;
- // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
- // writeable to be messaged anymore.
- _writeable = nil;
- return;
- case GRXWriterStatePaused:
- _state = newState;
+ @synchronized (self) {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
return;
- case GRXWriterStateStarted:
- if (_state == GRXWriterStatePaused) {
+ }
+
+ switch (newState) {
+ case GRXWriterStateFinished:
+ self.writeable = nil;
+ if (_state == GRXWriterStatePaused) {
+ dispatch_resume(_writeQueue);
+ }
_state = newState;
- [self writeBufferUntilPausedOrStopped];
- }
- return;
- case GRXWriterStateNotStarted:
- return;
+ return;
+ case GRXWriterStatePaused:
+ if (_state == GRXWriterStateStarted) {
+ _state = newState;
+ dispatch_suspend(_writeQueue);
+ }
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ dispatch_resume(_writeQueue);
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ self.writeable = writeable;
_state = GRXWriterStateStarted;
- _writeable = writeable;
- [self writeBufferUntilPausedOrStopped];
+ dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
- id<GRXWriteable> writeable = _writeable;
+ [self.writeable writesFinishedWithError:errorOrNil];
self.state = GRXWriterStateFinished;
- [writeable writesFinishedWithError:errorOrNil];
}
@end
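
The rewrite replaces the explicit NSMutableArray buffer and its hand-rolled state machine with a serial dispatch queue created suspended: the enqueued blocks are the buffer, pause and resume map onto dispatch_suspend/dispatch_resume, @synchronized guards the state transitions, and a weak self keeps the enqueued blocks from retaining the pipe. A minimal standalone sketch of that pattern follows; PausableSink is illustrative, not gRPC API. Note that libdispatch requires suspend/resume calls to balance, and a queue must not be released while still suspended, hence the rebalancing dealloc.

    #import <Foundation/Foundation.h>

    // Illustrative pausable sink: the suspended serial queue *is* the buffer.
    @interface PausableSink : NSObject
    - (void)enqueue:(id)value;
    - (void)resume;  // start or un-pause delivery
    - (void)pause;
    @end

    @implementation PausableSink {
      dispatch_queue_t _queue;
      BOOL _paused;
    }
    - (instancetype)init {
      if (self = [super init]) {
        _queue = dispatch_queue_create("sink", DISPATCH_QUEUE_SERIAL);
        dispatch_suspend(_queue);  // blocks accumulate until the first -resume
        _paused = YES;
      }
      return self;
    }
    - (void)enqueue:(id)value {
      __weak PausableSink *weakSelf = self;  // avoid a block/self retain cycle
      dispatch_async(_queue, ^{
        [weakSelf deliver:value];
      });
    }
    - (void)deliver:(id)value {
      NSLog(@"delivered %@", value);
    }
    - (void)pause {
      @synchronized(self) {
        if (!_paused) { dispatch_suspend(_queue); _paused = YES; }
      }
    }
    - (void)resume {
      @synchronized(self) {
        if (_paused) { dispatch_resume(_queue); _paused = NO; }
      }
    }
    - (void)dealloc {
      // libdispatch aborts if a suspended queue is released, so rebalance.
      if (_paused) dispatch_resume(_queue);
    }
    @end
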
diff --git a/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec b/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec
index 22cd7e1c60..cd9464c453 100644
--- a/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec
+++ b/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec
@@ -3,7 +3,7 @@ Pod::Spec.new do |s|
s.version = '0.0.1'
s.license = 'Apache License, Version 2.0'
s.authors = { 'gRPC contributors' => 'grpc-io@googlegroups.com' }
- s.homepage = 'http://www.grpc.io/'
+ s.homepage = 'https://grpc.io/'
s.summary = 'RemoteTest example'
s.source = { :git => 'https://github.com/grpc/grpc.git' }
diff --git a/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj b/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj
index afc3da7116..6247d0b4e6 100644
--- a/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj
+++ b/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj
@@ -275,6 +275,7 @@
ONLY_ACTIVE_ARCH = YES;
SDKROOT = iphoneos;
SWIFT_OPTIMIZATION_LEVEL = "-Onone";
+ SWIFT_VERSION = 2.3;
TARGETED_DEVICE_FAMILY = "1,2";
};
name = Debug;
@@ -312,6 +313,7 @@
IPHONEOS_DEPLOYMENT_TARGET = 8.4;
MTL_ENABLE_DEBUG_INFO = NO;
SDKROOT = iphoneos;
+ SWIFT_VERSION = 2.3;
TARGETED_DEVICE_FAMILY = "1,2";
VALIDATE_PRODUCT = YES;
};
@@ -327,6 +329,7 @@
PRODUCT_BUNDLE_IDENTIFIER = "io.grpc.$(PRODUCT_NAME:rfc1034identifier)";
PRODUCT_NAME = "$(TARGET_NAME)";
SWIFT_OBJC_BRIDGING_HEADER = "";
+ SWIFT_VERSION = 2.3;
USER_HEADER_SEARCH_PATHS = "";
};
name = Debug;
@@ -341,6 +344,7 @@
PRODUCT_BUNDLE_IDENTIFIER = "io.grpc.$(PRODUCT_NAME:rfc1034identifier)";
PRODUCT_NAME = "$(TARGET_NAME)";
SWIFT_OBJC_BRIDGING_HEADER = "";
+ SWIFT_VERSION = 2.3;
USER_HEADER_SEARCH_PATHS = "";
};
name = Release;
diff --git a/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec b/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec
index 7fbf6371b3..1796c6d746 100644
--- a/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec
+++ b/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec
@@ -3,7 +3,7 @@ Pod::Spec.new do |s|
s.version = "0.0.1"
s.license = "Apache License, Version 2.0"
s.authors = { 'gRPC contributors' => 'grpc-io@googlegroups.com' }
- s.homepage = "http://www.grpc.io/"
+ s.homepage = "https://grpc.io/"
s.summary = "RemoteTest example"
s.source = { :git => 'https://github.com/grpc/grpc.git' }
diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m
index f152452b01..fa3ded4c0c 100644
--- a/src/objective-c/tests/RxLibraryUnitTests.m
+++ b/src/objective-c/tests/RxLibraryUnitTests.m
@@ -23,6 +23,8 @@
#import <RxLibrary/GRXWriteable.h>
#import <RxLibrary/GRXWriter.h>
+#define TEST_TIMEOUT 1
+
// A mock of a GRXSingleValueHandler block that can be queried for how many times it was called and
// what values were last passed to it.
//
@@ -140,26 +142,38 @@
#pragma mark BufferedPipe
- (void)testBufferedPipePropagatesValue {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
+
id anyValue = @7;
// If:
GRXBufferedPipe *pipe = [GRXBufferedPipe pipe];
[pipe startWithWriteable:writeable];
[pipe writeValue:anyValue];
+ [pipe writesFinishedWithError:nil];
// Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, anyValue);
XCTAssertEqualObjects(handler.errorOrNil, nil);
+
}
- (void)testBufferedPipePropagatesError {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil];
// If:
@@ -168,15 +182,20 @@
[pipe writesFinishedWithError:anyError];
// Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, nil);
XCTAssertEqualObjects(handler.errorOrNil, anyError);
}
- (void)testBufferedPipeFinishWriteWhilePaused {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
id anyValue = @7;
// If:
@@ -188,6 +207,7 @@
[pipe startWithWriteable:writeable];
// Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, anyValue);
XCTAssertEqualObjects(handler.errorOrNil, nil);
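
Because delivery now hops through the pipe's dispatch queue, the assertions can no longer run synchronously after the writes. Each test therefore wraps the capturing handler so an XCTestExpectation is fulfilled when the callback actually fires, and only asserts after the wait. The pattern, reduced to its core as a sketch (assumed to sit in the same test class, with the diff's TEST_TIMEOUT define and RxLibrary imports in scope):

    - (void)testAsynchronousDelivery {
      __weak XCTestExpectation *expectation =
          [self expectationWithDescription:@"callback ran"];
      __block id capturedValue = nil;
      id<GRXWriteable> writeable =
          [GRXWriteable writeableWithSingleHandler:^(id value, NSError *error) {
            capturedValue = value;  // record on the callback's queue
            [expectation fulfill];  // unblock the waiting test
          }];
      GRXBufferedPipe *pipe = [GRXBufferedPipe pipe];
      [pipe startWithWriteable:writeable];
      [pipe writeValue:@42];
      [pipe writesFinishedWithError:nil];
      // Only assert after the wait: the callback has definitely run by then.
      [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
      XCTAssertEqualObjects(capturedValue, @42);
    }
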
diff --git a/src/php/README.md b/src/php/README.md
index 90c8cb386a..11f99e134c 100644
--- a/src/php/README.md
+++ b/src/php/README.md
@@ -100,7 +100,7 @@ the `composer` and `protoc` binaries. You can find out how to get these
Clone this repository
```sh
-$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc
+$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
```
Build and install the gRPC C core library
@@ -129,7 +129,7 @@ $ sudo make install
You will need the source code to run tests
```sh
-$ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc
+$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
$ cd grpc
$ git pull --recurse-submodules && git submodule update --init --recursive
```
diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 8cc3f55ac1..48016642f7 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -152,7 +152,6 @@ message ServerConfig {
// Buffer pool size (no buffer pool specified if unset)
int32 resource_quota_size = 1001;
-
repeated ChannelArg channel_args = 1002;
}
diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst
index 28a2714568..f047243f82 100644
--- a/src/python/grpcio/README.rst
+++ b/src/python/grpcio/README.rst
@@ -46,7 +46,7 @@ package named :code:`python-dev`).
::
$ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice
- $ git clone -b $(curl -L http://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT
+ $ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc $REPO_ROOT
$ cd $REPO_ROOT
$ git submodule update --init
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index a8c69720fd..5950bfa0e6 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -518,7 +518,6 @@ cdef extern from "grpc/compression.h":
ctypedef struct grpc_compression_options:
uint32_t enabled_algorithms_bitset
- grpc_compression_algorithm default_compression_algorithm
int grpc_compression_algorithm_parse(
grpc_slice value, grpc_compression_algorithm *algorithm) nogil
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 5819a624f7..ea5bdbae58 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -39,6 +39,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/support/log_windows.c',
'src/core/lib/support/mpscq.c',
'src/core/lib/support/murmur_hash.c',
+ 'src/core/lib/support/stack_lockfree.c',
'src/core/lib/support/string.c',
'src/core/lib/support/string_posix.c',
'src/core/lib/support/string_util_windows.c',
diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py
index fc02a24c87..83e0ead391 100644
--- a/src/python/grpcio_health_checking/setup.py
+++ b/src/python/grpcio_health_checking/setup.py
@@ -46,7 +46,7 @@ setuptools.setup(
description='Standard Health Checking Service for gRPC',
author='The gRPC Authors',
author_email='grpc-io@googlegroups.com',
- url='http://www.grpc.io',
+ url='https://grpc.io',
license='Apache License 2.0',
package_dir=PACKAGE_DIRECTORIES,
packages=setuptools.find_packages('.'),
diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py
index 920f3b2b48..20edbc4ec0 100644
--- a/src/python/grpcio_reflection/setup.py
+++ b/src/python/grpcio_reflection/setup.py
@@ -48,7 +48,7 @@ setuptools.setup(
description='Standard Protobuf Reflection Service for gRPC',
author='The gRPC Authors',
author_email='grpc-io@googlegroups.com',
- url='http://www.grpc.io',
+ url='https://grpc.io',
package_dir=PACKAGE_DIRECTORIES,
packages=setuptools.find_packages('.'),
install_requires=INSTALL_REQUIRES,
diff --git a/src/ruby/README.md b/src/ruby/README.md
index 266d6b2c16..5c7dae654a 100644
--- a/src/ruby/README.md
+++ b/src/ruby/README.md
@@ -68,5 +68,5 @@ Directory structure is the layout for [ruby extensions][]
[ruby extensions]:http://guides.rubygems.org/gems-with-extensions/
[rubydoc]: http://www.rubydoc.info/gems/grpc
-[grpc.io]: http://www.grpc.io/docs/quickstart/ruby.html
+[grpc.io]: https://grpc.io/docs/quickstart/ruby.html
[Debian jessie-backports]:http://backports.debian.org/Instructions/