diff options
72 files changed, 1223 insertions, 422 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 3c2a609f84..ca0a668f58 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -567,6 +567,9 @@ add_dependencies(buildtests_cxx alarm_cpp_test) add_dependencies(buildtests_cxx async_end2end_test) add_dependencies(buildtests_cxx auth_property_iterator_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) +add_dependencies(buildtests_cxx bm_call_create) +endif() +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx bm_closure) endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) @@ -7360,6 +7363,44 @@ endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) +add_executable(bm_call_create + test/cpp/microbenchmarks/bm_call_create.cc + third_party/googletest/src/gtest-all.cc +) + + +target_include_directories(bm_call_create + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${BORINGSSL_ROOT_DIR}/include + PRIVATE ${PROTOBUF_ROOT_DIR}/src + PRIVATE ${BENCHMARK_ROOT_DIR}/include + PRIVATE ${ZLIB_ROOT_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include + PRIVATE third_party/googletest/include + PRIVATE third_party/googletest + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(bm_call_create + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + benchmark + grpc++_test_util + grpc_test_util + grpc++ + grpc + gpr_test_util + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + +endif() +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_executable(bm_closure test/cpp/microbenchmarks/bm_closure.cc third_party/googletest/src/gtest-all.cc @@ -1040,6 +1040,7 @@ wakeup_fd_cv_test: $(BINDIR)/$(CONFIG)/wakeup_fd_cv_test alarm_cpp_test: $(BINDIR)/$(CONFIG)/alarm_cpp_test async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test +bm_call_create: $(BINDIR)/$(CONFIG)/bm_call_create bm_closure: $(BINDIR)/$(CONFIG)/bm_closure bm_cq: $(BINDIR)/$(CONFIG)/bm_cq bm_fullstack: $(BINDIR)/$(CONFIG)/bm_fullstack @@ -1446,6 +1447,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/alarm_cpp_test \ $(BINDIR)/$(CONFIG)/async_end2end_test \ $(BINDIR)/$(CONFIG)/auth_property_iterator_test \ + $(BINDIR)/$(CONFIG)/bm_call_create \ $(BINDIR)/$(CONFIG)/bm_closure \ $(BINDIR)/$(CONFIG)/bm_cq \ $(BINDIR)/$(CONFIG)/bm_fullstack \ @@ -1553,6 +1555,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/alarm_cpp_test \ $(BINDIR)/$(CONFIG)/async_end2end_test \ $(BINDIR)/$(CONFIG)/auth_property_iterator_test \ + $(BINDIR)/$(CONFIG)/bm_call_create \ $(BINDIR)/$(CONFIG)/bm_closure \ $(BINDIR)/$(CONFIG)/bm_cq \ $(BINDIR)/$(CONFIG)/bm_fullstack \ @@ -1871,6 +1874,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/async_end2end_test || ( echo test async_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing auth_property_iterator_test" $(Q) $(BINDIR)/$(CONFIG)/auth_property_iterator_test || ( echo test auth_property_iterator_test failed ; exit 1 ) + $(E) "[RUN] Testing bm_call_create" + $(Q) $(BINDIR)/$(CONFIG)/bm_call_create || ( echo test bm_call_create failed ; exit 1 ) $(E) "[RUN] Testing bm_closure" $(Q) $(BINDIR)/$(CONFIG)/bm_closure || ( echo test bm_closure failed ; exit 1 ) $(E) "[RUN] Testing bm_cq" @@ -12292,6 +12297,49 @@ endif endif +BM_CALL_CREATE_SRC = \ + test/cpp/microbenchmarks/bm_call_create.cc \ + +BM_CALL_CREATE_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BM_CALL_CREATE_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/bm_call_create: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/bm_call_create: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/bm_call_create: $(PROTOBUF_DEP) $(BM_CALL_CREATE_OBJS) $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(BM_CALL_CREATE_OBJS) $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/bm_call_create + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/microbenchmarks/bm_call_create.o: $(LIBDIR)/$(CONFIG)/libbenchmark.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_bm_call_create: $(BM_CALL_CREATE_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(BM_CALL_CREATE_OBJS:.o=.dep) +endif +endif + + BM_CLOSURE_SRC = \ test/cpp/microbenchmarks/bm_closure.cc \ diff --git a/build.yaml b/build.yaml index 17a47e7f2f..55f011129a 100644 --- a/build.yaml +++ b/build.yaml @@ -2961,6 +2961,25 @@ targets: - grpc - gpr_test_util - gpr +- name: bm_call_create + build: test + language: c++ + src: + - test/cpp/microbenchmarks/bm_call_create.cc + deps: + - benchmark + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr_test_util + - gpr + args: + - --benchmark_min_time=0 + platforms: + - mac + - linux + - posix - name: bm_closure build: test language: c++ diff --git a/doc/service_config.md b/doc/service_config.md index 2dabb83a37..ecc23817d1 100644 --- a/doc/service_config.md +++ b/doc/service_config.md @@ -12,105 +12,105 @@ The service config is a JSON string of the following form: ``` { - # Load balancing policy name. - # Supported values are 'round_robin' and 'grpclb'. - # Optional; if unset, the default behavior is pick the first available - # backend. - # Note that if the resolver returns only balancer addresses and no - # backend addresses, gRPC will always use the 'grpclb' policy, - # regardless of what this field is set to. + // Load balancing policy name. + // Supported values are 'round_robin' and 'grpclb'. + // Optional; if unset, the default behavior is pick the first available + // backend. + // Note that if the resolver returns only balancer addresses and no + // backend addresses, gRPC will always use the 'grpclb' policy, + // regardless of what this field is set to. 'loadBalancingPolicy': string, - # Per-method configuration. Optional. + // Per-method configuration. Optional. 'methodConfig': [ { - # The names of the methods to which this method config applies. There - # must be at least one name. Each name entry must be unique across the - # entire service config. If the 'method' field is empty, then this - # method config specifies the defaults for all methods for the specified - # service. - # - # For example, let's say that the service config contains the following - # method config entries: - # - # 'methodConfig': [ - # { 'name': [ { 'service': 'MyService' } ] ... }, - # { 'name': [ { 'service': 'MyService', 'method': 'Foo' } ] ... } - # ] - # - # For a request for MyService/Foo, we will use the second entry, because - # it exactly matches the service and method name. - # For a request for MyService/Bar, we will use the first entry, because - # it provides the default for all methods of MyService. + // The names of the methods to which this method config applies. There + // must be at least one name. Each name entry must be unique across the + // entire service config. If the 'method' field is empty, then this + // method config specifies the defaults for all methods for the specified + // service. + // + // For example, let's say that the service config contains the following + // method config entries: + // + // 'methodConfig': [ + // { 'name': [ { 'service': 'MyService' } ] ... }, + // { 'name': [ { 'service': 'MyService', 'method': 'Foo' } ] ... } + // ] + // + // For a request for MyService/Foo, we will use the second entry, because + // it exactly matches the service and method name. + // For a request for MyService/Bar, we will use the first entry, because + // it provides the default for all methods of MyService. 'name': [ { - # RPC service name. Required. - # If using gRPC with protobuf as the IDL, then this will be of - # the form "pkg.service_name", where "pkg" is the package name - # defined in the proto file. + // RPC service name. Required. + // If using gRPC with protobuf as the IDL, then this will be of + // the form "pkg.service_name", where "pkg" is the package name + // defined in the proto file. 'service': string, - # RPC method name. Optional (see above). + // RPC method name. Optional (see above). 'method': string, } ], - # Whether RPCs sent to this method should wait until the connection is - # ready by default. If false, the RPC will abort immediately if there - # is a transient failure connecting to the server. Otherwise, gRPC will - # attempt to connect until the deadline is exceeded. - # - # The value specified via the gRPC client API will override the value - # set here. However, note that setting the value in the client API will - # also affect transient errors encountered during name resolution, - # which cannot be caught by the value here, since the service config - # is obtained by the gRPC client via name resolution. + // Whether RPCs sent to this method should wait until the connection is + // ready by default. If false, the RPC will abort immediately if there + // is a transient failure connecting to the server. Otherwise, gRPC will + // attempt to connect until the deadline is exceeded. + // + // The value specified via the gRPC client API will override the value + // set here. However, note that setting the value in the client API will + // also affect transient errors encountered during name resolution, + // which cannot be caught by the value here, since the service config + // is obtained by the gRPC client via name resolution. 'waitForReady': bool, - # The default timeout in seconds for RPCs sent to this method. This can - # be overridden in code. If no reply is received in the specified amount - # of time, the request is aborted and a deadline-exceeded error status - # is returned to the caller. - # - # The actual deadline used will be the minimum of the value specified - # here and the value set by the application via the gRPC client API. - # If either one is not set, then the other will be used. - # If neither is set, then the request has no deadline. - # - # The format of the value is that of the 'Duration' type defined here: - # https://developers.google.com/protocol-buffers/docs/proto3#json + // The default timeout in seconds for RPCs sent to this method. This can + // be overridden in code. If no reply is received in the specified amount + // of time, the request is aborted and a deadline-exceeded error status + // is returned to the caller. + // + // The actual deadline used will be the minimum of the value specified + // here and the value set by the application via the gRPC client API. + // If either one is not set, then the other will be used. + // If neither is set, then the request has no deadline. + // + // The format of the value is that of the 'Duration' type defined here: + // https://developers.google.com/protocol-buffers/docs/proto3#json 'timeout': string, - # The maximum allowed payload size for an individual request or object - # in a stream (client->server) in bytes. The size which is measured is - # the serialized, uncompressed payload in bytes. This applies both - # to streaming and non-streaming requests. - # - # The actual value used is the minimum of the value specified here and - # the value set by the application via the gRPC client API. - # If either one is not set, then the other will be used. - # If neither is set, then the built-in default is used. - # - # If a client attempts to send an object larger than this value, it - # will not be sent and the client will see an error. - # Note that 0 is a valid value, meaning that the request message must - # be empty. + // The maximum allowed payload size for an individual request or object + // in a stream (client->server) in bytes. The size which is measured is + // the serialized, uncompressed payload in bytes. This applies both + // to streaming and non-streaming requests. + // + // The actual value used is the minimum of the value specified here and + // the value set by the application via the gRPC client API. + // If either one is not set, then the other will be used. + // If neither is set, then the built-in default is used. + // + // If a client attempts to send an object larger than this value, it + // will not be sent and the client will see an error. + // Note that 0 is a valid value, meaning that the request message must + // be empty. 'maxRequestMessageBytes': number, - # The maximum allowed payload size for an individual response or object - # in a stream (server->client) in bytes. The size which is measured is - # the serialized, uncompressed payload in bytes. This applies both - # to streaming and non-streaming requests. - # - # The actual value used is the minimum of the value specified here and - # the value set by the application via the gRPC client API. - # If either one is not set, then the other will be used. - # If neither is set, then the built-in default is used. - # - # If a server attempts to send an object larger than this value, it - # will not be sent, and the client will see an error. - # Note that 0 is a valid value, meaning that the response message must - # be empty. + // The maximum allowed payload size for an individual response or object + // in a stream (server->client) in bytes. The size which is measured is + // the serialized, uncompressed payload in bytes. This applies both + // to streaming and non-streaming requests. + // + // The actual value used is the minimum of the value specified here and + // the value set by the application via the gRPC client API. + // If either one is not set, then the other will be used. + // If neither is set, then the built-in default is used. + // + // If a server attempts to send an object larger than this value, it + // will not be sent, and the client will see an error. + // Note that 0 is a valid value, meaning that the response message must + // be empty. 'maxResponseMessageBytes': number } ] diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index 65cfe1fa90..b80d831557 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -127,7 +127,7 @@ static void server_start_transport_op(grpc_exec_ctx *exec_ctx, static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); memset(d, 0, sizeof(*d)); @@ -146,7 +146,7 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); memset(d, 0, sizeof(*d)); diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 62fb061bcf..6cbc333b83 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -230,7 +230,7 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, if (w->lb_policy == w->chand->lb_policy) { if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); + grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver); GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); w->chand->lb_policy = NULL; } @@ -386,11 +386,12 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, watch_lb_policy(exec_ctx, chand, lb_policy, state); } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } else { if (chand->resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, chand->resolver); + grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; } @@ -451,7 +452,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, set_channel_connectivity_state_locked( exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); - grpc_resolver_shutdown(exec_ctx, chand->resolver); + grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; if (!chand->started_resolving) { @@ -550,7 +551,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, chand->resolver = grpc_resolver_create( exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string, new_args != NULL ? new_args : args->channel_args, - chand->interested_parties); + chand->interested_parties, chand->combiner); if (proxy_name != NULL) gpr_free(proxy_name); if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); if (chand->resolver == NULL) { @@ -559,13 +560,23 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } +static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_resolver *resolver = arg; + grpc_resolver_shutdown_locked(exec_ctx, resolver); + GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel"); +} + /* Destructor for channel_data */ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, chand->resolver); - GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(shutdown_resolver_locked, chand->resolver, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } if (chand->client_channel_factory != NULL) { grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory); @@ -846,8 +857,9 @@ static bool pick_subchannel_locked( if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } if (chand->resolver != NULL) { cpa = gpr_malloc(sizeof(*cpa)); @@ -1123,7 +1135,7 @@ static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx, /* Constructor for call_data */ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; // Initialize data members. @@ -1210,8 +1222,9 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, if (!chand->started_resolving && chand->resolver != NULL) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); chand->started_resolving = true; - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } } GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); diff --git a/src/core/ext/client_channel/resolver.c b/src/core/ext/client_channel/resolver.c index 2ae4fe862e..b1a1faa6c9 100644 --- a/src/core/ext/client_channel/resolver.c +++ b/src/core/ext/client_channel/resolver.c @@ -32,10 +32,13 @@ */ #include "src/core/ext/client_channel/resolver.h" +#include "src/core/lib/iomgr/combiner.h" void grpc_resolver_init(grpc_resolver *resolver, - const grpc_resolver_vtable *vtable) { + const grpc_resolver_vtable *vtable, + grpc_combiner *combiner) { resolver->vtable = vtable; + resolver->combiner = GRPC_COMBINER_REF(combiner, "resolver"); gpr_ref_init(&resolver->refs, 1); } @@ -62,20 +65,24 @@ void grpc_resolver_unref(grpc_resolver *resolver, void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { #endif if (gpr_unref(&resolver->refs)) { + grpc_combiner *combiner = resolver->combiner; resolver->vtable->destroy(exec_ctx, resolver); + GRPC_COMBINER_UNREF(exec_ctx, combiner, "resolver"); } } -void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { - resolver->vtable->shutdown(exec_ctx, resolver); +void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + resolver->vtable->shutdown_locked(exec_ctx, resolver); } -void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { - resolver->vtable->channel_saw_error(exec_ctx, resolver); +void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + resolver->vtable->channel_saw_error_locked(exec_ctx, resolver); } -void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete) { - resolver->vtable->next(exec_ctx, resolver, result, on_complete); +void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete) { + resolver->vtable->next_locked(exec_ctx, resolver, result, on_complete); } diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/client_channel/resolver.h index 96ece92b9d..bbba424ca5 100644 --- a/src/core/ext/client_channel/resolver.h +++ b/src/core/ext/client_channel/resolver.h @@ -44,14 +44,16 @@ typedef struct grpc_resolver_vtable grpc_resolver_vtable; struct grpc_resolver { const grpc_resolver_vtable *vtable; gpr_refcount refs; + grpc_combiner *combiner; }; struct grpc_resolver_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete); + void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); + void (*channel_saw_error_locked)(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); + void (*next_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, grpc_closure *on_complete); }; #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG @@ -70,21 +72,30 @@ void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *policy); #endif void grpc_resolver_init(grpc_resolver *resolver, - const grpc_resolver_vtable *vtable); + const grpc_resolver_vtable *vtable, + grpc_combiner *combiner); -void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); +void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); /** Notification that the channel has seen an error on some address. - Can be used as a hint that re-resolution is desirable soon. */ -void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver); + Can be used as a hint that re-resolution is desirable soon. + + Must be called from the combiner passed as a resolver_arg at construction + time.*/ +void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); /** Get the next result from the resolver. Expected to set \a *result with new channel args and then schedule \a on_complete for execution. If resolution is fatally broken, set \a *result to NULL and - schedule \a on_complete. */ -void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete); + schedule \a on_complete. + + Must be called from the combiner passed as a resolver_arg at construction + time.*/ +void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H */ diff --git a/src/core/ext/client_channel/resolver_factory.h b/src/core/ext/client_channel/resolver_factory.h index 3792ddca18..e3cd99ec5a 100644 --- a/src/core/ext/client_channel/resolver_factory.h +++ b/src/core/ext/client_channel/resolver_factory.h @@ -50,6 +50,7 @@ typedef struct grpc_resolver_args { grpc_uri *uri; const grpc_channel_args *args; grpc_pollset_set *pollset_set; + grpc_combiner *combiner; } grpc_resolver_args; struct grpc_resolver_factory_vtable { diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c index 5110a7cad9..f8e8bc9c39 100644 --- a/src/core/ext/client_channel/resolver_registry.c +++ b/src/core/ext/client_channel/resolver_registry.c @@ -133,7 +133,8 @@ static grpc_resolver_factory *resolve_factory(const char *target, grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args, - grpc_pollset_set *pollset_set) { + grpc_pollset_set *pollset_set, + grpc_combiner *combiner) { grpc_uri *uri = NULL; char *canonical_target = NULL; grpc_resolver_factory *factory = @@ -144,6 +145,7 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, resolver_args.uri = uri; resolver_args.args = args; resolver_args.pollset_set = pollset_set; + resolver_args.combiner = combiner; resolver = grpc_resolver_factory_create_resolver(exec_ctx, factory, &resolver_args); grpc_uri_destroy(uri); diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h index a4606463eb..e2c189cf0c 100644 --- a/src/core/ext/client_channel/resolver_registry.h +++ b/src/core/ext/client_channel/resolver_registry.h @@ -65,7 +65,8 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); should not be NULL. */ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args, - grpc_pollset_set *pollset_set); + grpc_pollset_set *pollset_set, + grpc_combiner *combiner); /** Find a resolver factory given a name and return an (owned-by-the-caller) * reference to it */ diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index 7fc88db603..c6386a8942 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -100,7 +100,7 @@ static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data, /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *calld = elem->call_data; memset(calld, 0, sizeof(call_data)); diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index c08b53ea04..96ac521a91 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -40,6 +40,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/backoff.h" @@ -63,8 +64,6 @@ typedef struct { /** pollset_set to drive the name resolution process */ grpc_pollset_set *interested_parties; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** are we currently resolving? */ bool resolving; /** which version of the result have we published? */ @@ -95,18 +94,20 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, dns_resolver *r); -static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, - grpc_channel_args **target_result, - grpc_closure *on_complete); +static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_channel_args **target_result, + grpc_closure *on_complete); static const grpc_resolver_vtable dns_resolver_vtable = { - dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; + dns_destroy, dns_shutdown_locked, dns_channel_saw_error_locked, + dns_next_locked}; -static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { +static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); if (r->have_retry_timer) { grpc_timer_cancel(exec_ctx, &r->retry_timer); } @@ -116,25 +117,21 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { GRPC_ERROR_CREATE("Resolver Shutdown")); r->next_completion = NULL; } - gpr_mu_unlock(&r->mu); } -static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); if (!r->resolving) { gpr_backoff_reset(&r->backoff_state); dns_start_resolving_locked(exec_ctx, r); } - gpr_mu_unlock(&r->mu); } -static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **target_result, - grpc_closure *on_complete) { +static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **target_result, + grpc_closure *on_complete) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; @@ -144,30 +141,26 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, } else { dns_maybe_finish_next_locked(exec_ctx, r); } - gpr_mu_unlock(&r->mu); } -static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { dns_resolver *r = arg; - gpr_mu_lock(&r->mu); r->have_retry_timer = false; if (error == GRPC_ERROR_NONE) { if (!r->resolving) { dns_start_resolving_locked(exec_ctx, r); } } - gpr_mu_unlock(&r->mu); GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); } -static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { dns_resolver *r = arg; grpc_channel_args *result = NULL; - gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); r->resolving = false; if (r->addresses != NULL) { @@ -198,8 +191,8 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - grpc_closure_init(&r->on_retry, dns_on_retry_timer, r, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r, + grpc_combiner_scheduler(r->base.combiner, false)); grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); } if (r->resolved_result != NULL) { @@ -208,7 +201,6 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, r->resolved_result = result; r->resolved_version++; dns_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } @@ -221,7 +213,8 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, r->addresses = NULL; grpc_resolve_address( exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties, - grpc_closure_create(dns_on_resolved, r, grpc_schedule_on_exec_ctx), + grpc_closure_create(dns_on_resolved_locked, r, + grpc_combiner_scheduler(r->base.combiner, false)), &r->addresses); } @@ -240,7 +233,6 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { dns_resolver *r = (dns_resolver *)gr; - gpr_mu_destroy(&r->mu); if (r->resolved_result != NULL) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); } @@ -264,8 +256,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx, // Create resolver. dns_resolver *r = gpr_malloc(sizeof(dns_resolver)); memset(r, 0, sizeof(*r)); - gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &dns_resolver_vtable); + grpc_resolver_init(&r->base, &dns_resolver_vtable, args->combiner); r->name_to_resolve = gpr_strdup(path); r->default_port = gpr_strdup(default_port); r->channel_args = grpc_channel_args_copy(args->args); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index a1365f6465..e7f66649b5 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -45,6 +45,7 @@ #include "src/core/ext/client_channel/parse_address.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/slice/slice_internal.h" @@ -58,8 +59,6 @@ typedef struct { grpc_lb_addresses *addresses; /** channel args */ grpc_channel_args *channel_args; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** have we published? */ bool published; /** pending next completion, or NULL */ @@ -73,48 +72,43 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r); -static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *r); -static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, - grpc_channel_args **target_result, - grpc_closure *on_complete); +static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_channel_args **target_result, + grpc_closure *on_complete); static const grpc_resolver_vtable sockaddr_resolver_vtable = { - sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, - sockaddr_next}; + sockaddr_destroy, sockaddr_shutdown_locked, + sockaddr_channel_saw_error_locked, sockaddr_next_locked}; -static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_result = NULL; grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; } - gpr_mu_unlock(&r->mu); } -static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); r->published = false; sockaddr_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); } -static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **target_result, - grpc_closure *on_complete) { +static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, + grpc_channel_args **target_result, + grpc_closure *on_complete) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; sockaddr_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); } static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, @@ -131,7 +125,6 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; - gpr_mu_destroy(&r->mu); grpc_lb_addresses_destroy(exec_ctx, r->addresses); grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); @@ -201,8 +194,7 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx, memset(r, 0, sizeof(*r)); r->addresses = addresses; r->channel_args = grpc_channel_args_copy(args->args); - gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); + grpc_resolver_init(&r->base, &sockaddr_resolver_vtable, args->combiner); return &r->base; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index d1fab25478..28a3166832 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1114,8 +1114,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_list_add_waiting_for_concurrency(t, s); maybe_start_some_streams(exec_ctx, t); } else { - grpc_chttp2_cancel_stream(exec_ctx, t, s, - GRPC_ERROR_CREATE("Transport closed")); + grpc_chttp2_cancel_stream( + exec_ctx, t, s, + grpc_error_set_int(GRPC_ERROR_CREATE("Transport closed"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE)); } } else { GPR_ASSERT(s->id != 0); diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index ec973d4e7f..3fb2a60ac7 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -173,7 +173,6 @@ grpc_error *grpc_call_stack_init( grpc_slice path, gpr_timespec start_time, gpr_timespec deadline, grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); - grpc_call_element_args args; size_t count = channel_stack->count; grpc_call_element *call_elems; char *user_data; @@ -188,13 +187,15 @@ grpc_error *grpc_call_stack_init( /* init per-filter data */ grpc_error *first_error = GRPC_ERROR_NONE; - args.start_time = start_time; + const grpc_call_element_args args = { + .start_time = start_time, + .call_stack = call_stack, + .server_transport_data = transport_server_data, + .context = context, + .path = path, + .deadline = deadline, + }; for (i = 0; i < count; i++) { - args.call_stack = call_stack; - args.server_transport_data = transport_server_data; - args.context = context; - args.path = path; - args.deadline = deadline; call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 1cf07d43c2..1e943dc2e5 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -131,7 +131,7 @@ typedef struct { argument. */ grpc_error *(*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args); + const grpc_call_element_args *args); void (*set_pollset_or_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_polling_entity *pollent); diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 22781c7839..aa41014a21 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -274,7 +274,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 068c61c92a..29796f7ca7 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -83,7 +83,7 @@ static void con_start_transport_op(grpc_exec_ctx *exec_ctx, /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; int r = grpc_transport_init_stream( diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index bc9a2effc2..f9668be0fa 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -52,9 +52,6 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_call_element* elem = arg; grpc_deadline_state* deadline_state = elem->call_data; - gpr_mu_lock(&deadline_state->timer_mu); - deadline_state->timer_pending = false; - gpr_mu_unlock(&deadline_state->timer_mu); if (error != GRPC_ERROR_CANCELLED) { grpc_call_element_signal_error( exec_ctx, elem, @@ -66,53 +63,64 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, } // Starts the deadline timer. -static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - gpr_timespec deadline) { - grpc_deadline_state* deadline_state = elem->call_data; - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - // Note: We do not start the timer if there is already a timer - // pending. This should be okay, because this is only called from two - // functions exported by this module: grpc_deadline_state_start(), which - // starts the initial timer, and grpc_deadline_state_reset(), which - // cancels any pre-existing timer before starting a new one. In - // particular, we want to ensure that if grpc_deadline_state_start() - // winds up trying to start the timer after grpc_deadline_state_reset() - // has already done so, we ignore the value from the former. - if (!deadline_state->timer_pending && - gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { - // Take a reference to the call stack, to be owned by the timer. - GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); - deadline_state->timer_pending = true; - grpc_closure_init(&deadline_state->timer_callback, timer_callback, elem, - grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, - &deadline_state->timer_callback, - gpr_now(GPR_CLOCK_MONOTONIC)); - } -} static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec deadline) { + deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); + if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) { + return; + } grpc_deadline_state* deadline_state = elem->call_data; - gpr_mu_lock(&deadline_state->timer_mu); - start_timer_if_needed_locked(exec_ctx, elem, deadline); - gpr_mu_unlock(&deadline_state->timer_mu); + grpc_deadline_timer_state cur_state; + grpc_closure* closure = NULL; +retry: + cur_state = + (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state); + switch (cur_state) { + case GRPC_DEADLINE_STATE_PENDING: + // Note: We do not start the timer if there is already a timer + return; + case GRPC_DEADLINE_STATE_FINISHED: + if (gpr_atm_rel_cas(&deadline_state->timer_state, + GRPC_DEADLINE_STATE_FINISHED, + GRPC_DEADLINE_STATE_PENDING)) { + // If we've already created and destroyed a timer, we always create a + // new closure: we have no other guarantee that the inlined closure is + // not in use (it may hold a pending call to timer_callback) + closure = grpc_closure_create(timer_callback, elem, + grpc_schedule_on_exec_ctx); + } else { + goto retry; + } + break; + case GRPC_DEADLINE_STATE_INITIAL: + if (gpr_atm_rel_cas(&deadline_state->timer_state, + GRPC_DEADLINE_STATE_INITIAL, + GRPC_DEADLINE_STATE_PENDING)) { + closure = + grpc_closure_init(&deadline_state->timer_callback, timer_callback, + elem, grpc_schedule_on_exec_ctx); + } else { + goto retry; + } + break; + } + GPR_ASSERT(closure); + GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); + grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure, + gpr_now(GPR_CLOCK_MONOTONIC)); } // Cancels the deadline timer. -static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, - grpc_deadline_state* deadline_state) { - if (deadline_state->timer_pending) { - grpc_timer_cancel(exec_ctx, &deadline_state->timer); - deadline_state->timer_pending = false; - } -} static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_deadline_state* deadline_state) { - gpr_mu_lock(&deadline_state->timer_mu); - cancel_timer_if_needed_locked(exec_ctx, deadline_state); - gpr_mu_unlock(&deadline_state->timer_mu); + if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING, + GRPC_DEADLINE_STATE_FINISHED)) { + grpc_timer_cancel(exec_ctx, &deadline_state->timer); + } else { + // timer was either in STATE_INITAL (nothing to cancel) + // OR in STATE_FINISHED (again nothing to cancel) + } } // Callback run when the call is complete. @@ -120,8 +128,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_deadline_state* deadline_state = arg; cancel_timer_if_needed(exec_ctx, deadline_state); // Invoke the next callback. - deadline_state->next_on_complete->cb( - exec_ctx, deadline_state->next_on_complete->cb_arg, error); + grpc_closure_run(exec_ctx, deadline_state->next_on_complete, + GRPC_ERROR_REF(error)); } // Inject our own on_complete callback into op. @@ -138,14 +146,12 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_deadline_state* deadline_state = elem->call_data; memset(deadline_state, 0, sizeof(*deadline_state)); deadline_state->call_stack = call_stack; - gpr_mu_init(&deadline_state->timer_mu); } void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem) { grpc_deadline_state* deadline_state = elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); - gpr_mu_destroy(&deadline_state->timer_mu); } // Callback and associated state for starting the timer after call stack @@ -187,10 +193,8 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec new_deadline) { grpc_deadline_state* deadline_state = elem->call_data; - gpr_mu_lock(&deadline_state->timer_mu); - cancel_timer_if_needed_locked(exec_ctx, deadline_state); - start_timer_if_needed_locked(exec_ctx, elem, new_deadline); - gpr_mu_unlock(&deadline_state->timer_mu); + cancel_timer_if_needed(exec_ctx, deadline_state); + start_timer_if_needed(exec_ctx, elem, new_deadline); } void grpc_deadline_state_client_start_transport_stream_op( @@ -244,7 +248,7 @@ typedef struct server_call_data { // Constructor for call_data. Used for both client and server filters. static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_element_args* args) { + const grpc_call_element_args* args) { // Note: size of call data is different between client and server. memset(elem->call_data, 0, elem->filter->sizeof_call_data); grpc_deadline_state_init(exec_ctx, elem, args->call_stack); diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index bd2b84f79e..94717f6bc7 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -35,16 +35,18 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/timer.h" +typedef enum grpc_deadline_timer_state { + GRPC_DEADLINE_STATE_INITIAL, + GRPC_DEADLINE_STATE_PENDING, + GRPC_DEADLINE_STATE_FINISHED +} grpc_deadline_timer_state; + // State used for filters that enforce call deadlines. // Must be the first field in the filter's call_data. typedef struct grpc_deadline_state { // We take a reference to the call stack for the timer callback. grpc_call_stack* call_stack; - // Guards access to timer_pending and timer. - gpr_mu timer_mu; - // True if the timer callback is currently pending. - bool timer_pending; - // The deadline timer. + gpr_atm timer_state; grpc_timer timer; grpc_closure timer_callback; // Closure to invoke when the call is complete. diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 49a2a980e0..c031533dd8 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -386,7 +386,7 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *calld = elem->call_data; calld->on_done_recv_initial_metadata = NULL; calld->on_done_recv_trailing_metadata = NULL; diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index bb185351a8..a6946ef9f3 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -343,7 +343,7 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 5e22860cfb..22938c64b5 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -166,7 +166,7 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, // Constructor for call_data. static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_element_args* args) { + const grpc_call_element_args* args) { channel_data* chand = elem->channel_data; call_data* calld = elem->call_data; calld->next_recv_message_ready = NULL; diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 8a5617e7c1..6d638bcbaa 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -180,25 +180,25 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; timer->deadline = deadline; - timer->triggered = 0; if (!g_initialized) { - timer->triggered = 1; + timer->pending = false; grpc_closure_sched( exec_ctx, timer->closure, GRPC_ERROR_CREATE("Attempt to create timer before initialization")); return; } + gpr_mu_lock(&shard->mu); + timer->pending = true; if (gpr_time_cmp(deadline, now) <= 0) { - timer->triggered = 1; + timer->pending = false; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); + gpr_mu_unlock(&shard->mu); + /* early out */ return; } - /* TODO(ctiller): check deadline expired */ - - gpr_mu_lock(&shard->mu); grpc_time_averaged_stats_add_sample(&shard->stats, ts_to_dbl(gpr_time_sub(deadline, now))); if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) { @@ -243,9 +243,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; gpr_mu_lock(&shard->mu); - if (!timer->triggered) { + if (timer->pending) { grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); - timer->triggered = 1; + timer->pending = false; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); } else { @@ -296,7 +296,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { } timer = grpc_timer_heap_top(&shard->heap); if (gpr_time_cmp(timer->deadline, now) > 0) return NULL; - timer->triggered = 1; + timer->pending = false; grpc_timer_heap_pop(&shard->heap); return timer; } diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h index 9d901c7e68..1608dce9fb 100644 --- a/src/core/lib/iomgr/timer_generic.h +++ b/src/core/lib/iomgr/timer_generic.h @@ -40,7 +40,7 @@ struct grpc_timer { gpr_timespec deadline; uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */ - int triggered; + bool pending; struct grpc_timer *next; struct grpc_timer *prev; grpc_closure *closure; diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index fa2cdee964..f28a14405d 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -53,8 +53,8 @@ static void stop_uv_timer(uv_timer_t *handle) { void run_expired_timer(uv_timer_t *handle) { grpc_timer *timer = (grpc_timer *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_ASSERT(!timer->triggered); - timer->triggered = 1; + GPR_ASSERT(timer->pending); + timer->pending = 0; grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE); stop_uv_timer(handle); grpc_exec_ctx_finish(&exec_ctx); @@ -67,11 +67,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, uv_timer_t *uv_timer; timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { - timer->triggered = 1; + timer->pending = 0; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); return; } - timer->triggered = 0; + timer->pending = 1; timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); uv_timer = gpr_malloc(sizeof(uv_timer_t)); uv_timer_init(uv_default_loop(), uv_timer); @@ -81,8 +81,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { - if (!timer->triggered) { - timer->triggered = 1; + if (timer->pending) { + timer->pending = 0; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); stop_uv_timer((uv_timer_t *)timer->uv_timer); } diff --git a/src/core/lib/iomgr/timer_uv.h b/src/core/lib/iomgr/timer_uv.h index 13cf8bd4fa..9870cd4a5c 100644 --- a/src/core/lib/iomgr/timer_uv.h +++ b/src/core/lib/iomgr/timer_uv.h @@ -41,7 +41,7 @@ struct grpc_timer { /* This is actually a uv_timer_t*, but we want to keep platform-specific types out of headers */ void *uv_timer; - int triggered; + int pending; }; #endif /* GRPC_CORE_LIB_IOMGR_TIMER_UV_H */ diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index b9bbe1b304..a23082a866 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -302,7 +302,7 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *calld = elem->call_data; memset(calld, 0, sizeof(*calld)); return GRPC_ERROR_NONE; diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 36e81d6501..14619d97ca 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -197,7 +197,7 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 48a1e586e1..0fae3effb6 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -101,6 +101,17 @@ typedef struct { grpc_error *error; } received_status; +static gpr_atm pack_received_status(received_status r) { + return r.is_set ? (1 | (gpr_atm)r.error) : 0; +} + +static received_status unpack_received_status(gpr_atm atm) { + return (atm & 1) == 0 + ? (received_status){.is_set = false, .error = GRPC_ERROR_NONE} + : (received_status){.is_set = true, + .error = (grpc_error *)(atm & ~(gpr_atm)1)}; +} + #define MAX_ERRORS_PER_BATCH 3 typedef struct batch_control { @@ -165,8 +176,8 @@ struct grpc_call { Element 0 is initial metadata, element 1 is trailing metadata. */ grpc_metadata_array *buffered_metadata[2]; - /* Received call statuses from various sources */ - received_status status[STATUS_SOURCE_COUNT]; + /* Packed received call statuses from various sources */ + gpr_atm status[STATUS_SOURCE_COUNT]; /* Call data useful used for reporting. Only valid after the call has * completed */ @@ -446,7 +457,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - GRPC_ERROR_UNREF(c->status[i].error); + GRPC_ERROR_UNREF( + unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error); } grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); @@ -614,13 +626,12 @@ static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, */ static bool get_final_status_from( - grpc_call *call, status_source from_source, bool allow_ok_status, + grpc_call *call, grpc_error *error, bool allow_ok_status, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details) { grpc_status_code code; const char *msg = NULL; - grpc_error_get_status(call->status[from_source].error, call->send_deadline, - &code, &msg, NULL); + grpc_error_get_status(error, call->send_deadline, &code, &msg, NULL); if (code == GRPC_STATUS_OK && !allow_ok_status) { return false; } @@ -638,12 +649,15 @@ static void get_final_status(grpc_call *call, void *user_data), void *set_value_user_data, grpc_slice *details) { int i; + received_status status[STATUS_SOURCE_COUNT]; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i])); + } if (grpc_call_error_trace) { gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR"); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set) { - gpr_log(GPR_DEBUG, " %d: %s", i, - grpc_error_string(call->status[i].error)); + if (status[i].is_set) { + gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error)); } } } @@ -653,9 +667,9 @@ static void get_final_status(grpc_call *call, /* search for the best status we can present: ideally the error we use has a clearly defined grpc-status, and we'll prefer that. */ for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set && - grpc_error_has_clear_grpc_status(call->status[i].error)) { - if (get_final_status_from(call, (status_source)i, allow_ok_status != 0, + if (status[i].is_set && + grpc_error_has_clear_grpc_status(status[i].error)) { + if (get_final_status_from(call, status[i].error, allow_ok_status != 0, set_value, set_value_user_data, details)) { return; } @@ -663,8 +677,8 @@ static void get_final_status(grpc_call *call, } /* If no clearly defined status exists, search for 'anything' */ for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set) { - if (get_final_status_from(call, (status_source)i, allow_ok_status != 0, + if (status[i].is_set) { + if (get_final_status_from(call, status[i].error, allow_ok_status != 0, set_value, set_value_user_data, details)) { return; } @@ -681,12 +695,13 @@ static void get_final_status(grpc_call *call, static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, status_source source, grpc_error *error) { - if (call->status[source].is_set) { + if (!gpr_atm_rel_cas(&call->status[source], + pack_received_status((received_status){ + .is_set = false, .error = GRPC_ERROR_NONE}), + pack_received_status((received_status){ + .is_set = true, .error = error}))) { GRPC_ERROR_UNREF(error); - return; } - call->status[source].is_set = true; - call->status[source].error = error; } /******************************************************************************* diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 48de0e1d5b..49bc4c114b 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -122,7 +122,7 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *calld = elem->call_data; gpr_atm_no_barrier_store(&calld->filled_metadata, 0); return GRPC_ERROR_NONE; diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 6ab1c0d94d..7210c69fb0 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -879,7 +879,7 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -1198,7 +1198,9 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, crm->server_registered_method = rm; crm->flags = rm->flags; crm->has_host = has_host; - crm->host = host; + if (has_host) { + crm->host = host; + } crm->method = method; } GPR_ASSERT(slots <= UINT32_MAX); diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index 5f9fd8790b..79c4bab985 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -244,7 +244,7 @@ class CallData { /// Initializes the call data. virtual grpc_error *Init(grpc_exec_ctx *exec_ctx, ChannelData *channel_data, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { return GRPC_ERROR_NONE; } @@ -308,7 +308,7 @@ class ChannelFilter final { static grpc_error *InitCallElement(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { ChannelDataType *channel_data = (ChannelDataType *)elem->channel_data; // Construct the object in the already-allocated memory. CallDataType *call_data = new (elem->call_data) CallDataType(); diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index dd9c544524..76bb57346c 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -57,7 +57,7 @@ static grpc_error *channel_init_func(grpc_exec_ctx *exec_ctx, static grpc_error *call_init_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { ++*(int *)(elem->channel_data); *(int *)(elem->call_data) = 0; return GRPC_ERROR_NONE; diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c index cfd37052de..3e3401165c 100644 --- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c @@ -36,14 +36,17 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> +#include "src/core/ext/client_channel/resolver.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "test/core/util/test_config.h" static gpr_mu g_mu; static bool g_fail_resolution = true; +static grpc_combiner *g_combiner; static grpc_error *my_resolve_address(const char *name, const char *addr, grpc_resolved_addresses **addrs) { @@ -71,6 +74,7 @@ static grpc_resolver *create_resolver(grpc_exec_ctx *exec_ctx, grpc_resolver_args args; memset(&args, 0, sizeof(args)); args.uri = uri; + args.combiner = g_combiner; grpc_resolver *resolver = grpc_resolver_factory_create_resolver(exec_ctx, factory, &args); grpc_resolver_factory_unref(factory); @@ -96,11 +100,41 @@ static bool wait_loop(int deadline_seconds, gpr_event *ev) { return false; } +typedef struct next_args { + grpc_resolver *resolver; + grpc_channel_args **result; + grpc_closure *on_complete; +} next_args; + +static void call_resolver_next_now_lock_taken(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_unused) { + next_args *a = arg; + grpc_resolver_next_locked(exec_ctx, a->resolver, a->result, a->on_complete); + gpr_free(a); +} + +static void call_resolver_next_after_locking(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete) { + next_args *a = gpr_malloc(sizeof(*a)); + a->resolver = resolver; + a->result = result; + a->on_complete = on_complete; + grpc_closure_sched( + exec_ctx, + grpc_closure_create(call_resolver_next_now_lock_taken, a, + grpc_combiner_scheduler(resolver->combiner, false)), + GRPC_ERROR_NONE); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); gpr_mu_init(&g_mu); + g_combiner = grpc_combiner_create(NULL); grpc_blocking_resolve_address = my_resolve_address; grpc_channel_args *result = (grpc_channel_args *)1; @@ -108,7 +142,7 @@ int main(int argc, char **argv) { grpc_resolver *resolver = create_resolver(&exec_ctx, "dns:test"); gpr_event ev1; gpr_event_init(&ev1); - grpc_resolver_next( + call_resolver_next_after_locking( &exec_ctx, resolver, &result, grpc_closure_create(on_done, &ev1, grpc_schedule_on_exec_ctx)); grpc_exec_ctx_flush(&exec_ctx); @@ -117,7 +151,7 @@ int main(int argc, char **argv) { gpr_event ev2; gpr_event_init(&ev2); - grpc_resolver_next( + call_resolver_next_after_locking( &exec_ctx, resolver, &result, grpc_closure_create(on_done, &ev2, grpc_schedule_on_exec_ctx)); grpc_exec_ctx_flush(&exec_ctx); @@ -126,6 +160,7 @@ int main(int argc, char **argv) { grpc_channel_args_destroy(&exec_ctx, result); GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test"); + GRPC_COMBINER_UNREF(&exec_ctx, g_combiner, "test"); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); diff --git a/test/core/client_channel/resolvers/dns_resolver_test.c b/test/core/client_channel/resolvers/dns_resolver_test.c index 5603a57b5f..9dd5aed091 100644 --- a/test/core/client_channel/resolvers/dns_resolver_test.c +++ b/test/core/client_channel/resolvers/dns_resolver_test.c @@ -36,8 +36,11 @@ #include <grpc/support/log.h> #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/lib/iomgr/combiner.h" #include "test/core/util/test_config.h" +static grpc_combiner *g_combiner; + static void test_succeeds(grpc_resolver_factory *factory, const char *string) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_uri *uri = grpc_uri_parse(string, 0); @@ -48,6 +51,7 @@ static void test_succeeds(grpc_resolver_factory *factory, const char *string) { GPR_ASSERT(uri); memset(&args, 0, sizeof(args)); args.uri = uri; + args.combiner = g_combiner; resolver = grpc_resolver_factory_create_resolver(&exec_ctx, factory, &args); GPR_ASSERT(resolver != NULL); GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test_succeeds"); @@ -65,6 +69,7 @@ static void test_fails(grpc_resolver_factory *factory, const char *string) { GPR_ASSERT(uri); memset(&args, 0, sizeof(args)); args.uri = uri; + args.combiner = g_combiner; resolver = grpc_resolver_factory_create_resolver(&exec_ctx, factory, &args); GPR_ASSERT(resolver == NULL); grpc_uri_destroy(uri); @@ -76,6 +81,8 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); + g_combiner = grpc_combiner_create(NULL); + dns = grpc_resolver_factory_lookup("dns"); test_succeeds(dns, "dns:10.2.1.1"); @@ -84,6 +91,11 @@ int main(int argc, char **argv) { test_fails(dns, "ipv4://8.8.8.8/8.8.8.8:8888"); grpc_resolver_factory_unref(dns); + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_COMBINER_UNREF(&exec_ctx, g_combiner, "test"); + grpc_exec_ctx_finish(&exec_ctx); + } grpc_shutdown(); return 0; diff --git a/test/core/client_channel/resolvers/sockaddr_resolver_test.c b/test/core/client_channel/resolvers/sockaddr_resolver_test.c index 10df78537c..68831ab7c7 100644 --- a/test/core/client_channel/resolvers/sockaddr_resolver_test.c +++ b/test/core/client_channel/resolvers/sockaddr_resolver_test.c @@ -39,9 +39,12 @@ #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "test/core/util/test_config.h" +static grpc_combiner *g_combiner; + typedef struct on_resolution_arg { char *expected_server_name; grpc_channel_args *resolver_result; @@ -62,6 +65,7 @@ static void test_succeeds(grpc_resolver_factory *factory, const char *string) { GPR_ASSERT(uri); memset(&args, 0, sizeof(args)); args.uri = uri; + args.combiner = g_combiner; resolver = grpc_resolver_factory_create_resolver(&exec_ctx, factory, &args); GPR_ASSERT(resolver != NULL); @@ -71,8 +75,8 @@ static void test_succeeds(grpc_resolver_factory *factory, const char *string) { grpc_closure *on_resolution = grpc_closure_create( on_resolution_cb, &on_res_arg, grpc_schedule_on_exec_ctx); - grpc_resolver_next(&exec_ctx, resolver, &on_res_arg.resolver_result, - on_resolution); + grpc_resolver_next_locked(&exec_ctx, resolver, &on_res_arg.resolver_result, + on_resolution); GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test_succeeds"); grpc_exec_ctx_finish(&exec_ctx); grpc_uri_destroy(uri); @@ -88,6 +92,7 @@ static void test_fails(grpc_resolver_factory *factory, const char *string) { GPR_ASSERT(uri); memset(&args, 0, sizeof(args)); args.uri = uri; + args.combiner = g_combiner; resolver = grpc_resolver_factory_create_resolver(&exec_ctx, factory, &args); GPR_ASSERT(resolver == NULL); grpc_uri_destroy(uri); @@ -99,6 +104,8 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); + g_combiner = grpc_combiner_create(NULL); + ipv4 = grpc_resolver_factory_lookup("ipv4"); ipv6 = grpc_resolver_factory_lookup("ipv6"); @@ -118,6 +125,12 @@ int main(int argc, char **argv) { grpc_resolver_factory_unref(ipv4); grpc_resolver_factory_unref(ipv6); + + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_COMBINER_UNREF(&exec_ctx, g_combiner, "test"); + grpc_exec_ctx_finish(&exec_ctx); + } grpc_shutdown(); return 0; diff --git a/test/core/end2end/fake_resolver.c b/test/core/end2end/fake_resolver.c index 4f05f69f01..8a37531449 100644 --- a/test/core/end2end/fake_resolver.c +++ b/test/core/end2end/fake_resolver.c @@ -213,7 +213,7 @@ static grpc_resolver* fake_resolver_create(grpc_exec_ctx* exec_ctx, r->channel_args = grpc_channel_args_copy(args->args); r->addresses = addresses; gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &fake_resolver_vtable); + grpc_resolver_init(&r->base, &fake_resolver_vtable, args->combiner); return &r->base; } diff --git a/test/core/end2end/tests/filter_call_init_fails.c b/test/core/end2end/tests/filter_call_init_fails.c index 7e189164b2..d2d6e82d57 100644 --- a/test/core/end2end/tests/filter_call_init_fails.c +++ b/test/core/end2end/tests/filter_call_init_fails.c @@ -205,7 +205,7 @@ static void test_request(grpc_end2end_test_config config) { static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { return grpc_error_set_int(GRPC_ERROR_CREATE("access denied"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_PERMISSION_DENIED); diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index 308b4de71b..25e606556d 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -230,7 +230,7 @@ static void start_transport_stream_op(grpc_exec_ctx *exec_ctx, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { return GRPC_ERROR_NONE; } diff --git a/test/core/end2end/tests/filter_latency.c b/test/core/end2end/tests/filter_latency.c index 13d2ae012c..d05e9e79a1 100644 --- a/test/core/end2end/tests/filter_latency.c +++ b/test/core/end2end/tests/filter_latency.c @@ -260,7 +260,7 @@ static void test_request(grpc_end2end_test_config config) { static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { return GRPC_ERROR_NONE; } diff --git a/test/cpp/common/channel_filter_test.cc b/test/cpp/common/channel_filter_test.cc index 32246a4b76..d78b05e5d8 100644 --- a/test/cpp/common/channel_filter_test.cc +++ b/test/cpp/common/channel_filter_test.cc @@ -55,7 +55,7 @@ class MyCallData : public CallData { MyCallData() {} grpc_error* Init(grpc_exec_ctx* exec_ctx, ChannelData* channel_data, - grpc_call_element_args* args) override { + const grpc_call_element_args* args) override { (void)args->path; // Make sure field is available. return GRPC_ERROR_NONE; } diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc new file mode 100644 index 0000000000..a633b49c65 --- /dev/null +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -0,0 +1,382 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* This benchmark exists to ensure that the benchmark integration is + * working */ + +#include <string.h> +#include <sstream> + +#include <grpc++/support/channel_arguments.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +extern "C" { +#include "src/core/ext/client_channel/client_channel.h" +#include "src/core/ext/load_reporting/load_reporting_filter.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/compress_filter.h" +#include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/channel/deadline_filter.h" +#include "src/core/lib/channel/http_client_filter.h" +#include "src/core/lib/channel/http_server_filter.h" +#include "src/core/lib/channel/message_size_filter.h" +#include "src/core/lib/transport/transport_impl.h" +} + +#include "third_party/benchmark/include/benchmark/benchmark.h" + +static struct Init { + Init() { grpc_init(); } + ~Init() { grpc_shutdown(); } +} g_init; + +static void BM_InsecureChannelWithDefaults(benchmark::State &state) { + grpc_channel *channel = + grpc_insecure_channel_create("localhost:12345", NULL, NULL); + grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_slice method = grpc_slice_from_static_string("/foo/bar"); + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + while (state.KeepRunning()) { + grpc_call_destroy(grpc_channel_create_call(channel, NULL, + GRPC_PROPAGATE_DEFAULTS, cq, + method, NULL, deadline, NULL)); + } + grpc_channel_destroy(channel); + grpc_completion_queue_destroy(cq); +} +BENCHMARK(BM_InsecureChannelWithDefaults); + +static void FilterDestroy(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + gpr_free(arg); +} + +static void DoNothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} + +class FakeClientChannelFactory : public grpc_client_channel_factory { + public: + FakeClientChannelFactory() { vtable = &vtable_; } + + private: + static void NoRef(grpc_client_channel_factory *factory) {} + static void NoUnref(grpc_exec_ctx *exec_ctx, + grpc_client_channel_factory *factory) {} + static grpc_subchannel *CreateSubchannel(grpc_exec_ctx *exec_ctx, + grpc_client_channel_factory *factory, + const grpc_subchannel_args *args) { + return nullptr; + } + static grpc_channel *CreateClientChannel(grpc_exec_ctx *exec_ctx, + grpc_client_channel_factory *factory, + const char *target, + grpc_client_channel_type type, + const grpc_channel_args *args) { + return nullptr; + } + + static const grpc_client_channel_factory_vtable vtable_; +}; + +const grpc_client_channel_factory_vtable FakeClientChannelFactory::vtable_ = { + NoRef, NoUnref, CreateSubchannel, CreateClientChannel}; + +static grpc_arg StringArg(const char *key, const char *value) { + grpc_arg a; + a.type = GRPC_ARG_STRING; + a.key = const_cast<char *>(key); + a.value.string = const_cast<char *>(value); + return a; +} + +enum FixtureFlags : uint32_t { + CHECKS_NOT_LAST = 1, + REQUIRES_TRANSPORT = 2, +}; + +template <const grpc_channel_filter *kFilter, uint32_t kFlags> +struct Fixture { + const grpc_channel_filter *filter = kFilter; + const uint32_t flags = kFlags; +}; + +namespace dummy_filter { + +static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) {} + +static void StartTransportOp(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) {} + +static grpc_error *InitCallElem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_call_element_args *args) { + return GRPC_ERROR_NONE; +} + +static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_polling_entity *pollent) {} + +static void DestroyCallElem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const grpc_call_final_info *final_info, + void *and_free_memory) {} + +grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_channel_element_args *args) { + return GRPC_ERROR_NONE; +} + +void DestroyChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} + +char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + return gpr_strdup("peer"); +} + +void GetChannelInfo(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + const grpc_channel_info *channel_info) {} + +static const grpc_channel_filter dummy_filter = {StartTransportStreamOp, + StartTransportOp, + 0, + InitCallElem, + SetPollsetOrPollsetSet, + DestroyCallElem, + 0, + InitChannelElem, + DestroyChannelElem, + GetPeer, + GetChannelInfo, + "dummy_filter"}; + +} // namespace dummy_filter + +namespace dummy_transport { + +/* Memory required for a single stream element - this is allocated by upper + layers and initialized by the transport */ +size_t sizeof_stream; /* = sizeof(transport stream) */ + +/* name of this transport implementation */ +const char *name; + +/* implementation of grpc_transport_init_stream */ +int InitStream(grpc_exec_ctx *exec_ctx, grpc_transport *self, + grpc_stream *stream, grpc_stream_refcount *refcount, + const void *server_data) { + return 0; +} + +/* implementation of grpc_transport_set_pollset */ +void SetPollset(grpc_exec_ctx *exec_ctx, grpc_transport *self, + grpc_stream *stream, grpc_pollset *pollset) {} + +/* implementation of grpc_transport_set_pollset */ +void SetPollsetSet(grpc_exec_ctx *exec_ctx, grpc_transport *self, + grpc_stream *stream, grpc_pollset_set *pollset_set) {} + +/* implementation of grpc_transport_perform_stream_op */ +void PerformStreamOp(grpc_exec_ctx *exec_ctx, grpc_transport *self, + grpc_stream *stream, grpc_transport_stream_op *op) { + grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_NONE); +} + +/* implementation of grpc_transport_perform_op */ +void PerformOp(grpc_exec_ctx *exec_ctx, grpc_transport *self, + grpc_transport_op *op) {} + +/* implementation of grpc_transport_destroy_stream */ +void DestroyStream(grpc_exec_ctx *exec_ctx, grpc_transport *self, + grpc_stream *stream, void *and_free_memory) {} + +/* implementation of grpc_transport_destroy */ +void Destroy(grpc_exec_ctx *exec_ctx, grpc_transport *self) {} + +/* implementation of grpc_transport_get_peer */ +char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_transport *self) { + return gpr_strdup("transport_peer"); +} + +/* implementation of grpc_transport_get_endpoint */ +grpc_endpoint *GetEndpoint(grpc_exec_ctx *exec_ctx, grpc_transport *self) { + return nullptr; +} + +static const grpc_transport_vtable dummy_transport_vtable = { + 0, "dummy_http2", InitStream, + SetPollset, SetPollsetSet, PerformStreamOp, + PerformOp, DestroyStream, Destroy, + GetPeer, GetEndpoint}; + +static grpc_transport dummy_transport = {&dummy_transport_vtable}; + +} // namespace dummy_transport + +class NoOp { + public: + class Op { + public: + Op(grpc_exec_ctx *exec_ctx, NoOp *p, grpc_call_stack *s) {} + void Finish(grpc_exec_ctx *exec_ctx) {} + }; +}; + +class SendEmptyMetadata { + public: + SendEmptyMetadata() { + memset(&op_, 0, sizeof(op_)); + op_.on_complete = grpc_closure_init(&closure_, DoNothing, nullptr, + grpc_schedule_on_exec_ctx); + } + + class Op { + public: + Op(grpc_exec_ctx *exec_ctx, SendEmptyMetadata *p, grpc_call_stack *s) { + grpc_metadata_batch_init(&batch_); + p->op_.send_initial_metadata = &batch_; + } + void Finish(grpc_exec_ctx *exec_ctx) { + grpc_metadata_batch_destroy(exec_ctx, &batch_); + } + + private: + grpc_metadata_batch batch_; + }; + + private: + const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC); + const gpr_timespec start_time_ = gpr_now(GPR_CLOCK_MONOTONIC); + const grpc_slice method_ = grpc_slice_from_static_string("/foo/bar"); + grpc_transport_stream_op op_; + grpc_closure closure_; +}; + +// Test a filter in isolation. Fixture specifies the filter under test (use the +// Fixture<> template to specify this), and TestOp defines some unit of work to +// perform on said filter. +template <class Fixture, class TestOp> +static void BM_IsolatedFilter(benchmark::State &state) { + Fixture fixture; + std::ostringstream label; + + std::vector<grpc_arg> args; + FakeClientChannelFactory fake_client_channel_factory; + args.push_back(grpc_client_channel_factory_create_channel_arg( + &fake_client_channel_factory)); + args.push_back(StringArg(GRPC_ARG_SERVER_URI, "localhost")); + + grpc_channel_args channel_args = {args.size(), &args[0]}; + + std::vector<const grpc_channel_filter *> filters; + if (fixture.filter != nullptr) { + filters.push_back(fixture.filter); + } + if (fixture.flags & CHECKS_NOT_LAST) { + filters.push_back(&dummy_filter::dummy_filter); + label << " #has_dummy_filter"; + } + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + size_t channel_size = grpc_channel_stack_size(&filters[0], filters.size()); + grpc_channel_stack *channel_stack = + static_cast<grpc_channel_stack *>(gpr_malloc(channel_size)); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "call_stack_init", + grpc_channel_stack_init(&exec_ctx, 1, FilterDestroy, channel_stack, + &filters[0], filters.size(), &channel_args, + fixture.flags & REQUIRES_TRANSPORT + ? &dummy_transport::dummy_transport + : nullptr, + "CHANNEL", channel_stack))); + grpc_exec_ctx_flush(&exec_ctx); + grpc_call_stack *call_stack = static_cast<grpc_call_stack *>( + gpr_malloc(channel_stack->call_stack_size)); + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + gpr_timespec start_time = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_slice method = grpc_slice_from_static_string("/foo/bar"); + grpc_call_final_info final_info; + TestOp test_op_data; + while (state.KeepRunning()) { + GRPC_ERROR_UNREF(grpc_call_stack_init(&exec_ctx, channel_stack, 1, + DoNothing, NULL, NULL, NULL, method, + start_time, deadline, call_stack)); + typename TestOp::Op op(&exec_ctx, &test_op_data, call_stack); + grpc_call_stack_destroy(&exec_ctx, call_stack, &final_info, NULL); + op.Finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); + } + grpc_channel_stack_destroy(&exec_ctx, channel_stack); + grpc_exec_ctx_finish(&exec_ctx); + gpr_free(channel_stack); + gpr_free(call_stack); + + state.SetLabel(label.str()); +} + +typedef Fixture<nullptr, 0> NoFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, NoFilter, NoOp); +typedef Fixture<&dummy_filter::dummy_filter, 0> DummyFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, DummyFilter, NoOp); +BENCHMARK_TEMPLATE(BM_IsolatedFilter, DummyFilter, SendEmptyMetadata); +typedef Fixture<&grpc_client_channel_filter, 0> ClientChannelFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientChannelFilter, NoOp); +typedef Fixture<&grpc_compress_filter, CHECKS_NOT_LAST> CompressFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, NoOp); +BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, SendEmptyMetadata); +typedef Fixture<&grpc_client_deadline_filter, CHECKS_NOT_LAST> + ClientDeadlineFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientDeadlineFilter, NoOp); +BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientDeadlineFilter, SendEmptyMetadata); +typedef Fixture<&grpc_server_deadline_filter, CHECKS_NOT_LAST> + ServerDeadlineFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, ServerDeadlineFilter, NoOp); +BENCHMARK_TEMPLATE(BM_IsolatedFilter, ServerDeadlineFilter, SendEmptyMetadata); +typedef Fixture<&grpc_http_client_filter, CHECKS_NOT_LAST | REQUIRES_TRANSPORT> + HttpClientFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpClientFilter, NoOp); +BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpClientFilter, SendEmptyMetadata); +typedef Fixture<&grpc_http_server_filter, CHECKS_NOT_LAST> HttpServerFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, NoOp); +BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, SendEmptyMetadata); +typedef Fixture<&grpc_message_size_filter, CHECKS_NOT_LAST> MessageSizeFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, NoOp); +BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, SendEmptyMetadata); +typedef Fixture<&grpc_load_reporting_filter, CHECKS_NOT_LAST> + LoadReportingFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, NoOp); +BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, SendEmptyMetadata); + +BENCHMARK_MAIN(); diff --git a/tools/internal_ci/linux/grpc_fuzzer_api.cfg b/tools/internal_ci/linux/grpc_fuzzer_api.cfg index a34fb9d47e..5c2592e933 100644 --- a/tools/internal_ci/linux/grpc_fuzzer_api.cfg +++ b/tools/internal_ci/linux/grpc_fuzzer_api.cfg @@ -1,5 +1,4 @@ -#!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_api.sh b/tools/internal_ci/linux/grpc_fuzzer_api.sh index c3cf1109de..b1f792e765 100755 --- a/tools/internal_ci/linux/grpc_fuzzer_api.sh +++ b/tools/internal_ci/linux/grpc_fuzzer_api.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_client.cfg b/tools/internal_ci/linux/grpc_fuzzer_client.cfg index b1bce02282..1e8f688576 100644 --- a/tools/internal_ci/linux/grpc_fuzzer_client.cfg +++ b/tools/internal_ci/linux/grpc_fuzzer_client.cfg @@ -1,5 +1,4 @@ -#!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_client.sh b/tools/internal_ci/linux/grpc_fuzzer_client.sh index f9ff13d303..c03f92559c 100755 --- a/tools/internal_ci/linux/grpc_fuzzer_client.sh +++ b/tools/internal_ci/linux/grpc_fuzzer_client.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_hpack_parser.cfg b/tools/internal_ci/linux/grpc_fuzzer_hpack_parser.cfg index 215ce2bf9c..72482b62e3 100644 --- a/tools/internal_ci/linux/grpc_fuzzer_hpack_parser.cfg +++ b/tools/internal_ci/linux/grpc_fuzzer_hpack_parser.cfg @@ -1,5 +1,4 @@ -#!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_hpack_parser.sh b/tools/internal_ci/linux/grpc_fuzzer_hpack_parser.sh index d9a73a622b..40179aa547 100755 --- a/tools/internal_ci/linux/grpc_fuzzer_hpack_parser.sh +++ b/tools/internal_ci/linux/grpc_fuzzer_hpack_parser.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_http_request.cfg b/tools/internal_ci/linux/grpc_fuzzer_http_request.cfg index 120e8f8f76..a4a0e8922e 100644 --- a/tools/internal_ci/linux/grpc_fuzzer_http_request.cfg +++ b/tools/internal_ci/linux/grpc_fuzzer_http_request.cfg @@ -1,5 +1,4 @@ -#!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_http_request.sh b/tools/internal_ci/linux/grpc_fuzzer_http_request.sh index d412d921ba..6dac4ed9ab 100755 --- a/tools/internal_ci/linux/grpc_fuzzer_http_request.sh +++ b/tools/internal_ci/linux/grpc_fuzzer_http_request.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_json.cfg b/tools/internal_ci/linux/grpc_fuzzer_json.cfg index cab4f293ed..d22da2d705 100644 --- a/tools/internal_ci/linux/grpc_fuzzer_json.cfg +++ b/tools/internal_ci/linux/grpc_fuzzer_json.cfg @@ -1,5 +1,4 @@ -#!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_json.sh b/tools/internal_ci/linux/grpc_fuzzer_json.sh index d9869f6c30..b002ecb262 100755 --- a/tools/internal_ci/linux/grpc_fuzzer_json.sh +++ b/tools/internal_ci/linux/grpc_fuzzer_json.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_nanopb_response.cfg b/tools/internal_ci/linux/grpc_fuzzer_nanopb_response.cfg index c73aa819ee..cbf44ba29e 100644 --- a/tools/internal_ci/linux/grpc_fuzzer_nanopb_response.cfg +++ b/tools/internal_ci/linux/grpc_fuzzer_nanopb_response.cfg @@ -1,5 +1,4 @@ -#!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_nanopb_response.sh b/tools/internal_ci/linux/grpc_fuzzer_nanopb_response.sh index 0a7187f8bf..51a8eb58bb 100755 --- a/tools/internal_ci/linux/grpc_fuzzer_nanopb_response.sh +++ b/tools/internal_ci/linux/grpc_fuzzer_nanopb_response.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_server.cfg b/tools/internal_ci/linux/grpc_fuzzer_server.cfg index a1931cb891..7877d51792 100644 --- a/tools/internal_ci/linux/grpc_fuzzer_server.cfg +++ b/tools/internal_ci/linux/grpc_fuzzer_server.cfg @@ -1,5 +1,4 @@ -#!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_server.sh b/tools/internal_ci/linux/grpc_fuzzer_server.sh index e00e940382..b6648db34d 100755 --- a/tools/internal_ci/linux/grpc_fuzzer_server.sh +++ b/tools/internal_ci/linux/grpc_fuzzer_server.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_uri.cfg b/tools/internal_ci/linux/grpc_fuzzer_uri.cfg index c312ae0464..134b3d06d6 100644 --- a/tools/internal_ci/linux/grpc_fuzzer_uri.cfg +++ b/tools/internal_ci/linux/grpc_fuzzer_uri.cfg @@ -1,5 +1,4 @@ -#!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_fuzzer_uri.sh b/tools/internal_ci/linux/grpc_fuzzer_uri.sh index 4137f8061c..e3e46515e2 100755 --- a/tools/internal_ci/linux/grpc_fuzzer_uri.sh +++ b/tools/internal_ci/linux/grpc_fuzzer_uri.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_master.cfg b/tools/internal_ci/linux/grpc_master.cfg index d5901bb601..7536a91a67 100644 --- a/tools/internal_ci/linux/grpc_master.cfg +++ b/tools/internal_ci/linux/grpc_master.cfg @@ -1,5 +1,4 @@ -#!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_master.sh b/tools/internal_ci/linux/grpc_master.sh index ea77d11305..397e1bd713 100755 --- a/tools/internal_ci/linux/grpc_master.sh +++ b/tools/internal_ci/linux/grpc_master.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/tools/internal_ci/linux/grpc_portability_build_only.cfg b/tools/internal_ci/linux/grpc_portability_build_only.cfg new file mode 100644 index 0000000000..ce5be5abe9 --- /dev/null +++ b/tools/internal_ci/linux/grpc_portability_build_only.cfg @@ -0,0 +1,39 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. +build_file: "grpc/tools/internal_ci/linux/grpc_portability_build_only.sh" +timeout_mins: 180 +action { + define_artifacts { + regex: "**report**.xml" + } +} diff --git a/tools/internal_ci/linux/grpc_portability_build_only.sh b/tools/internal_ci/linux/grpc_portability_build_only.sh new file mode 100644 index 0000000000..9fac5bcac0 --- /dev/null +++ b/tools/internal_ci/linux/grpc_portability_build_only.sh @@ -0,0 +1,38 @@ +#!/bin/bash +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +set -ex + +# change to grpc repo root +cd $(dirname $0)/../../.. + +git submodule update --init + +tools/jenkins/run_jenkins_matrix.sh -f portability linux --build_only diff --git a/tools/internal_ci/windows/grpc_master.bat b/tools/internal_ci/windows/grpc_master.bat new file mode 100644 index 0000000000..4041c50313 --- /dev/null +++ b/tools/internal_ci/windows/grpc_master.bat @@ -0,0 +1,44 @@ +@rem Copyright 2017, Google Inc. +@rem All rights reserved. +@rem +@rem Redistribution and use in source and binary forms, with or without +@rem modification, are permitted provided that the following conditions are +@rem met: +@rem +@rem * Redistributions of source code must retain the above copyright +@rem notice, this list of conditions and the following disclaimer. +@rem * Redistributions in binary form must reproduce the above +@rem copyright notice, this list of conditions and the following disclaimer +@rem in the documentation and/or other materials provided with the +@rem distribution. +@rem * Neither the name of Google Inc. nor the names of its +@rem contributors may be used to endorse or promote products derived from +@rem this software without specific prior written permission. +@rem +@rem THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +@rem "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +@rem LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +@rem A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +@rem OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +@rem SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +@rem LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +@rem DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +@rem THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +@rem (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +@rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +setlocal + +@rem enter repo root +cd /d %~dp0\..\..\.. + +git submodule update --init + +sh tools\run_tests\helper_scripts\run_tests_in_workspace.sh -t -j 4 -x c_windows_dbg_sponge_log.xml --report_suite_name c_windows_dbg -l c -c dbg +sh tools\run_tests\helper_scripts\run_tests_in_workspace.sh -t -j 4 -x c_windows_opt_sponge_log.xml --report_suite_name c_windows_opt -l c -c opt +sh tools\run_tests\helper_scripts\run_tests_in_workspace.sh -t -j 4 -x csharp_windows_dbg_sponge_log.xml --report_suite_name csharp_windows_dbg -l csharp -c dbg +sh tools\run_tests\helper_scripts\run_tests_in_workspace.sh -t -j 4 -x csharp_windows_opt_sponge_log.xml --report_suite_name csharp_windows_opt -l csharp -c opt +sh tools\run_tests\helper_scripts\run_tests_in_workspace.sh -t -j 4 -x node_windows_dbg_sponge_log.xml --report_suite_name node_windows_dbg -l node -c dbg +sh tools\run_tests\helper_scripts\run_tests_in_workspace.sh -t -j 4 -x node_windows_opt_sponge_log.xml --report_suite_name node_windows_opt -l node -c opt +sh tools\run_tests\helper_scripts\run_tests_in_workspace.sh -t -j 4 -x python_windows_dbg_sponge_log.xml --report_suite_name python_windows_dbg -l python -c dbg +sh tools\run_tests\helper_scripts\run_tests_in_workspace.sh -t -j 4 -x python_windows_opt_sponge_log.xml --report_suite_name python_windows_opt -l python -c opt diff --git a/tools/internal_ci/windows/grpc_master.cfg b/tools/internal_ci/windows/grpc_master.cfg new file mode 100644 index 0000000000..f90af11308 --- /dev/null +++ b/tools/internal_ci/windows/grpc_master.cfg @@ -0,0 +1,39 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. +build_file: "grpc/tools/internal_ci/windows/grpc_master.bat" +timeout_mins: 360 +action { + define_artifacts { + regex: "**sponge_log.xml" + } +} diff --git a/tools/profiling/microbenchmarks/bm2bq.py b/tools/profiling/microbenchmarks/bm2bq.py index 8770fb71cf..b65aebc97f 100755 --- a/tools/profiling/microbenchmarks/bm2bq.py +++ b/tools/profiling/microbenchmarks/bm2bq.py @@ -104,6 +104,10 @@ bm_specs = { 'tpl': [], 'dyn': ['request_size', 'bandwidth_kilobits'], }, + 'BM_IsolatedFilter' : { + 'tpl': ['fixture', 'client_mutator'], + 'dyn': [], + } } def numericalize(s): @@ -160,7 +164,7 @@ def parse_name(name): for bm in js['benchmarks']: context = js['context'] if 'label' in bm: - labels_list = [s.split(':') for s in bm['label'].strip().split(' ')] + labels_list = [s.split(':') for s in bm['label'].strip().split(' ') if len(s) and s[0] != '#'] for el in labels_list: el[0] = el[0].replace('/iter', '_per_iteration') labels = dict(labels_list) diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index ce1868df80..462353cb50 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -2318,6 +2318,26 @@ "gpr", "gpr_test_util", "grpc", + "grpc++", + "grpc++_test_util", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "bm_call_create", + "src": [ + "test/cpp/microbenchmarks/bm_call_create.cc" + ], + "third_party": false, + "type": "target" + }, + { + "deps": [ + "benchmark", + "gpr", + "gpr_test_util", + "grpc", "grpc_test_util" ], "headers": [], diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 142e5bea4a..facc1c3659 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2462,6 +2462,28 @@ "flaky": false, "gtest": false, "language": "c++", + "name": "bm_call_create", + "platforms": [ + "linux", + "mac", + "posix" + ] + }, + { + "args": [ + "--benchmark_min_time=0" + ], + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c++", "name": "bm_closure", "platforms": [ "linux", diff --git a/tools/run_tests/python_utils/start_port_server.py b/tools/run_tests/python_utils/start_port_server.py index 8ee7080168..4c9f6aac63 100644 --- a/tools/run_tests/python_utils/start_port_server.py +++ b/tools/run_tests/python_utils/start_port_server.py @@ -30,101 +30,109 @@ from __future__ import print_function from six.moves import urllib +import jobset +import logging import os +import socket import subprocess -import tempfile import sys +import tempfile import time -import jobset -import socket + def start_port_server(port_server_port): - # check if a compatible port server is running - # if incompatible (version mismatch) ==> start a new one - # if not running ==> start a new one - # otherwise, leave it up - try: - version = int(urllib.request.urlopen( - 'http://localhost:%d/version_number' % port_server_port, - timeout=10).read()) - print('detected port server running version %d' % version) - running = True - except Exception as e: - print('failed to detect port server: %s' % sys.exc_info()[0]) - print(e.strerror) - running = False - if running: - current_version = int(subprocess.check_output( - [sys.executable, os.path.abspath('tools/run_tests/python_utils/port_server.py'), - 'dump_version'])) - print('my port server is version %d' % current_version) - running = (version >= current_version) + # check if a compatible port server is running + # if incompatible (version mismatch) ==> start a new one + # if not running ==> start a new one + # otherwise, leave it up + try: + version = int( + urllib.request.urlopen( + 'http://localhost:%d/version_number' % port_server_port, + timeout=10).read()) + logging.info('detected port server running version %d', version) + running = True + except Exception as e: + logging.exception('failed to detect port server') + running = False + if running: + current_version = int( + subprocess.check_output([ + sys.executable, os.path.abspath( + 'tools/run_tests/python_utils/port_server.py'), + 'dump_version' + ])) + logging.info('my port server is version %d', current_version) + running = (version >= current_version) + if not running: + logging.info('port_server version mismatch: killing the old one') + urllib.request.urlopen('http://localhost:%d/quitquitquit' % + port_server_port).read() + time.sleep(1) if not running: - print('port_server version mismatch: killing the old one') - urllib.request.urlopen('http://localhost:%d/quitquitquit' % port_server_port).read() - time.sleep(1) - if not running: - fd, logfile = tempfile.mkstemp() - os.close(fd) - print('starting port_server, with log file %s' % logfile) - args = [sys.executable, os.path.abspath('tools/run_tests/python_utils/port_server.py'), - '-p', '%d' % port_server_port, '-l', logfile] - env = dict(os.environ) - env['BUILD_ID'] = 'pleaseDontKillMeJenkins' - if jobset.platform_string() == 'windows': - # Working directory of port server needs to be outside of Jenkins - # workspace to prevent file lock issues. - tempdir = tempfile.mkdtemp() - port_server = subprocess.Popen( - args, - env=env, - cwd=tempdir, - creationflags = 0x00000008, # detached process - close_fds=True) - else: - port_server = subprocess.Popen( - args, - env=env, - preexec_fn=os.setsid, - close_fds=True) - time.sleep(1) - # ensure port server is up - waits = 0 - while True: - if waits > 10: - print('killing port server due to excessive start up waits') - port_server.kill() - if port_server.poll() is not None: - print('port_server failed to start') - # try one final time: maybe another build managed to start one + fd, logfile = tempfile.mkstemp() + os.close(fd) + logging.info('starting port_server, with log file %s', logfile) + args = [ + sys.executable, + os.path.abspath('tools/run_tests/python_utils/port_server.py'), + '-p', '%d' % port_server_port, '-l', logfile + ] + env = dict(os.environ) + env['BUILD_ID'] = 'pleaseDontKillMeJenkins' + if jobset.platform_string() == 'windows': + # Working directory of port server needs to be outside of Jenkins + # workspace to prevent file lock issues. + tempdir = tempfile.mkdtemp() + port_server = subprocess.Popen( + args, + env=env, + cwd=tempdir, + creationflags=0x00000008, # detached process + close_fds=True) + else: + port_server = subprocess.Popen( + args, env=env, preexec_fn=os.setsid, close_fds=True) time.sleep(1) - try: - urllib.request.urlopen('http://localhost:%d/get' % port_server_port, - timeout=1).read() - print('last ditch attempt to contact port server succeeded') - break - except: - traceback.print_exc() - port_log = open(logfile, 'r').read() - print(port_log) - sys.exit(1) - try: - urllib.request.urlopen('http://localhost:%d/get' % port_server_port, + # ensure port server is up + waits = 0 + while True: + if waits > 10: + logging.warning( + 'killing port server due to excessive start up waits') + port_server.kill() + if port_server.poll() is not None: + logging.error('port_server failed to start') + # try one final time: maybe another build managed to start one + time.sleep(1) + try: + urllib.request.urlopen( + 'http://localhost:%d/get' % port_server_port, timeout=1).read() - print('port server is up and ready') - break - except socket.timeout: - print('waiting for port_server: timeout') - traceback.print_exc(); - time.sleep(1) - waits += 1 - except urllib.error.URLError: - print('waiting for port_server: urlerror') - traceback.print_exc(); - time.sleep(1) - waits += 1 - except: - traceback.print_exc() - port_server.kill() - raise - + logging.info( + 'last ditch attempt to contact port server succeeded') + break + except: + logging.exception( + 'final attempt to contact port server failed') + port_log = open(logfile, 'r').read() + print(port_log) + sys.exit(1) + try: + port_server_url = 'http://localhost:%d/get' % port_server_port + urllib.request.urlopen(port_server_url, timeout=1).read() + logging.info('port server is up and ready') + break + except socket.timeout: + logging.exception('while waiting for port_server') + time.sleep(1) + waits += 1 + except urllib.error.URLError: + logging.exception('while waiting for port_server') + time.sleep(1) + waits += 1 + except: + logging.exception('error while contacting port server at "%s".' + 'Will try killing it.', port_server_url) + port_server.kill() + raise diff --git a/tools/run_tests/run_microbenchmark.py b/tools/run_tests/run_microbenchmark.py index a60178d2dd..b090847820 100755 --- a/tools/run_tests/run_microbenchmark.py +++ b/tools/run_tests/run_microbenchmark.py @@ -199,7 +199,7 @@ argp.add_argument('-c', '--collect', default=sorted(collectors.keys()), help='Which collectors should be run against each benchmark') argp.add_argument('-b', '--benchmarks', - default=['bm_fullstack', 'bm_closure', 'bm_cq'], + default=['bm_fullstack', 'bm_closure', 'bm_cq', 'bm_call_create'], nargs='+', type=str, help='Which microbenchmarks should be run') diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index cfc2b04955..2d9eb29e7f 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -38,6 +38,7 @@ import collections import glob import itertools import json +import logging import multiprocessing import os import os.path @@ -84,8 +85,8 @@ def run_shell_command(cmd, env=None, cwd=None): try: subprocess.check_output(cmd, shell=True, env=env, cwd=cwd) except subprocess.CalledProcessError as e: - print("Error while running command '%s'. Exit status %d. Output:\n%s", - e.cmd, e.returncode, e.output) + logging.exception("Error while running command '%s'. Exit status %d. Output:\n%s", + e.cmd, e.returncode, e.output) raise # SimpleConfig: just compile with CONFIG=config, and run the binary to test |