diff options
Diffstat (limited to 'src')
111 files changed, 1539 insertions, 503 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 3adf0fa451..dec8cd0f25 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -104,7 +104,7 @@ grpc::string GetHeaderPrologue(grpc_generator::File* file, grpc::string leading_comments = file->GetLeadingComments("//"); if (!leading_comments.empty()) { printer->Print(vars, "// Original file comments:\n"); - printer->Print(leading_comments.c_str()); + printer->PrintRaw(leading_comments.c_str()); } printer->Print(vars, "#ifndef GRPC_$filename_identifier$__INCLUDED\n"); printer->Print(vars, "#define GRPC_$filename_identifier$__INCLUDED\n"); diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc index e0957bf9bb..40fe0b054a 100644 --- a/src/compiler/csharp_generator.cc +++ b/src/compiler/csharp_generator.cc @@ -666,7 +666,7 @@ grpc::string GetServices(const FileDescriptor* file, bool generate_client, grpc::string leading_comments = GetCsharpComments(file, true); if (!leading_comments.empty()) { out.Print("// Original file comments:\n"); - out.Print(leading_comments.c_str()); + out.PrintRaw(leading_comments.c_str()); } out.Print("#pragma warning disable 1591\n"); diff --git a/src/compiler/node_generator.cc b/src/compiler/node_generator.cc index ed835b4445..661587cbd6 100644 --- a/src/compiler/node_generator.cc +++ b/src/compiler/node_generator.cc @@ -250,7 +250,7 @@ grpc::string GenerateFile(const FileDescriptor* file) { grpc::string leading_comments = GetNodeComments(file, true); if (!leading_comments.empty()) { out.Print("// Original file comments:\n"); - out.Print(leading_comments.c_str()); + out.PrintRaw(leading_comments.c_str()); } out.Print("'use strict';\n"); diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc index 349f1dc281..ab7d869758 100644 --- a/src/compiler/objective_c_generator.cc +++ b/src/compiler/objective_c_generator.cc @@ -28,6 +28,7 @@ using ::google::protobuf::compiler::objectivec::ClassName; using ::grpc::protobuf::FileDescriptor; +using ::grpc::protobuf::FileDescriptor; using ::grpc::protobuf::MethodDescriptor; using ::grpc::protobuf::ServiceDescriptor; using ::grpc::protobuf::io::Printer; @@ -65,7 +66,7 @@ static void PrintAllComments(const DescriptorType* desc, Printer* printer) { printer->Print(" * "); size_t start_pos = it->find_first_not_of(' '); if (start_pos != grpc::string::npos) { - printer->Print(it->c_str() + start_pos); + printer->PrintRaw(it->c_str() + start_pos); } printer->Print("\n"); } diff --git a/src/compiler/objective_c_generator.h b/src/compiler/objective_c_generator.h index 2337abaf6a..d3aed76c4f 100644 --- a/src/compiler/objective_c_generator.h +++ b/src/compiler/objective_c_generator.h @@ -24,6 +24,7 @@ namespace grpc_objective_c_generator { using ::grpc::protobuf::FileDescriptor; +using ::grpc::protobuf::FileDescriptor; using ::grpc::protobuf::ServiceDescriptor; using ::grpc::string; diff --git a/src/compiler/objective_c_plugin.cc b/src/compiler/objective_c_plugin.cc index 53ff81f94a..d5d488e84d 100644 --- a/src/compiler/objective_c_plugin.cc +++ b/src/compiler/objective_c_plugin.cc @@ -51,12 +51,15 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { { // Generate .pbrpc.h - ::grpc::string imports = ::grpc::string("#import \"") + file_name + - ".pbobjc.h\"\n\n" - "#import <ProtoRPC/ProtoService.h>\n" - "#import <ProtoRPC/ProtoRPC.h>\n" - "#import <RxLibrary/GRXWriteable.h>\n" - "#import <RxLibrary/GRXWriter.h>\n"; + ::grpc::string imports = + ::grpc::string("#if !GPB_GRPC_FORWARD_DECLARE_MESSAGE_PROTO\n") + + "#import \"" + file_name + + ".pbobjc.h\"\n" + "#endif\n\n" + "#import <ProtoRPC/ProtoService.h>\n" + "#import <ProtoRPC/ProtoRPC.h>\n" + "#import <RxLibrary/GRXWriteable.h>\n" + "#import <RxLibrary/GRXWriter.h>\n"; ::grpc::string proto_imports; proto_imports += "#if GPB_GRPC_FORWARD_DECLARE_MESSAGE_PROTO\n" + @@ -105,7 +108,10 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { // Generate .pbrpc.m ::grpc::string imports = ::grpc::string("#import \"") + file_name + - ".pbrpc.h\"\n\n" + ".pbrpc.h\"\n" + "#import \"" + + file_name + + ".pbobjc.h\"\n\n" "#import <ProtoRPC/ProtoRPC.h>\n" "#import <RxLibrary/GRXWriter+Immediate.h>\n"; for (int i = 0; i < file->dependency_count(); i++) { diff --git a/src/compiler/php_generator.cc b/src/compiler/php_generator.cc index 1ff9520d7f..d9705e8077 100644 --- a/src/compiler/php_generator.cc +++ b/src/compiler/php_generator.cc @@ -164,7 +164,7 @@ grpc::string GenerateFile(const FileDescriptor* file, grpc::string leading_comments = GetPHPComments(file, "//"); if (!leading_comments.empty()) { out.Print("// Original file comments:\n"); - out.Print(leading_comments.c_str()); + out.PrintRaw(leading_comments.c_str()); } map<grpc::string, grpc::string> vars; diff --git a/src/compiler/protobuf_plugin.h b/src/compiler/protobuf_plugin.h index 1551908156..b971af1310 100644 --- a/src/compiler/protobuf_plugin.h +++ b/src/compiler/protobuf_plugin.h @@ -141,6 +141,7 @@ class ProtoBufPrinter : public grpc_generator::Printer { } void Print(const char* string) { printer_.Print(string); } + void PrintRaw(const char* string) { printer_.PrintRaw(string); } void Indent() { printer_.Indent(); } void Outdent() { printer_.Outdent(); } diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index c1ae7d31ad..8a0b889454 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -101,7 +101,7 @@ void PrivateGenerator::PrintAllComments(StringVector comments, ++it) { size_t start_pos = it->find_first_not_of(' '); if (start_pos != grpc::string::npos) { - out->Print(it->c_str() + start_pos); + out->PrintRaw(it->c_str() + start_pos); } out->Print("\n"); } diff --git a/src/compiler/ruby_generator.cc b/src/compiler/ruby_generator.cc index ed7e074b47..e81dea603b 100644 --- a/src/compiler/ruby_generator.cc +++ b/src/compiler/ruby_generator.cc @@ -174,7 +174,7 @@ grpc::string GetServices(const FileDescriptor* file) { grpc::string leading_comments = GetRubyComments(file, true); if (!leading_comments.empty()) { out.Print("# Original file comments:\n"); - out.Print(leading_comments.c_str()); + out.PrintRaw(leading_comments.c_str()); } out.Print("\n"); diff --git a/src/compiler/schema_interface.h b/src/compiler/schema_interface.h index b2021f2926..c000478e68 100644 --- a/src/compiler/schema_interface.h +++ b/src/compiler/schema_interface.h @@ -86,6 +86,7 @@ struct Printer { virtual void Print(const std::map<grpc::string, grpc::string>& vars, const char* template_string) = 0; virtual void Print(const char* string) = 0; + virtual void PrintRaw(const char* string) = 0; virtual void Indent() = 0; virtual void Outdent() = 0; }; diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc index 8c6e7a2182..052b047f43 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.cc +++ b/src/core/ext/filters/client_channel/subchannel_index.cc @@ -157,7 +157,7 @@ grpc_subchannel* grpc_subchannel_index_find(grpc_subchannel_key* key) { grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key, grpc_subchannel* constructed) { grpc_subchannel* c = nullptr; - bool need_to_unref_constructed; + bool need_to_unref_constructed = false; while (c == nullptr) { need_to_unref_constructed = false; diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index eceb5ababa..2b44a714ed 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -126,7 +126,7 @@ static void start_max_age_timer_after_init(void* arg, grpc_error* error) { &chand->close_max_age_channel); gpr_mu_unlock(&chand->max_age_timer_mu); grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->on_connectivity_state_change = &chand->channel_connectivity_changed, + op->on_connectivity_state_change = &chand->channel_connectivity_changed; op->connectivity_state = &chand->connectivity_state; grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0), op); GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, @@ -212,7 +212,7 @@ static void channel_connectivity_changed(void* arg, grpc_error* error) { channel_data* chand = (channel_data*)arg; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->on_connectivity_state_change = &chand->channel_connectivity_changed, + op->on_connectivity_state_change = &chand->channel_connectivity_changed; op->connectivity_state = &chand->connectivity_state; grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0), op); diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc index ce83391458..f8946090ac 100644 --- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc @@ -134,7 +134,7 @@ static bool parse_user_agent(grpc_mdelem md) { bool grpc_objc_specifier_seen = false; bool cronet_specifier_seen = false; char *major_version_str = user_agent_str, *minor_version_str; - long major_version, minor_version; + long major_version = 0, minor_version = 0; char* head = strtok(user_agent_str, " "); while (head != nullptr) { diff --git a/src/core/lib/debug/trace.cc b/src/core/lib/debug/trace.cc index 4c63983bdc..a76c1afb4c 100644 --- a/src/core/lib/debug/trace.cc +++ b/src/core/lib/debug/trace.cc @@ -75,8 +75,8 @@ void TraceFlagList::LogAllTracers() { } // Flags register themselves on the list during construction -TraceFlag::TraceFlag(bool default_enabled, const char* name) - : name_(name), value_(default_enabled) { +TraceFlag::TraceFlag(bool default_enabled, const char* name) : name_(name) { + set_enabled(default_enabled); TraceFlagList::Add(this); } diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc new file mode 100644 index 0000000000..a55b3a349a --- /dev/null +++ b/src/core/lib/iomgr/fork_posix.cc @@ -0,0 +1,88 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_FORK + +#include <string.h> + +#include <grpc/fork.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/timer_manager.h" +#include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/support/env.h" +#include "src/core/lib/support/fork.h" +#include "src/core/lib/support/thd_internal.h" +#include "src/core/lib/surface/init.h" + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +void grpc_prefork() { + if (!grpc_fork_support_enabled()) { + gpr_log(GPR_ERROR, + "Fork support not enabled; try running with the " + "environment variable GRPC_ENABLE_FORK_SUPPORT=1"); + return; + } + if (grpc_is_initialized()) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_timer_manager_set_threading(false); + grpc_executor_set_threading(&exec_ctx, false); + grpc_exec_ctx_finish(&exec_ctx); + if (!gpr_await_threads( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(3, GPR_TIMESPAN)))) { + gpr_log(GPR_ERROR, "gRPC thread still active! Cannot fork!"); + } + } +} + +void grpc_postfork_parent() { + if (grpc_is_initialized()) { + grpc_timer_manager_set_threading(true); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, true); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +void grpc_postfork_child() { + if (grpc_is_initialized()) { + grpc_timer_manager_set_threading(true); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, true); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +void grpc_fork_handlers_auto_register() { + if (grpc_fork_support_enabled()) { + pthread_atfork(grpc_prefork, grpc_postfork_parent, grpc_postfork_child); + } +} + +#endif // GRPC_POSIX_FORK diff --git a/src/core/lib/iomgr/fork_windows.cc b/src/core/lib/iomgr/fork_windows.cc new file mode 100644 index 0000000000..f9986f33c7 --- /dev/null +++ b/src/core/lib/iomgr/fork_windows.cc @@ -0,0 +1,39 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifndef GRPC_POSIX_FORK + +#include <grpc/fork.h> +#include <grpc/support/log.h> + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +void grpc_prefork() { gpr_log(GPR_ERROR, "Forking not supported on Windows"); } + +void grpc_postfork_parent() {} + +void grpc_postfork_child() {} + +void grpc_fork_handlers_auto_register() {} + +#endif // GRPC_POSIX_FORK diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 1cc6d98491..9fae8c0052 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -30,6 +30,7 @@ #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 @@ -59,6 +60,7 @@ #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 #define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1 +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_HOST_NAME_MAX 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 @@ -90,6 +92,7 @@ #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 #define GRPC_MSG_IOVLEN_TYPE int +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 @@ -103,6 +106,7 @@ #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 diff --git a/src/core/lib/support/debug_location.h b/src/core/lib/support/debug_location.h new file mode 100644 index 0000000000..0939da595d --- /dev/null +++ b/src/core/lib/support/debug_location.h @@ -0,0 +1,52 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_DEBUG_LOCATION_H +#define GRPC_CORE_LIB_SUPPORT_DEBUG_LOCATION_H + +namespace grpc_core { + +// Used for tracking file and line where a call is made for debug builds. +// No-op for non-debug builds. +// Callers can use the DEBUG_LOCATION macro in either case. +#ifndef NDEBUG +class DebugLocation { + public: + DebugLocation(const char* file, int line) : file_(file), line_(line) {} + bool Log() const { return true; } + const char* file() const { return file_; } + int line() const { return line_; } + + private: + const char* file_; + const int line_; +}; +#define DEBUG_LOCATION DebugLocation(__FILE__, __LINE__) +#else +class DebugLocation { + public: + bool Log() const { return false; } + const char* file() const { return nullptr; } + int line() const { return -1; } +}; +#define DEBUG_LOCATION DebugLocation() +#endif + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_SUPPORT_DEBUG_LOCATION_H */ diff --git a/src/core/lib/support/fork.cc b/src/core/lib/support/fork.cc new file mode 100644 index 0000000000..d59ca5584c --- /dev/null +++ b/src/core/lib/support/fork.cc @@ -0,0 +1,62 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/support/fork.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/support/env.h" + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +static int override_fork_support_enabled = -1; +static int fork_support_enabled; + +void grpc_fork_support_init() { +#ifdef GRPC_ENABLE_FORK_SUPPORT + fork_support_enabled = 1; +#else + fork_support_enabled = 0; + char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT"); + if (env != NULL) { + static const char* truthy[] = {"yes", "Yes", "YES", "true", + "True", "TRUE", "1"}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + if (0 == strcmp(env, truthy[i])) { + fork_support_enabled = 1; + } + } + gpr_free(env); + } +#endif + if (override_fork_support_enabled != -1) { + fork_support_enabled = override_fork_support_enabled; + } +} + +int grpc_fork_support_enabled() { return fork_support_enabled; } + +void grpc_enable_fork_support(int enable) { + override_fork_support_enabled = enable; +} diff --git a/src/core/lib/support/fork.h b/src/core/lib/support/fork.h new file mode 100644 index 0000000000..215d4214a6 --- /dev/null +++ b/src/core/lib/support/fork.h @@ -0,0 +1,35 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_FORK_H +#define GRPC_CORE_LIB_SUPPORT_FORK_H + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +void grpc_fork_support_init(void); + +int grpc_fork_support_enabled(void); + +// Test only: Must be called before grpc_init(), and overrides +// environment variables/compile flags +void grpc_enable_fork_support(int enable); + +#endif /* GRPC_CORE_LIB_SUPPORT_FORK_H */ diff --git a/src/core/lib/support/murmur_hash.cc b/src/core/lib/support/murmur_hash.cc index 4e08579a1d..2f0e71a53c 100644 --- a/src/core/lib/support/murmur_hash.cc +++ b/src/core/lib/support/murmur_hash.cc @@ -30,22 +30,19 @@ (h) ^= (h) >> 16; uint32_t gpr_murmur_hash3(const void* key, size_t len, uint32_t seed) { - const uint8_t* data = (const uint8_t*)key; - const size_t nblocks = len / 4; - int i; - uint32_t h1 = seed; uint32_t k1; const uint32_t c1 = 0xcc9e2d51; const uint32_t c2 = 0x1b873593; - const uint32_t* blocks = ((const uint32_t*)key) + nblocks; - const uint8_t* tail = (const uint8_t*)(data + nblocks * 4); + const uint8_t* keyptr = (const uint8_t*)key; + const size_t bsize = sizeof(k1); + const size_t nblocks = len / bsize; /* body */ - for (i = -(int)nblocks; i; i++) { - memcpy(&k1, blocks + i, sizeof(uint32_t)); + for (size_t i = 0; i < nblocks; i++, keyptr += bsize) { + memcpy(&k1, keyptr, bsize); k1 *= c1; k1 = ROTL32(k1, 15); @@ -61,13 +58,13 @@ uint32_t gpr_murmur_hash3(const void* key, size_t len, uint32_t seed) { /* tail */ switch (len & 3) { case 3: - k1 ^= ((uint32_t)tail[2]) << 16; + k1 ^= ((uint32_t)keyptr[2]) << 16; /* fallthrough */ case 2: - k1 ^= ((uint32_t)tail[1]) << 8; + k1 ^= ((uint32_t)keyptr[1]) << 8; /* fallthrough */ case 1: - k1 ^= tail[0]; + k1 ^= keyptr[0]; k1 *= c1; k1 = ROTL32(k1, 15); k1 *= c2; diff --git a/src/core/lib/support/ref_counted.h b/src/core/lib/support/ref_counted.h new file mode 100644 index 0000000000..4c662f9119 --- /dev/null +++ b/src/core/lib/support/ref_counted.h @@ -0,0 +1,122 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_REF_COUNTED_H +#define GRPC_CORE_LIB_SUPPORT_REF_COUNTED_H + +#include <grpc/support/log.h> +#include <grpc/support/sync.h> + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/support/debug_location.h" +#include "src/core/lib/support/memory.h" + +namespace grpc_core { + +// A base class for reference-counted objects. +// New objects should be created via New() and start with a refcount of 1. +// When the refcount reaches 0, the object will be deleted via Delete(). +class RefCounted { + public: + void Ref() { gpr_ref(&refs_); } + + void Unref() { + if (gpr_unref(&refs_)) { + Delete(this); + } + } + + // Not copyable nor movable. + RefCounted(const RefCounted&) = delete; + RefCounted& operator=(const RefCounted&) = delete; + + protected: + // Allow Delete() to access destructor. + template <typename T> + friend void Delete(T*); + + RefCounted() { gpr_ref_init(&refs_, 1); } + + virtual ~RefCounted() {} + + private: + gpr_refcount refs_; +}; + +// An alternative version of the RefCounted base class that +// supports tracing. This is intended to be used in cases where the +// object will be handled both by idiomatic C++ code using smart +// pointers and legacy code that is manually calling Ref() and Unref(). +// Once all of our code is converted to idiomatic C++, we may be able to +// eliminate this class. +class RefCountedWithTracing { + public: + void Ref() { gpr_ref(&refs_); } + + void Ref(const DebugLocation& location, const char* reason) { + if (location.Log() && trace_flag_ != nullptr && trace_flag_->enabled()) { + gpr_atm old_refs = gpr_atm_no_barrier_load(&refs_.count); + gpr_log(GPR_DEBUG, "%s:%p %s:%d ref %" PRIdPTR " -> %" PRIdPTR " %s", + trace_flag_->name(), this, location.file(), location.line(), + old_refs, old_refs + 1, reason); + } + Ref(); + } + + void Unref() { + if (gpr_unref(&refs_)) { + Delete(this); + } + } + + void Unref(const DebugLocation& location, const char* reason) { + if (location.Log() && trace_flag_ != nullptr && trace_flag_->enabled()) { + gpr_atm old_refs = gpr_atm_no_barrier_load(&refs_.count); + gpr_log(GPR_DEBUG, "%s:%p %s:%d unref %" PRIdPTR " -> %" PRIdPTR " %s", + trace_flag_->name(), this, location.file(), location.line(), + old_refs, old_refs - 1, reason); + } + Unref(); + } + + // Not copyable nor movable. + RefCountedWithTracing(const RefCountedWithTracing&) = delete; + RefCountedWithTracing& operator=(const RefCountedWithTracing&) = delete; + + protected: + // Allow Delete() to access destructor. + template <typename T> + friend void Delete(T*); + + RefCountedWithTracing() : RefCountedWithTracing(nullptr) {} + + explicit RefCountedWithTracing(TraceFlag* trace_flag) + : trace_flag_(trace_flag) { + gpr_ref_init(&refs_, 1); + } + + virtual ~RefCountedWithTracing() {} + + private: + TraceFlag* trace_flag_ = nullptr; + gpr_refcount refs_; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_SUPPORT_REF_COUNTED_H */ diff --git a/src/core/lib/support/ref_counted_ptr.h b/src/core/lib/support/ref_counted_ptr.h new file mode 100644 index 0000000000..dc2385e369 --- /dev/null +++ b/src/core/lib/support/ref_counted_ptr.h @@ -0,0 +1,90 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_REF_COUNTED_PTR_H +#define GRPC_CORE_LIB_SUPPORT_REF_COUNTED_PTR_H + +#include <utility> + +#include "src/core/lib/support/memory.h" + +namespace grpc_core { + +// A smart pointer class for objects that provide Ref() and Unref() methods, +// such as those provided by the RefCounted base class. +template <typename T> +class RefCountedPtr { + public: + RefCountedPtr() {} + + // If value is non-null, we take ownership of a ref to it. + explicit RefCountedPtr(T* value) { value_ = value; } + + // Move support. + RefCountedPtr(RefCountedPtr&& other) { + value_ = other.value_; + other.value_ = nullptr; + } + RefCountedPtr& operator=(RefCountedPtr&& other) { + if (value_ != nullptr) value_->Unref(); + value_ = other.value_; + other.value_ = nullptr; + return *this; + } + + // Copy support. + RefCountedPtr(const RefCountedPtr& other) { + if (other.value_ != nullptr) other.value_->Ref(); + value_ = other.value_; + } + RefCountedPtr& operator=(const RefCountedPtr& other) { + // Note: Order of reffing and unreffing is important here in case value_ + // and other.value_ are the same object. + if (other.value_ != nullptr) other.value_->Ref(); + if (value_ != nullptr) value_->Unref(); + value_ = other.value_; + return *this; + } + + ~RefCountedPtr() { + if (value_ != nullptr) value_->Unref(); + } + + // If value is non-null, we take ownership of a ref to it. + void reset(T* value = nullptr) { + if (value_ != nullptr) value_->Unref(); + value_ = value; + } + + T* get() const { return value_; } + + T& operator*() const { return *value_; } + T* operator->() const { return value_; } + + private: + T* value_ = nullptr; +}; + +template <typename T, typename... Args> +inline RefCountedPtr<T> MakeRefCounted(Args&&... args) { + return RefCountedPtr<T>(New<T>(std::forward<Args>(args)...)); +} + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_SUPPORT_REF_COUNTED_PTR_H */ diff --git a/src/core/lib/support/stack_lockfree.cc b/src/core/lib/support/stack_lockfree.cc deleted file mode 100644 index 7a4ede3b92..0000000000 --- a/src/core/lib/support/stack_lockfree.cc +++ /dev/null @@ -1,137 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/support/stack_lockfree.h" - -#include <stdlib.h> -#include <string.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/atm.h> -#include <grpc/support/log.h> -#include <grpc/support/port_platform.h> - -/* The lockfree node structure is a single architecture-level - word that allows for an atomic CAS to set it up. */ -struct lockfree_node_contents { - /* next thing to look at. Actual index for head, next index otherwise */ - uint16_t index; -#ifdef GPR_ARCH_64 - uint16_t pad; - uint32_t aba_ctr; -#else -#ifdef GPR_ARCH_32 - uint16_t aba_ctr; -#else -#error Unsupported bit width architecture -#endif -#endif -}; - -/* Use a union to make sure that these are in the same bits as an atm word */ -typedef union lockfree_node { - gpr_atm atm; - struct lockfree_node_contents contents; -} lockfree_node; - -/* make sure that entries aligned to 8-bytes */ -#define ENTRY_ALIGNMENT_BITS 3 -/* reserve this entry as invalid */ -#define INVALID_ENTRY_INDEX ((1 << 16) - 1) - -struct gpr_stack_lockfree { - lockfree_node* entries; - lockfree_node head; /* An atomic entry describing curr head */ -}; - -gpr_stack_lockfree* gpr_stack_lockfree_create(size_t entries) { - gpr_stack_lockfree* stack; - stack = (gpr_stack_lockfree*)gpr_malloc(sizeof(*stack)); - /* Since we only allocate 16 bits to represent an entry number, - * make sure that we are within the desired range */ - /* Reserve the highest entry number as a dummy */ - GPR_ASSERT(entries < INVALID_ENTRY_INDEX); - stack->entries = (lockfree_node*)gpr_malloc_aligned( - entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS); - /* Clear out all entries */ - memset(stack->entries, 0, entries * sizeof(stack->entries[0])); - memset(&stack->head, 0, sizeof(stack->head)); - - GPR_ASSERT(sizeof(stack->entries->atm) == sizeof(stack->entries->contents)); - - /* Point the head at reserved dummy entry */ - stack->head.contents.index = INVALID_ENTRY_INDEX; -/* Fill in the pad and aba_ctr to avoid confusing memcheck tools */ -#ifdef GPR_ARCH_64 - stack->head.contents.pad = 0; -#endif - stack->head.contents.aba_ctr = 0; - return stack; -} - -void gpr_stack_lockfree_destroy(gpr_stack_lockfree* stack) { - gpr_free_aligned(stack->entries); - gpr_free(stack); -} - -int gpr_stack_lockfree_push(gpr_stack_lockfree* stack, int entry) { - lockfree_node head; - lockfree_node newhead; - lockfree_node curent; - lockfree_node newent; - - /* First fill in the entry's index and aba ctr for new head */ - newhead.contents.index = (uint16_t)entry; -#ifdef GPR_ARCH_64 - /* Fill in the pad to avoid confusing memcheck tools */ - newhead.contents.pad = 0; -#endif - - /* Also post-increment the aba_ctr */ - curent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); - newhead.contents.aba_ctr = ++curent.contents.aba_ctr; - gpr_atm_no_barrier_store(&stack->entries[entry].atm, curent.atm); - - do { - /* Atomically get the existing head value for use */ - head.atm = gpr_atm_no_barrier_load(&(stack->head.atm)); - /* Point to it */ - newent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); - newent.contents.index = head.contents.index; - gpr_atm_no_barrier_store(&stack->entries[entry].atm, newent.atm); - } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm)); - /* Use rel_cas above to make sure that entry index is set properly */ - return head.contents.index == INVALID_ENTRY_INDEX; -} - -int gpr_stack_lockfree_pop(gpr_stack_lockfree* stack) { - lockfree_node head; - lockfree_node newhead; - - do { - head.atm = gpr_atm_acq_load(&(stack->head.atm)); - if (head.contents.index == INVALID_ENTRY_INDEX) { - return -1; - } - newhead.atm = - gpr_atm_no_barrier_load(&(stack->entries[head.contents.index].atm)); - - } while (!gpr_atm_no_barrier_cas(&(stack->head.atm), head.atm, newhead.atm)); - - return head.contents.index; -} diff --git a/src/core/lib/support/stack_lockfree.h b/src/core/lib/support/stack_lockfree.h deleted file mode 100644 index 337ecc2b17..0000000000 --- a/src/core/lib/support/stack_lockfree.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H -#define GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H - -#include <stddef.h> - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct gpr_stack_lockfree gpr_stack_lockfree; - -/* This stack must specify the maximum number of entries to track. - The current implementation only allows up to 65534 entries */ -gpr_stack_lockfree* gpr_stack_lockfree_create(size_t entries); -void gpr_stack_lockfree_destroy(gpr_stack_lockfree* stack); - -/* Pass in a valid entry number for the next stack entry */ -/* Returns 1 if this is the first element on the stack, 0 otherwise */ -int gpr_stack_lockfree_push(gpr_stack_lockfree*, int entry); - -/* Returns -1 on empty or the actual entry number */ -int gpr_stack_lockfree_pop(gpr_stack_lockfree* stack); - -#ifdef __cplusplus -} -#endif - -#endif /* GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H */ diff --git a/src/core/lib/support/thd_internal.h b/src/core/lib/support/thd_internal.h new file mode 100644 index 0000000000..38bffc847d --- /dev/null +++ b/src/core/lib/support/thd_internal.h @@ -0,0 +1,30 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_THD_INTERNAL_H +#define GRPC_CORE_LIB_SUPPORT_THD_INTERNAL_H + +#include <grpc/support/time.h> + +/* Internal interfaces between modules within the gpr support library. */ +void gpr_thd_init(); + +/* Wait for all outstanding threads to finish, up to deadline */ +int gpr_await_threads(gpr_timespec deadline); + +#endif /* GRPC_CORE_LIB_SUPPORT_THD_INTERNAL_H */ diff --git a/src/core/lib/support/thd_posix.cc b/src/core/lib/support/thd_posix.cc index 02e3846be1..c2a4f4198f 100644 --- a/src/core/lib/support/thd_posix.cc +++ b/src/core/lib/support/thd_posix.cc @@ -24,22 +24,34 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> #include <grpc/support/thd.h> #include <grpc/support/useful.h> #include <pthread.h> #include <stdlib.h> #include <string.h> +#include "src/core/lib/support/fork.h" + +static gpr_mu g_mu; +static gpr_cv g_cv; +static int g_thread_count; +static int g_awaiting_threads; + struct thd_arg { void (*body)(void* arg); /* body of a thread */ void* arg; /* argument to a thread */ }; +static void inc_thd_count(); +static void dec_thd_count(); + /* Body of every thread started via gpr_thd_new. */ static void* thread_body(void* v) { struct thd_arg a = *(struct thd_arg*)v; free(v); (*a.body)(a.arg); + dec_thd_count(); return nullptr; } @@ -54,6 +66,7 @@ int gpr_thd_new(gpr_thd_id* t, void (*thd_body)(void* arg), void* arg, GPR_ASSERT(a != nullptr); a->body = thd_body; a->arg = arg; + inc_thd_count(); GPR_ASSERT(pthread_attr_init(&attr) == 0); if (gpr_thd_options_is_detached(options)) { @@ -68,6 +81,7 @@ int gpr_thd_new(gpr_thd_id* t, void (*thd_body)(void* arg), void* arg, if (!thread_started) { /* don't use gpr_free, as this was allocated using malloc (see above) */ free(a); + dec_thd_count(); } *t = (gpr_thd_id)p; return thread_started; @@ -77,4 +91,46 @@ gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)pthread_self(); } void gpr_thd_join(gpr_thd_id t) { pthread_join((pthread_t)t, nullptr); } +/***************************************** + * Only used when fork support is enabled + */ + +static void inc_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count++; + gpr_mu_unlock(&g_mu); + } +} + +static void dec_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count--; + if (g_awaiting_threads && g_thread_count == 0) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); + } +} + +void gpr_thd_init() { + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); + g_thread_count = 0; + g_awaiting_threads = 0; +} + +int gpr_await_threads(gpr_timespec deadline) { + gpr_mu_lock(&g_mu); + g_awaiting_threads = 1; + int res = 0; + if (g_thread_count > 0) { + res = gpr_cv_wait(&g_cv, &g_mu, deadline); + } + g_awaiting_threads = 0; + gpr_mu_unlock(&g_mu); + return res == 0; +} + #endif /* GPR_POSIX_SYNC */ diff --git a/src/core/lib/support/thd_windows.cc b/src/core/lib/support/thd_windows.cc index 5bda7f440c..0875c2f03e 100644 --- a/src/core/lib/support/thd_windows.cc +++ b/src/core/lib/support/thd_windows.cc @@ -50,6 +50,8 @@ static void destroy_thread(struct thd_info* t) { gpr_free(t); } +void gpr_thd_init(void) {} + /* Body of every thread started via gpr_thd_new. */ static DWORD WINAPI thread_body(void* v) { g_thd_info = (struct thd_info*)v; diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index f96d8eba6c..5eb5a56ebb 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -21,6 +21,7 @@ #include <limits.h> #include <memory.h> +#include <grpc/fork.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -39,6 +40,8 @@ #include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/support/fork.h" +#include "src/core/lib/support/thd_internal.h" #include "src/core/lib/surface/alarm_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -62,10 +65,12 @@ static int g_initializations; static void do_basic_init(void) { gpr_log_verbosity_init(); + grpc_fork_support_init(); gpr_mu_init(&g_init_mu); grpc_register_built_in_plugins(); grpc_cq_global_init(); g_initializations = 0; + grpc_fork_handlers_auto_register(); } static bool append_filter(grpc_channel_stack_builder* builder, void* arg) { @@ -119,6 +124,7 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); + gpr_thd_init(); grpc_stats_init(); grpc_slice_intern_init(); grpc_mdctx_global_init(); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index e9c92b04dc..4f07183180 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -801,7 +801,7 @@ static void channel_connectivity_changed(void* cd, grpc_error* error) { grpc_server* server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->on_connectivity_state_change = &chand->channel_connectivity_changed, + op->on_connectivity_state_change = &chand->channel_connectivity_changed; op->connectivity_state = &chand->connectivity_state; grpc_channel_next_op(grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc index f4feadc640..7d36c6c9e1 100644 --- a/src/core/lib/surface/version.cc +++ b/src/core/lib/surface/version.cc @@ -23,4 +23,4 @@ const char* grpc_version_string(void) { return "5.0.0-dev"; } -const char* grpc_g_stands_for(void) { return "generous"; } +const char* grpc_g_stands_for(void) { return "glossy"; } diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 2e878b7f10..08aee04ac9 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -100,7 +100,7 @@ grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount, void* buffer, size_t length) { slice_stream_ref(&refcount->slice_refcount); grpc_slice res; - res.refcount = &refcount->slice_refcount, + res.refcount = &refcount->slice_refcount; res.data.refcounted.bytes = (uint8_t*)buffer; res.data.refcounted.length = length; return res; diff --git a/src/cpp/common/version_cc.cc b/src/cpp/common/version_cc.cc index e1e5e895f6..7f01a66dcf 100644 --- a/src/cpp/common/version_cc.cc +++ b/src/cpp/common/version_cc.cc @@ -22,5 +22,5 @@ #include <grpc++/grpc++.h> namespace grpc { -grpc::string Version() { return "1.8.0-dev"; } +grpc::string Version() { return "1.9.0-dev"; } } // namespace grpc diff --git a/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs b/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs new file mode 100644 index 0000000000..e040f52380 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs @@ -0,0 +1,182 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Profiling; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class CallCancellationTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(Host); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + + [Test] + public async Task ClientStreamingCall_CancelAfterBegin() + { + var barrier = new TaskCompletionSource<object>(); + + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + barrier.SetResult(null); + await requestStream.ToListAsync(); + return ""; + }); + + var cts = new CancellationTokenSource(); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); + + await barrier.Task; // make sure the handler has started. + cts.Cancel(); + + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + } + + [Test] + public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull() + { + var handlerStartedBarrier = new TaskCompletionSource<object>(); + var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>(); + var successTcs = new TaskCompletionSource<string>(); + + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + handlerStartedBarrier.SetResult(null); + + // wait for cancellation to be delivered. + context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null)); + await cancelNotificationReceivedBarrier.Task; + + var moveNextResult = await requestStream.MoveNext(); + successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL"); + return ""; + }); + + var cts = new CancellationTokenSource(); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); + + await handlerStartedBarrier.Task; + cts.Cancel(); + + try + { + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + Assert.AreEqual("SUCCESS", await successTcs.Task); + } + + [Test] + public async Task ClientStreamingCall_CancelServerSideRead() + { + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + var cts = new CancellationTokenSource(); + var moveNextTask = requestStream.MoveNext(cts.Token); + cts.Cancel(); + await moveNextTask; + return ""; + }); + + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall()); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + } + + [Test] + public async Task ServerStreamingCall_CancelClientSideRead() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) => + { + await responseStream.WriteAsync("abc"); + while (!context.CancellationToken.IsCancellationRequested) + { + await Task.Delay(10); + } + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + await call.ResponseStream.MoveNext(); + Assert.AreEqual("abc", call.ResponseStream.Current); + + var cts = new CancellationTokenSource(); + var moveNextTask = call.ResponseStream.MoveNext(cts.Token); + cts.Cancel(); + + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await moveNextTask; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 72d9035a6f..90dd365b07 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -273,74 +273,6 @@ namespace Grpc.Core.Tests } [Test] - public async Task ClientStreamingCall_CancelAfterBegin() - { - var barrier = new TaskCompletionSource<object>(); - - helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => - { - barrier.SetResult(null); - await requestStream.ToListAsync(); - return ""; - }); - - var cts = new CancellationTokenSource(); - var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); - - await barrier.Task; // make sure the handler has started. - cts.Cancel(); - - try - { - // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. - await call.ResponseAsync; - Assert.Fail(); - } - catch (RpcException ex) - { - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); - } - } - - [Test] - public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull() - { - var handlerStartedBarrier = new TaskCompletionSource<object>(); - var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>(); - var successTcs = new TaskCompletionSource<string>(); - - helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => - { - handlerStartedBarrier.SetResult(null); - - // wait for cancellation to be delivered. - context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null)); - await cancelNotificationReceivedBarrier.Task; - - var moveNextResult = await requestStream.MoveNext(); - successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL"); - return ""; - }); - - var cts = new CancellationTokenSource(); - var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); - - await handlerStartedBarrier.Task; - cts.Cancel(); - - try - { - await call.ResponseAsync; - Assert.Fail(); - } - catch (RpcException ex) - { - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); - } - Assert.AreEqual("SUCCESS", await successTcs.Task); - } - - [Test] public async Task AsyncUnaryCall_EchoMetadata() { helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs index 1d9475a8b8..775c950c8c 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs @@ -40,7 +40,7 @@ namespace Grpc.Core.Internal.Tests public void CreateAsyncAndShutdown() { var env = GrpcEnvironment.AddRef(); - var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env)); + var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env, () => BatchContextSafeHandle.Create())); cq.Shutdown(); var ev = cq.Next(); cq.Dispose(); diff --git a/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs b/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs new file mode 100644 index 0000000000..b6bb0a9eae --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs @@ -0,0 +1,79 @@ +#region Copyright notice and license + +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + public class DefaultObjectPoolTest + { + [Test] + [TestCase(10, 2)] + [TestCase(10, 1)] + [TestCase(0, 2)] + [TestCase(2, 0)] + public void ObjectIsReused(int sharedCapacity, int threadLocalCapacity) + { + var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), sharedCapacity, threadLocalCapacity); + var origLeased = pool.Lease(); + pool.Return(origLeased); + Assert.AreSame(origLeased, pool.Lease()); + Assert.AreNotSame(origLeased, pool.Lease()); + } + + [Test] + public void ZeroCapacities() + { + var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 0, 0); + var origLeased = pool.Lease(); + pool.Return(origLeased); + Assert.AreNotSame(origLeased, pool.Lease()); + } + + [Test] + public void DisposeCleansSharedPool() + { + var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, 0); + var origLeased = pool.Lease(); + pool.Return(origLeased); + pool.Dispose(); + Assert.AreNotSame(origLeased, pool.Lease()); + } + + [Test] + public void Constructor() + { + Assert.Throws<ArgumentNullException>(() => new DefaultObjectPool<TestPooledObject>(null, 10, 2)); + Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), -1, 10)); + Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, -1)); + } + + class TestPooledObject : IDisposable + { + + public void Dispose() + { + + } + } + } +} diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 80031cb7ef..2b1b5e32d7 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -33,6 +33,8 @@ namespace Grpc.Core public class GrpcEnvironment { const int MinDefaultThreadPoolSize = 4; + const int DefaultBatchContextPoolSharedCapacity = 10000; + const int DefaultBatchContextPoolThreadLocalCapacity = 64; static object staticLock = new object(); static GrpcEnvironment instance; @@ -40,11 +42,14 @@ namespace Grpc.Core static int? customThreadPoolSize; static int? customCompletionQueueCount; static bool inlineHandlers; + static int batchContextPoolSharedCapacity = DefaultBatchContextPoolSharedCapacity; + static int batchContextPoolThreadLocalCapacity = DefaultBatchContextPoolThreadLocalCapacity; static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>(); static readonly HashSet<Server> registeredServers = new HashSet<Server>(); static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), LogLevel.Off, true); + readonly IObjectPool<BatchContextSafeHandle> batchContextPool; readonly GrpcThreadPool threadPool; readonly DebugStats debugStats = new DebugStats(); readonly AtomicCounter cqPickerCounter = new AtomicCounter(); @@ -186,7 +191,7 @@ namespace Grpc.Core /// <summary> /// Sets the number of threads in the gRPC thread pool that polls for internal RPC events. - /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. /// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing. /// Most users should rely on the default value provided by gRPC library. /// Note: this method is part of an experimental API that can change or be removed without any prior notice. @@ -203,7 +208,7 @@ namespace Grpc.Core /// <summary> /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events. - /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. /// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing. /// Most users should rely on the default value provided by gRPC library. /// Note: this method is part of an experimental API that can change or be removed without any prior notice. @@ -238,6 +243,26 @@ namespace Grpc.Core } /// <summary> + /// Sets the parameters for a pool that caches batch context instances. Reusing batch context instances + /// instead of creating a new one for every C core operation helps reducing the GC pressure. + /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// This is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// </summary> + public static void SetBatchContextPoolParams(int sharedCapacity, int threadLocalCapacity) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number"); + GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number"); + batchContextPoolSharedCapacity = sharedCapacity; + batchContextPoolThreadLocalCapacity = threadLocalCapacity; + } + } + + /// <summary> /// Occurs when <c>GrpcEnvironment</c> is about the start the shutdown logic. /// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first). /// </summary> @@ -249,6 +274,7 @@ namespace Grpc.Core private GrpcEnvironment() { GrpcNativeInit(); + batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(this.batchContextPool), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity); threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers); threadPool.Start(); } @@ -264,6 +290,8 @@ namespace Grpc.Core } } + internal IObjectPool<BatchContextSafeHandle> BatchContextPool => batchContextPool; + internal bool IsAlive { get @@ -325,6 +353,7 @@ namespace Grpc.Core await Task.Run(() => ShuttingDown?.Invoke(this, null)).ConfigureAwait(false); await threadPool.StopAsync().ConfigureAwait(false); + batchContextPool.Dispose(); GrpcNativeShutdown(); isShutdown = true; diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 42bfbb87e0..3751d549e3 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -41,6 +41,13 @@ namespace Grpc.Core /// (<c>MoveNext</c> will return <c>false</c>) and the <c>CancellationToken</c> /// associated with the call will be cancelled to signal the failure. /// </para> + /// <para> + /// <c>MoveNext()</c> operations can be cancelled via a cancellation token. Cancelling + /// an individual read operation has the same effect as cancelling the entire call + /// (which will also result in the read operation returning prematurely), but the per-read cancellation + /// tokens passed to MoveNext() only result in cancelling the call if the read operation haven't finished + /// yet. + /// </para> /// </summary> /// <typeparam name="T">The message type.</typeparam> public interface IAsyncStreamReader<T> : IAsyncEnumerator<T> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index aa2161267a..9946d1a6cf 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -92,23 +92,28 @@ namespace Grpc.Core.Internal } using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) - using (var ctx = BatchContextSafeHandle.Create()) { - call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); - - var ev = cq.Pluck(ctx.Handle); - - bool success = (ev.success != 0); + var ctx = details.Channel.Environment.BatchContextPool.Lease(); try { - using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + var ev = cq.Pluck(ctx.Handle); + bool success = (ev.success != 0); + try + { + using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + { + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + } + } + catch (Exception e) { - HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + Logger.Error(e, "Exception occured while invoking completion delegate."); } } - catch (Exception e) + finally { - Logger.Error(e, "Exception occured while invoking completion delegate."); + ctx.Recycle(); } } diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 1e6f1fba37..83385ad7d3 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -38,15 +38,18 @@ namespace Grpc.Core.Internal static readonly NativeMethods Native = NativeMethods.Get(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>(); + IObjectPool<BatchContextSafeHandle> ownedByPool; CompletionCallbackData completionCallbackData; private BatchContextSafeHandle() { } - public static BatchContextSafeHandle Create() + public static BatchContextSafeHandle Create(IObjectPool<BatchContextSafeHandle> ownedByPool = null) { - return Native.grpcsharp_batch_context_create(); + var ctx = Native.grpcsharp_batch_context_create(); + ctx.ownedByPool = ownedByPool; + return ctx; } public IntPtr Handle @@ -104,6 +107,19 @@ namespace Grpc.Core.Internal return Native.grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0; } + public void Recycle() + { + if (ownedByPool != null) + { + Native.grpcsharp_batch_context_reset(this); + ownedByPool.Return(this); + } + else + { + Dispose(); + } + } + protected override bool ReleaseHandle() { Native.grpcsharp_batch_context_destroy(handle); @@ -123,7 +139,7 @@ namespace Grpc.Core.Internal finally { completionCallbackData = default(CompletionCallbackData); - Dispose(); + Recycle(); } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index d6a5ba586b..a3ef3e61ee 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -70,8 +70,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback); Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags) .CheckOk(); } @@ -87,8 +86,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback); Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray, callFlags).CheckOk(); } } @@ -97,8 +95,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback); Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags).CheckOk(); } } @@ -107,8 +104,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback); Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, callFlags).CheckOk(); } } @@ -117,8 +113,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata ? 1 : 0).CheckOk(); } } @@ -127,8 +122,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } } @@ -138,9 +132,8 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendStatusFromServerCompletionCallback, callback); var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendStatusFromServerCompletionCallback, callback); var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail); Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0, optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); @@ -151,8 +144,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedMessageCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedMessageCallback, callback); Native.grpcsharp_call_recv_message(this, ctx).CheckOk(); } } @@ -161,8 +153,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedResponseHeadersCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedResponseHeadersCallback, callback); Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); } } @@ -171,8 +162,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedCloseOnServerCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedCloseOnServerCallback, callback); Native.grpcsharp_call_start_serverside(this, ctx).CheckOk(); } } @@ -181,8 +171,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 1eeb0e3d97..cd5f8ed92e 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -66,8 +66,7 @@ namespace Grpc.Core.Internal public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback, object callbackState) { - var ctx = BatchContextSafeHandle.Create(); - cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback, callbackState); + var ctx = cq.CompletionRegistry.RegisterBatchCompletion(callback, callbackState); Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx); } diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index 851b6ca213..ab649ee766 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -49,19 +49,19 @@ namespace Grpc.Core.Internal public async Task<bool> MoveNext(CancellationToken token) { - if (token != CancellationToken.None) + var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null; + using (cancellationTokenRegistration) { - throw new InvalidOperationException("Cancellation of individual reads is not supported."); - } - var result = await call.ReadMessageAsync().ConfigureAwait(false); - this.current = result; + var result = await call.ReadMessageAsync().ConfigureAwait(false); + this.current = result; - if (result == null) - { - await call.StreamingResponseCallFinishedTask.ConfigureAwait(false); - return false; + if (result == null) + { + await call.StreamingResponseCallFinishedTask.ConfigureAwait(false); + return false; + } + return true; } - return true; } public void Dispose() diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs index b68655b33c..cf3f3c0995 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs @@ -36,13 +36,15 @@ namespace Grpc.Core.Internal static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<CompletionRegistry>(); readonly GrpcEnvironment environment; + readonly Func<BatchContextSafeHandle> batchContextFactory; readonly Dictionary<IntPtr, IOpCompletionCallback> dict = new Dictionary<IntPtr, IOpCompletionCallback>(new IntPtrComparer()); SpinLock spinLock = new SpinLock(Debugger.IsAttached); IntPtr lastRegisteredKey; // only for testing - public CompletionRegistry(GrpcEnvironment environment) + public CompletionRegistry(GrpcEnvironment environment, Func<BatchContextSafeHandle> batchContextFactory) { - this.environment = environment; + this.environment = GrpcPreconditions.CheckNotNull(environment); + this.batchContextFactory = GrpcPreconditions.CheckNotNull(batchContextFactory); } public void Register(IntPtr key, IOpCompletionCallback callback) @@ -63,10 +65,12 @@ namespace Grpc.Core.Internal } } - public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback, object state) + public BatchContextSafeHandle RegisterBatchCompletion(BatchCompletionDelegate callback, object state) { + var ctx = batchContextFactory(); ctx.SetCompletionCallback(callback, state); Register(ctx.Handle, ctx); + return ctx; } public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback) diff --git a/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs new file mode 100644 index 0000000000..2f030f3e02 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs @@ -0,0 +1,196 @@ +#region Copyright notice and license + +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Collections.Generic; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Pool of objects that combines a shared pool and a thread local pool. + /// </summary> + internal class DefaultObjectPool<T> : IObjectPool<T> + where T : class, IDisposable + { + readonly object myLock = new object(); + readonly Func<T> itemFactory; + + // Queue shared between threads, access needs to be synchronized. + readonly Queue<T> sharedQueue; + readonly int sharedCapacity; + + readonly ThreadLocal<ThreadLocalData> threadLocalData; + readonly int threadLocalCapacity; + readonly int rentLimit; + + bool disposed; + + /// <summary> + /// Initializes a new instance of <c>DefaultObjectPool</c> with given shared capacity and thread local capacity. + /// Thread local capacity should be significantly smaller than the shared capacity as we don't guarantee immediately + /// disposing the objects in the thread local pool after this pool is disposed (they will eventually be garbage collected + /// after the thread that owns them has finished). + /// On average, the shared pool will only be accessed approx. once for every <c>threadLocalCapacity / 2</c> rent or lease + /// operations. + /// </summary> + public DefaultObjectPool(Func<T> itemFactory, int sharedCapacity, int threadLocalCapacity) + { + GrpcPreconditions.CheckArgument(sharedCapacity >= 0); + GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0); + this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory)); + this.sharedQueue = new Queue<T>(sharedCapacity); + this.sharedCapacity = sharedCapacity; + this.threadLocalData = new ThreadLocal<ThreadLocalData>(() => new ThreadLocalData(threadLocalCapacity), false); + this.threadLocalCapacity = threadLocalCapacity; + this.rentLimit = threadLocalCapacity != 1 ? threadLocalCapacity / 2 : 1; + } + + /// <summary> + /// Leases an item from the pool or creates a new instance if the pool is empty. + /// Attempts to retrieve the item from the thread local pool first. + /// If the thread local pool is empty, the item is taken from the shared pool + /// along with more items that are moved to the thread local pool to avoid + /// prevent acquiring the lock for shared pool too often. + /// The methods should not be called after the pool is disposed, but it won't + /// results in an error to do so (after depleting the items potentially left + /// in the thread local pool, it will continue returning new objects created by the factory). + /// </summary> + public T Lease() + { + var localData = threadLocalData.Value; + if (localData.Queue.Count > 0) + { + return localData.Queue.Dequeue(); + } + if (localData.CreateBudget > 0) + { + localData.CreateBudget --; + return itemFactory(); + } + + int itemsMoved = 0; + T leasedItem = null; + lock(myLock) + { + if (sharedQueue.Count > 0) + { + leasedItem = sharedQueue.Dequeue(); + } + while (sharedQueue.Count > 0 && itemsMoved < rentLimit) + { + localData.Queue.Enqueue(sharedQueue.Dequeue()); + itemsMoved ++; + } + } + + // If the shared pool didn't contain all rentLimit items, + // next time we try to lease we will just create those + // instead of trying to grab them from the shared queue. + // This is to guarantee we won't be accessing the shared queue too often. + localData.CreateBudget = rentLimit - itemsMoved; + + return leasedItem ?? itemFactory(); + } + + /// <summary> + /// Returns an item to the pool. + /// Attempts to add the item to the thread local pool first. + /// If the thread local pool is full, item is added to a shared pool, + /// along with half of the items for the thread local pool, which + /// should prevent acquiring the lock for shared pool too often. + /// If called after the pool is disposed, we make best effort not to + /// add anything to the thread local pool and we guarantee not to add + /// anything to the shared pool (items will be disposed instead). + /// </summary> + public void Return(T item) + { + GrpcPreconditions.CheckNotNull(item); + + var localData = threadLocalData.Value; + if (localData.Queue.Count < threadLocalCapacity && !disposed) + { + localData.Queue.Enqueue(item); + return; + } + if (localData.DisposeBudget > 0) + { + localData.DisposeBudget --; + item.Dispose(); + return; + } + + int itemsReturned = 0; + int returnLimit = rentLimit + 1; + lock (myLock) + { + if (sharedQueue.Count < sharedCapacity && !disposed) + { + sharedQueue.Enqueue(item); + itemsReturned ++; + } + while (sharedQueue.Count < sharedCapacity && itemsReturned < returnLimit && !disposed) + { + sharedQueue.Enqueue(localData.Queue.Dequeue()); + itemsReturned ++; + } + } + + // If the shared pool could not accomodate all returnLimit items, + // next time we try to return we will just dispose the item + // instead of trying to return them to the shared queue. + // This is to guarantee we won't be accessing the shared queue too often. + localData.DisposeBudget = returnLimit - itemsReturned; + + if (itemsReturned == 0) + { + localData.DisposeBudget --; + item.Dispose(); + } + } + + public void Dispose() + { + lock (myLock) + { + if (!disposed) + { + disposed = true; + + while (sharedQueue.Count > 0) + { + sharedQueue.Dequeue().Dispose(); + } + } + } + } + + class ThreadLocalData + { + public ThreadLocalData(int capacity) + { + this.Queue = new Queue<T>(capacity); + } + + public Queue<T> Queue { get; } + public int CreateBudget { get; set; } + public int DisposeBudget { get; set; } + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index bd0229a9dd..f1b5a4f9ff 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -219,7 +219,7 @@ namespace Grpc.Core.Internal var list = new List<CompletionQueueSafeHandle>(); for (int i = 0; i < completionQueueCount; i++) { - var completionRegistry = new CompletionRegistry(environment); + var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease()); list.Add(CompletionQueueSafeHandle.CreateAsync(completionRegistry)); } return list.AsReadOnly(); diff --git a/src/csharp/Grpc.Core/Internal/IObjectPool.cs b/src/csharp/Grpc.Core/Internal/IObjectPool.cs new file mode 100644 index 0000000000..f7d6e30a2a --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/IObjectPool.cs @@ -0,0 +1,35 @@ +#region Copyright notice and license + +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Collections.Generic; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Pool of objects. + /// </summary> + internal interface IObjectPool<T> : IDisposable + where T : class + { + T Lease(); + void Return(T item); + } +} diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index d517252cfe..43acb8f915 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -52,6 +52,7 @@ namespace Grpc.Core.Internal public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details; public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata; public readonly Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate grpcsharp_batch_context_recv_close_on_server_cancelled; + public readonly Delegates.grpcsharp_batch_context_reset_delegate grpcsharp_batch_context_reset; public readonly Delegates.grpcsharp_batch_context_destroy_delegate grpcsharp_batch_context_destroy; public readonly Delegates.grpcsharp_request_call_context_create_delegate grpcsharp_request_call_context_create; @@ -169,6 +170,7 @@ namespace Grpc.Core.Internal this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate>(library); this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate>(library); this.grpcsharp_batch_context_recv_close_on_server_cancelled = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate>(library); + this.grpcsharp_batch_context_reset = GetMethodDelegate<Delegates.grpcsharp_batch_context_reset_delegate>(library); this.grpcsharp_batch_context_destroy = GetMethodDelegate<Delegates.grpcsharp_batch_context_destroy_delegate>(library); this.grpcsharp_request_call_context_create = GetMethodDelegate<Delegates.grpcsharp_request_call_context_create_delegate>(library); @@ -311,6 +313,7 @@ namespace Grpc.Core.Internal public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx, out UIntPtr detailsLength); public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx); public delegate int grpcsharp_batch_context_recv_close_on_server_cancelled_delegate(BatchContextSafeHandle ctx); + public delegate void grpcsharp_batch_context_reset_delegate(BatchContextSafeHandle ctx); public delegate void grpcsharp_batch_context_destroy_delegate(IntPtr ctx); public delegate RequestCallContextSafeHandle grpcsharp_request_call_context_create_delegate(); diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index c65b960afb..058dddb7eb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -49,13 +49,14 @@ namespace Grpc.Core.Internal public async Task<bool> MoveNext(CancellationToken token) { - if (token != CancellationToken.None) + + var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null; + using (cancellationTokenRegistration) { - throw new InvalidOperationException("Cancellation of individual reads is not supported."); + var result = await call.ReadMessageAsync().ConfigureAwait(false); + this.current = result; + return result != null; } - var result = await call.ReadMessageAsync().ConfigureAwait(false); - this.current = result; - return result != null; } public void Dispose() diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index a308890cde..9b7ea884dd 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -64,10 +64,9 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); // TODO(jtattermusch): delegate allocation by caller can be avoided by utilizing the "state" object, // but server shutdown isn't worth optimizing right now. - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback, null); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(callback, null); Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx); } } diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include index b9ceaf8254..2d9e4ba16a 100755 --- a/src/csharp/Grpc.Core/Version.csproj.include +++ b/src/csharp/Grpc.Core/Version.csproj.include @@ -1,7 +1,7 @@ <!-- This file is generated --> <Project> <PropertyGroup> - <GrpcCsharpVersion>1.8.0-dev</GrpcCsharpVersion> + <GrpcCsharpVersion>1.9.0-dev</GrpcCsharpVersion> <GoogleProtobufVersion>3.3.0</GoogleProtobufVersion> </PropertyGroup> </Project> diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index dab938821f..9b5da1c947 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -33,11 +33,11 @@ namespace Grpc.Core /// <summary> /// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies /// </summary> - public const string CurrentAssemblyFileVersion = "1.8.0.0"; + public const string CurrentAssemblyFileVersion = "1.9.0.0"; /// <summary> /// Current version of gRPC C# /// </summary> - public const string CurrentVersion = "1.8.0-dev"; + public const string CurrentVersion = "1.9.0-dev"; } } diff --git a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs index 2d1c33e9a0..eefdb50e39 100644 --- a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs @@ -43,7 +43,7 @@ namespace Grpc.Microbenchmarks public void Run(int threadCount, int iterations, bool useSharedRegistry) { Console.WriteLine(string.Format("CompletionRegistryBenchmark: threads={0}, iterations={1}, useSharedRegistry={2}", threadCount, iterations, useSharedRegistry)); - CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment) : null; + CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create()) : null; var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, sharedRegistry)); threadedBenchmark.Run(); // TODO: parametrize by number of pending completions @@ -51,7 +51,7 @@ namespace Grpc.Microbenchmarks private void ThreadBody(int iterations, CompletionRegistry optionalSharedRegistry) { - var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment); + var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create()); var ctx = BatchContextSafeHandle.Create(); var stopwatch = Stopwatch.StartNew(); @@ -64,7 +64,7 @@ namespace Grpc.Microbenchmarks stopwatch.Stop(); Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds); - ctx.Dispose(); + ctx.Recycle(); } private class NopCompletionCallback : IOpCompletionCallback diff --git a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs index 9cff97eb88..da4f35ff96 100644 --- a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs @@ -52,10 +52,7 @@ namespace Grpc.Microbenchmarks private void ThreadBody(int iterations, int payloadSize) { - // TODO(jtattermusch): parametrize by number of pending completions. - // TODO(jtattermusch): parametrize by cached/non-cached BatchContextSafeHandle - - var completionRegistry = new CompletionRegistry(environment); + var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease()); var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry); var call = CreateFakeCall(cq); diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat index ff013d5680..8f89e2846a 100755 --- a/src/csharp/build_packages_dotnetcli.bat +++ b/src/csharp/build_packages_dotnetcli.bat @@ -13,7 +13,7 @@ @rem limitations under the License. @rem Current package versions -set VERSION=1.8.0-dev +set VERSION=1.9.0-dev @rem Adjust the location of nuget.exe set NUGET=C:\nuget\nuget.exe diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh index 44a4791146..6a6cafe2bd 100755 --- a/src/csharp/build_packages_dotnetcli.sh +++ b/src/csharp/build_packages_dotnetcli.sh @@ -39,7 +39,7 @@ dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts -nuget pack Grpc.nuspec -Version "1.8.0-dev" -OutputDirectory ../../artifacts -nuget pack Grpc.Tools.nuspec -Version "1.8.0-dev" -OutputDirectory ../../artifacts +nuget pack Grpc.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts +nuget pack Grpc.Tools.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts (cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg) diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index bcb3bfaee5..24d779e1e5 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -197,10 +197,7 @@ void grpcsharp_metadata_array_move(grpc_metadata_array* dest, } GPR_EXPORT void GPR_CALLTYPE -grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) { - if (!ctx) { - return; - } +grpcsharp_batch_context_reset(grpcsharp_batch_context* ctx) { grpcsharp_metadata_array_destroy_metadata_including_entries( &(ctx->send_initial_metadata)); @@ -216,7 +213,15 @@ grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) { grpcsharp_metadata_array_destroy_metadata_only( &(ctx->recv_status_on_client.trailing_metadata)); grpc_slice_unref(ctx->recv_status_on_client.status_details); + memset(ctx, 0, sizeof(grpcsharp_batch_context)); +} +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) { + if (!ctx) { + return; + } + grpcsharp_batch_context_reset(ctx); gpr_free(ctx); } diff --git a/src/csharp/tests.json b/src/csharp/tests.json index 7841051052..82573edecb 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -5,11 +5,13 @@ "Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest", "Grpc.Core.Internal.Tests.CompletionQueueEventTest", "Grpc.Core.Internal.Tests.CompletionQueueSafeHandleTest", + "Grpc.Core.Internal.Tests.DefaultObjectPoolTest", "Grpc.Core.Internal.Tests.MetadataArraySafeHandleTest", "Grpc.Core.Internal.Tests.TimespecTest", "Grpc.Core.Tests.AppDomainUnloadTest", "Grpc.Core.Tests.AuthContextTest", "Grpc.Core.Tests.AuthPropertyTest", + "Grpc.Core.Tests.CallCancellationTest", "Grpc.Core.Tests.CallCredentialsTest", "Grpc.Core.Tests.CallOptionsTest", "Grpc.Core.Tests.ChannelCredentialsTest", diff --git a/src/node/health_check/package.json b/src/node/health_check/package.json new file mode 100644 index 0000000000..fca3a2a7a6 --- /dev/null +++ b/src/node/health_check/package.json @@ -0,0 +1,29 @@ +{ + "name": "grpc-health-check", + "version": "1.7.2", + "author": "Google Inc.", + "description": "Health check service for use with gRPC", + "repository": { + "type": "git", + "url": "https://github.com/grpc/grpc.git" + }, + "bugs": "https://github.com/grpc/grpc/issues", + "contributors": [ + { + "name": "Michael Lumish", + "email": "mlumish@google.com" + } + ], + "dependencies": { + "grpc": "^1.7.2", + "lodash": "^3.9.3", + "google-protobuf": "^3.0.0" + }, + "files": [ + "LICENSE", + "health.js", + "v1" + ], + "main": "src/node/index.js", + "license": "Apache-2.0" +} diff --git a/src/node/tools/package.json b/src/node/tools/package.json new file mode 100644 index 0000000000..99fd854067 --- /dev/null +++ b/src/node/tools/package.json @@ -0,0 +1,41 @@ +{ + "name": "grpc-tools", + "version": "1.7.2", + "author": "Google Inc.", + "description": "Tools for developing with gRPC on Node.js", + "homepage": "https://grpc.io/", + "repository": { + "type": "git", + "url": "https://github.com/grpc/grpc.git" + }, + "bugs": "https://github.com/grpc/grpc/issues", + "contributors": [ + { + "name": "Michael Lumish", + "email": "mlumish@google.com" + } + ], + "bin": { + "grpc_tools_node_protoc": "./bin/protoc.js", + "grpc_tools_node_protoc_plugin": "./bin/protoc_plugin.js" + }, + "scripts": { + "install": "./node_modules/.bin/node-pre-gyp install" + }, + "bundledDependencies": ["node-pre-gyp"], + "binary": { + "module_name": "grpc_tools", + "host": "https://storage.googleapis.com/", + "remote_path": "grpc-precompiled-binaries/node/{name}/v{version}", + "package_name": "{platform}-{arch}.tar.gz", + "module_path": "bin" + }, + "files": [ + "index.js", + "bin/protoc.js", + "bin/protoc_plugin.js", + "bin/google/protobuf", + "LICENSE" + ], + "main": "index.js" +} diff --git a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec index 9065ab9f73..80e1069ddd 100644 --- a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec +++ b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec @@ -42,7 +42,7 @@ Pod::Spec.new do |s| # exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed # before them. s.name = '!ProtoCompiler-gRPCPlugin' - v = '1.8.0-dev' + v = '1.9.0-dev' s.version = v s.summary = 'The gRPC ProtoC plugin generates Objective-C files from .proto services.' s.description = <<-DESC diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index b5ab611848..ac4596da25 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -299,7 +299,7 @@ static NSString * const kBearerPrefix = @"Bearer "; // network queue if the write didn't succeed. // If the call is a unary call, parameter \a errorHandler will be ignored and // the error handler of GRPCOpSendClose will be executed in case of error. -- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler { +- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler { __weak GRPCCall *weakSelf = self; void(^resumingHandler)(void) = ^{ @@ -345,7 +345,7 @@ static NSString * const kBearerPrefix = @"Bearer "; // Only called from the call queue. The error handler will be called from the // network queue if the requests stream couldn't be closed successfully. -- (void)finishRequestWithErrorHandler:(void (^)())errorHandler { +- (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler { if (!_unaryCall) { [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]] errorHandler:errorHandler]; @@ -441,7 +441,7 @@ static NSString * const kBearerPrefix = @"Bearer "; } _connectivityMonitor = [GRPCConnectivityMonitor monitorWithHost:host]; __weak typeof(self) weakSelf = self; - void (^handler)() = ^{ + void (^handler)(void) = ^{ typeof(self) strongSelf = weakSelf; [strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeUnavailable diff --git a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h index 8d3c45ee50..cb55e46d70 100644 --- a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h +++ b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h @@ -57,6 +57,6 @@ * Only one handler is active at a time, so if this method is called again before the previous * handler has been called, it might never be called at all (or yes, if it has already been queued). */ -- (void)handleLossWithHandler:(nullable void (^)())lossHandler - wifiStatusChangeHandler:(nullable void (^)())wifiStatusChangeHandler; +- (void)handleLossWithHandler:(nullable void (^)(void))lossHandler + wifiStatusChangeHandler:(nullable void (^)(void))wifiStatusChangeHandler; @end diff --git a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m index b322638500..c8e10dd75f 100644 --- a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m +++ b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m @@ -136,8 +136,8 @@ static void PassFlagsToContextInfoBlock(SCNetworkReachabilityRef target, return returnValue; } -- (void)handleLossWithHandler:(nullable void (^)())lossHandler - wifiStatusChangeHandler:(nullable void (^)())wifiStatusChangeHandler { +- (void)handleLossWithHandler:(nullable void (^)(void))lossHandler + wifiStatusChangeHandler:(nullable void (^)(void))wifiStatusChangeHandler { __weak typeof(self) weakSelf = self; [self startListeningWithHandler:^(GRPCReachabilityFlags *flags) { typeof(self) strongSelf = weakSelf; diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m index f73e9cbc50..a0f4118740 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.m +++ b/src/objective-c/GRPCClient/private/GRPCHost.m @@ -93,7 +93,7 @@ static GRPCConnectivityMonitor *connectivityMonitor = nil; if (!connectivityMonitor) { connectivityMonitor = [GRPCConnectivityMonitor monitorWithHost:hostURL.host]; - void (^handler)() = ^{ + void (^handler)(void) = ^{ [GRPCHost flushChannelCache]; }; [connectivityMonitor handleLossWithHandler:handler diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h index 1cd9da8f3e..f569895e7c 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h @@ -30,24 +30,24 @@ @interface GRPCOpSendMetadata : GRPCOperation - (instancetype)initWithMetadata:(NSDictionary *)metadata - handler:(void(^)())handler; + handler:(void(^)(void))handler; - (instancetype)initWithMetadata:(NSDictionary *)metadata flags:(uint32_t)flags - handler:(void(^)())handler NS_DESIGNATED_INITIALIZER; + handler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER; @end @interface GRPCOpSendMessage : GRPCOperation - (instancetype)initWithMessage:(NSData *)message - handler:(void(^)())handler NS_DESIGNATED_INITIALIZER; + handler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER; @end @interface GRPCOpSendClose : GRPCOperation -- (instancetype)initWithHandler:(void(^)())handler NS_DESIGNATED_INITIALIZER; +- (instancetype)initWithHandler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER; @end @@ -79,7 +79,7 @@ path:(NSString *)path timeout:(NSTimeInterval)timeout NS_DESIGNATED_INITIALIZER; -- (void)startBatchWithOperations:(NSArray *)ops errorHandler:(void(^)())errorHandler; +- (void)startBatchWithOperations:(NSArray *)ops errorHandler:(void(^)(void))errorHandler; - (void)startBatchWithOperations:(NSArray *)ops; diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index b0b1223b64..d26d13475d 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -36,12 +36,12 @@ // Most operation subclasses don't set any flags in the grpc_op, and rely on the flag member being // initialized to zero. grpc_op _op; - void(^_handler)(); + void(^_handler)(void); } - (void)finish { if (_handler) { - void(^handler)() = _handler; + void(^handler)(void) = _handler; _handler = nil; handler(); } @@ -55,13 +55,13 @@ } - (instancetype)initWithMetadata:(NSDictionary *)metadata - handler:(void (^)())handler { + handler:(void (^)(void))handler { return [self initWithMetadata:metadata flags:0 handler:handler]; } - (instancetype)initWithMetadata:(NSDictionary *)metadata flags:(uint32_t)flags - handler:(void (^)())handler { + handler:(void (^)(void))handler { if (self = [super init]) { _op.op = GRPC_OP_SEND_INITIAL_METADATA; _op.data.send_initial_metadata.count = metadata.count; @@ -92,7 +92,7 @@ return [self initWithMessage:nil handler:nil]; } -- (instancetype)initWithMessage:(NSData *)message handler:(void (^)())handler { +- (instancetype)initWithMessage:(NSData *)message handler:(void (^)(void))handler { if (!message) { [NSException raise:NSInvalidArgumentException format:@"message cannot be nil"]; } @@ -116,7 +116,7 @@ return [self initWithHandler:nil]; } -- (instancetype)initWithHandler:(void (^)())handler { +- (instancetype)initWithHandler:(void (^)(void))handler { if (self = [super init]) { _op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; _handler = handler; @@ -271,7 +271,7 @@ [self startBatchWithOperations:operations errorHandler:nil]; } -- (void)startBatchWithOperations:(NSArray *)operations errorHandler:(void (^)())errorHandler { +- (void)startBatchWithOperations:(NSArray *)operations errorHandler:(void (^)(void))errorHandler { // Keep logs of op batches when we are running tests. Disabled when in production for improved // performance. #ifdef GRPC_TEST_OBJC diff --git a/src/objective-c/GRPCClient/private/version.h b/src/objective-c/GRPCClient/private/version.h index db589d12de..69dd6266fd 100644 --- a/src/objective-c/GRPCClient/private/version.h +++ b/src/objective-c/GRPCClient/private/version.h @@ -23,4 +23,4 @@ // `tools/buildgen/generate_projects.sh`. -#define GRPC_OBJC_VERSION_STRING @"1.8.0-dev" +#define GRPC_OBJC_VERSION_STRING @"1.9.0-dev" diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h index cec45fae71..f16a3d052a 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h @@ -46,7 +46,7 @@ * Enqueues writeValue: to be sent to the writeable in the main thread. * The passed handler is invoked from the main thread after writeValue: returns. */ -- (void)enqueueValue:(id)value completionHandler:(void (^)())handler; +- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler; /** * Enqueues writesFinishedWithError:nil to be sent to the writeable in the main thread. After that diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index bbfe491783..37bc975f87 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -50,7 +50,7 @@ dispatchQueue:dispatch_get_main_queue()]; } -- (void)enqueueValue:(id)value completionHandler:(void (^)())handler { +- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler { dispatch_async(_writeableQueue, ^{ // We're racing a possible cancellation performed by another thread. To turn all already- // enqueued messages into noops, cancellation nillifies the writeable property. If we get it diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.h b/src/objective-c/RxLibrary/GRXImmediateWriter.h index bdcf5d5937..f88e46b169 100644 --- a/src/objective-c/RxLibrary/GRXImmediateWriter.h +++ b/src/objective-c/RxLibrary/GRXImmediateWriter.h @@ -46,7 +46,7 @@ * Returns a writer that pushes to its writeable the successive values returned by the passed * block. When the block first returns nil, it is released. */ -+ (GRXWriter *)writerWithValueSupplier:(id (^)())block; ++ (GRXWriter *)writerWithValueSupplier:(id (^)(void))block; /** * Returns a writer that iterates over the values of the passed container and pushes them to diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.m b/src/objective-c/RxLibrary/GRXImmediateWriter.m index d8c6975801..c5d6d1310a 100644 --- a/src/objective-c/RxLibrary/GRXImmediateWriter.m +++ b/src/objective-c/RxLibrary/GRXImmediateWriter.m @@ -52,7 +52,7 @@ return [self writerWithEnumerator:enumerator error:nil]; } -+ (GRXWriter *)writerWithValueSupplier:(id (^)())block { ++ (GRXWriter *)writerWithValueSupplier:(id (^)(void))block { return [self writerWithEnumerator:[NSEnumerator grx_enumeratorWithValueSupplier:block]]; } diff --git a/src/objective-c/RxLibrary/GRXWriter+Immediate.h b/src/objective-c/RxLibrary/GRXWriter+Immediate.h index 292a35f61f..d7935deaa2 100644 --- a/src/objective-c/RxLibrary/GRXWriter+Immediate.h +++ b/src/objective-c/RxLibrary/GRXWriter+Immediate.h @@ -30,7 +30,7 @@ * Returns a writer that pushes to its writeable the successive values returned by the passed * block. When the block first returns nil, it is released. */ -+ (instancetype)writerWithValueSupplier:(id (^)())block; ++ (instancetype)writerWithValueSupplier:(id (^)(void))block; /** * Returns a writer that iterates over the values of the passed container and pushes them to diff --git a/src/objective-c/RxLibrary/GRXWriter+Immediate.m b/src/objective-c/RxLibrary/GRXWriter+Immediate.m index 43aa9c5437..a36a56764d 100644 --- a/src/objective-c/RxLibrary/GRXWriter+Immediate.m +++ b/src/objective-c/RxLibrary/GRXWriter+Immediate.m @@ -27,7 +27,7 @@ return [GRXImmediateWriter writerWithEnumerator:enumerator]; } -+ (instancetype)writerWithValueSupplier:(id (^)())block { ++ (instancetype)writerWithValueSupplier:(id (^)(void))block { return [GRXImmediateWriter writerWithValueSupplier:block]; } diff --git a/src/objective-c/RxLibrary/NSEnumerator+GRXUtil.h b/src/objective-c/RxLibrary/NSEnumerator+GRXUtil.h index 8c72f7858d..38dbaaf9a4 100644 --- a/src/objective-c/RxLibrary/NSEnumerator+GRXUtil.h +++ b/src/objective-c/RxLibrary/NSEnumerator+GRXUtil.h @@ -38,5 +38,5 @@ * Returns a NSEnumerator instance that delegates the invocations of nextObject to the passed block. * When the block first returns nil, it is released. */ -+ (NSEnumerator *)grx_enumeratorWithValueSupplier:(id (^)())block; ++ (NSEnumerator *)grx_enumeratorWithValueSupplier:(id (^)(void))block; @end diff --git a/src/objective-c/RxLibrary/NSEnumerator+GRXUtil.m b/src/objective-c/RxLibrary/NSEnumerator+GRXUtil.m index 309e25ede5..7d8191d0f7 100644 --- a/src/objective-c/RxLibrary/NSEnumerator+GRXUtil.m +++ b/src/objective-c/RxLibrary/NSEnumerator+GRXUtil.m @@ -33,7 +33,7 @@ return [[GRXNSScalarEnumerator alloc] initWithValue:value]; } -+ (NSEnumerator *)grx_enumeratorWithValueSupplier:(id (^)())block { ++ (NSEnumerator *)grx_enumeratorWithValueSupplier:(id (^)(void))block { return [[GRXNSBlockEnumerator alloc] initWithValueSupplier:block]; } @end diff --git a/src/objective-c/RxLibrary/private/GRXNSBlockEnumerator.h b/src/objective-c/RxLibrary/private/GRXNSBlockEnumerator.h index c45338acdd..c3317b2d04 100644 --- a/src/objective-c/RxLibrary/private/GRXNSBlockEnumerator.h +++ b/src/objective-c/RxLibrary/private/GRXNSBlockEnumerator.h @@ -27,5 +27,5 @@ * The first time the passed block returns nil, the enumeration will end and the block will be * released. */ -- (instancetype)initWithValueSupplier:(id (^)())block; +- (instancetype)initWithValueSupplier:(id (^)(void))block; @end diff --git a/src/objective-c/RxLibrary/private/GRXNSBlockEnumerator.m b/src/objective-c/RxLibrary/private/GRXNSBlockEnumerator.m index 7e7cc572b8..eddfd26680 100644 --- a/src/objective-c/RxLibrary/private/GRXNSBlockEnumerator.m +++ b/src/objective-c/RxLibrary/private/GRXNSBlockEnumerator.m @@ -19,14 +19,14 @@ #import "GRXNSBlockEnumerator.h" @implementation GRXNSBlockEnumerator { - id (^_block)(); + id (^_block)(void); } - (instancetype)init { return [self initWithValueSupplier:nil]; } -- (instancetype)initWithValueSupplier:(id (^)())block { +- (instancetype)initWithValueSupplier:(id (^)(void))block { if ((self = [super init])) { _block = block; } diff --git a/src/objective-c/tests/version.h b/src/objective-c/tests/version.h index 02515063fa..6e3a073020 100644 --- a/src/objective-c/tests/version.h +++ b/src/objective-c/tests/version.h @@ -23,5 +23,5 @@ // `tools/buildgen/generate_projects.sh`. -#define GRPC_OBJC_VERSION_STRING @"1.8.0-dev" +#define GRPC_OBJC_VERSION_STRING @"1.9.0-dev" #define GRPC_C_VERSION_STRING @"5.0.0-dev" diff --git a/src/php/composer.json b/src/php/composer.json index 09471d23fe..43833980f9 100644 --- a/src/php/composer.json +++ b/src/php/composer.json @@ -2,7 +2,7 @@ "name": "grpc/grpc-dev", "description": "gRPC library for PHP - for Developement use only", "license": "Apache-2.0", - "version": "1.8.0", + "version": "1.9.0", "require": { "php": ">=5.5.0", "google/protobuf": "^v3.3.0" diff --git a/src/php/ext/grpc/version.h b/src/php/ext/grpc/version.h index 93dd563cff..48131d72d1 100644 --- a/src/php/ext/grpc/version.h +++ b/src/php/ext/grpc/version.h @@ -20,6 +20,6 @@ #ifndef VERSION_H #define VERSION_H -#define PHP_GRPC_VERSION "1.8.0dev" +#define PHP_GRPC_VERSION "1.9.0dev" #endif /* VERSION_H */ diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py index 0887ac1722..993c49d4af 100644 --- a/src/python/grpcio/grpc/_grpcio_metadata.py +++ b/src/python/grpcio/grpc/_grpcio_metadata.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!! -__version__ = """1.8.0.dev0""" +__version__ = """1.9.0.dev0""" diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 330c4185c6..56d6ebd842 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -29,6 +29,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/support/env_linux.cc', 'src/core/lib/support/env_posix.cc', 'src/core/lib/support/env_windows.cc', + 'src/core/lib/support/fork.cc', 'src/core/lib/support/histogram.cc', 'src/core/lib/support/host_port.cc', 'src/core/lib/support/log.cc', @@ -38,7 +39,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/support/log_windows.cc', 'src/core/lib/support/mpscq.cc', 'src/core/lib/support/murmur_hash.cc', - 'src/core/lib/support/stack_lockfree.cc', 'src/core/lib/support/string.cc', 'src/core/lib/support/string_posix.cc', 'src/core/lib/support/string_util_windows.cc', @@ -94,6 +94,8 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/ev_windows.cc', 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', + 'src/core/lib/iomgr/fork_posix.cc', + 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index 61c4157375..8f07f3b30b 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION='1.8.0.dev0' +VERSION='1.9.0.dev0' diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py index 889297f020..0987d57261 100644 --- a/src/python/grpcio_health_checking/grpc_version.py +++ b/src/python/grpcio_health_checking/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!! -VERSION='1.8.0.dev0' +VERSION='1.9.0.dev0' diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py index 7a2e6f5027..c105f57509 100644 --- a/src/python/grpcio_health_checking/setup.py +++ b/src/python/grpcio_health_checking/setup.py @@ -56,7 +56,7 @@ PACKAGE_DIRECTORIES = { '': '.', } -INSTALL_REQUIRES = ('protobuf>=3.3.0', +INSTALL_REQUIRES = ('protobuf>=3.5.0.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION),) try: diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py index 192f4cc217..95d2ff143a 100644 --- a/src/python/grpcio_reflection/grpc_version.py +++ b/src/python/grpcio_reflection/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!! -VERSION='1.8.0.dev0' +VERSION='1.9.0.dev0' diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py index 25312c7c0a..760b89373a 100644 --- a/src/python/grpcio_reflection/setup.py +++ b/src/python/grpcio_reflection/setup.py @@ -57,7 +57,7 @@ PACKAGE_DIRECTORIES = { '': '.', } -INSTALL_REQUIRES = ('protobuf>=3.3.0', +INSTALL_REQUIRES = ('protobuf>=3.5.0.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION),) try: diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py index 83470c2825..afc6dd83f2 100644 --- a/src/python/grpcio_testing/grpc_version.py +++ b/src/python/grpcio_testing/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!! -VERSION='1.8.0.dev0' +VERSION='1.9.0.dev0' diff --git a/src/python/grpcio_testing/setup.py b/src/python/grpcio_testing/setup.py index 0cc336abd1..fa40424f6a 100644 --- a/src/python/grpcio_testing/setup.py +++ b/src/python/grpcio_testing/setup.py @@ -28,7 +28,7 @@ PACKAGE_DIRECTORIES = { '': '.', } -INSTALL_REQUIRES = ('protobuf>=3.3.0', +INSTALL_REQUIRES = ('protobuf>=3.5.0.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION),) setuptools.setup( diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py index 7065edd3bf..99ca3fd82d 100644 --- a/src/python/grpcio_tests/grpc_version.py +++ b/src/python/grpcio_tests/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!! -VERSION='1.8.0.dev0' +VERSION='1.9.0.dev0' diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index debe14c40e..aeb4ea9c53 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -41,8 +41,8 @@ INSTALL_REQUIRES = ( 'grpcio>={version}'.format(version=grpc_version.VERSION), 'grpcio-tools>={version}'.format(version=grpc_version.VERSION), 'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION), - 'oauth2client>=1.4.7', 'protobuf>=3.3.0', 'six>=1.10', 'google-auth>=1.0.0', - 'requests>=2.14.2') + 'oauth2client>=1.4.7', 'protobuf>=3.5.0.post1', 'six>=1.10', + 'google-auth>=1.0.0', 'requests>=2.14.2') COMMAND_CLASS = { # Run `preprocess` *before* doing any packaging! diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb index 0ceb3667eb..57544b0398 100755 --- a/src/ruby/end2end/channel_closing_driver.rb +++ b/src/ruby/end2end/channel_closing_driver.rb @@ -23,13 +23,11 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - sleep 1 - STDERR.puts 'start client' control_stub, client_pid = start_client('channel_closing_client.rb', server_port) - + # sleep to allow time for the client to get into + # the middle of a "watch connectivity state" call sleep 3 begin diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb index 98339baebe..f4b1cd2bb8 100755 --- a/src/ruby/end2end/channel_state_driver.rb +++ b/src/ruby/end2end/channel_state_driver.rb @@ -22,14 +22,11 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - sleep 1 - STDERR.puts 'start client' _, client_pid = start_client('channel_state_client.rb', server_port) - + # sleep to allow time for the client to get into + # the middle of a "watch connectivity state" call sleep 3 - Process.kill('SIGTERM', client_pid) begin diff --git a/src/ruby/end2end/end2end_common.rb b/src/ruby/end2end/end2end_common.rb index a1b824fcbf..790fc23e92 100755 --- a/src/ruby/end2end/end2end_common.rb +++ b/src/ruby/end2end/end2end_common.rb @@ -40,12 +40,13 @@ end # ServerRunner starts an "echo server" that test clients can make calls to class ServerRunner - def initialize(service_impl) + def initialize(service_impl, rpc_server_args: {}) @service_impl = service_impl + @rpc_server_args = rpc_server_args end def run - @srv = GRPC::RpcServer.new + @srv = GRPC::RpcServer.new(@rpc_server_args) port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) @srv.handle(@service_impl) @@ -75,7 +76,6 @@ def start_client(client_main, server_port) client_path, "--client_control_port=#{client_control_port}", "--server_port=#{server_port}") - sleep 1 control_stub = ClientControl::ClientController::Stub.new( "localhost:#{client_control_port}", :this_channel_is_insecure) [control_stub, client_pid] diff --git a/src/ruby/end2end/forking_client_driver.rb b/src/ruby/end2end/forking_client_driver.rb index 63565395f7..5cf1d73112 100755 --- a/src/ruby/end2end/forking_client_driver.rb +++ b/src/ruby/end2end/forking_client_driver.rb @@ -20,12 +20,6 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - # TODO(apolcyn) Can we get rid of this sleep? - # Without it, an immediate call to the just started EchoServer - # fails with UNAVAILABLE - sleep 1 - STDERR.puts 'start client' _, client_pid = start_client('forking_client_client.rb', server_port) diff --git a/src/ruby/end2end/grpc_class_init_client.rb b/src/ruby/end2end/grpc_class_init_client.rb index c35719a71f..ff40350cfa 100755 --- a/src/ruby/end2end/grpc_class_init_client.rb +++ b/src/ruby/end2end/grpc_class_init_client.rb @@ -54,7 +54,7 @@ def run_concurrency_stress_test(test_proc) test_proc.call - fail 'exception thrown while child thread initing class' + fail '(expected) exception thrown while child thread initing class' end # default (no gc_stress and no concurrency_stress) diff --git a/src/ruby/end2end/killed_client_thread_driver.rb b/src/ruby/end2end/killed_client_thread_driver.rb index fce5d13e82..370f7e686b 100755 --- a/src/ruby/end2end/killed_client_thread_driver.rb +++ b/src/ruby/end2end/killed_client_thread_driver.rb @@ -17,56 +17,46 @@ require_relative './end2end_common' # Service that sleeps for a long time upon receiving an 'echo request' -# Also, this notifies @call_started_cv once it has received a request. +# Also, this calls it's callback upon receiving an RPC as a method +# of synchronization/waiting for the child to start. class SleepingEchoServerImpl < Echo::EchoServer::Service - def initialize(call_started, call_started_mu, call_started_cv) - @call_started = call_started - @call_started_mu = call_started_mu - @call_started_cv = call_started_cv + def initialize(received_rpc_callback) + @received_rpc_callback = received_rpc_callback end def echo(echo_req, _) - @call_started_mu.synchronize do - @call_started.set_true - @call_started_cv.signal - end - sleep 1000 + @received_rpc_callback.call + # sleep forever to get the client stuck waiting + sleep Echo::EchoReply.new(response: echo_req.request) end end -# Mutable boolean -class BoolHolder - attr_reader :val - - def init - @val = false - end - - def set_true - @val = true - end -end - def main STDERR.puts 'start server' - call_started = BoolHolder.new - call_started_mu = Mutex.new - call_started_cv = ConditionVariable.new + client_started = false + client_started_mu = Mutex.new + client_started_cv = ConditionVariable.new + received_rpc_callback = proc do + client_started_mu.synchronize do + client_started = true + client_started_cv.signal + end + end - service_impl = SleepingEchoServerImpl.new(call_started, - call_started_mu, - call_started_cv) - server_runner = ServerRunner.new(service_impl) + service_impl = SleepingEchoServerImpl.new(received_rpc_callback) + # RPCs against the server will all be hanging, so kill thread + # pool workers immediately rather than after waiting for a second. + rpc_server_args = { poll_period: 0, pool_keep_alive: 0 } + server_runner = ServerRunner.new(service_impl, rpc_server_args: rpc_server_args) server_port = server_runner.run - STDERR.puts 'start client' _, client_pid = start_client('killed_client_thread_client.rb', server_port) - call_started_mu.synchronize do - call_started_cv.wait(call_started_mu) until call_started.val + client_started_mu.synchronize do + client_started_cv.wait(client_started_mu) until client_started end # SIGTERM the child process now that it's diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb index 94d5e9da2d..59f6f275e4 100755 --- a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb +++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb @@ -26,6 +26,8 @@ def watch_state(ch) fail "non-idle state: #{state}" unless state == IDLE ch.watch_connectivity_state(IDLE, Time.now + 360) end + # sleep to get the thread into the middle of a + # "watch connectivity state" call sleep 0.1 thd.kill end diff --git a/src/ruby/end2end/sig_handling_client.rb b/src/ruby/end2end/sig_handling_client.rb index 41b5f334be..129ad7cb7f 100755 --- a/src/ruby/end2end/sig_handling_client.rb +++ b/src/ruby/end2end/sig_handling_client.rb @@ -30,16 +30,18 @@ class SigHandlingClientController < ClientControl::ClientController::Service end def shutdown(_, _) - Thread.new do - # TODO(apolcyn) There is a race between stopping the - # server and the "shutdown" rpc completing, - # See if stop method on server can end active RPC cleanly, to - # avoid this sleep. - sleep 3 + # Spawn a new thread because RpcServer#stop is + # synchronous and blocks until either this RPC has finished, + # or the server's "poll_period" seconds have passed. + @shutdown_thread = Thread.new do @srv.stop end ClientControl::Void.new end + + def join_shutdown_thread + @shutdown_thread.join + end end def main @@ -62,13 +64,23 @@ def main STDERR.puts 'SIGINT received' end - srv = GRPC::RpcServer.new + # The "shutdown" RPC should end very quickly. + # Allow a few seconds to be safe. + srv = GRPC::RpcServer.new(poll_period: 3) srv.add_http2_port("0.0.0.0:#{client_control_port}", :this_port_is_insecure) stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", :this_channel_is_insecure) - srv.handle(SigHandlingClientController.new(srv, stub)) - srv.run + control_service = SigHandlingClientController.new(srv, stub) + srv.handle(control_service) + server_thread = Thread.new do + srv.run + end + srv.wait_till_running + # send a first RPC to notify the parent process that we've started + stub.echo(Echo::EchoRequest.new(request: 'client/child started')) + server_thread.join + control_service.join_shutdown_thread end main diff --git a/src/ruby/end2end/sig_handling_driver.rb b/src/ruby/end2end/sig_handling_driver.rb index 291bf29424..0ad1cbd661 100755 --- a/src/ruby/end2end/sig_handling_driver.rb +++ b/src/ruby/end2end/sig_handling_driver.rb @@ -19,17 +19,42 @@ require_relative './end2end_common' +# A service that calls back it's received_rpc_callback +# upon receiving an RPC. Used for synchronization/waiting +# for child process to start. +class ClientStartedService < Echo::EchoServer::Service + def initialize(received_rpc_callback) + @received_rpc_callback = received_rpc_callback + end + + def echo(echo_req, _) + @received_rpc_callback.call unless @received_rpc_callback.nil? + @received_rpc_callback = nil + Echo::EchoReply.new(response: echo_req.request) + end +end + def main STDERR.puts 'start server' - server_runner = ServerRunner.new(EchoServerImpl) - server_port = server_runner.run - - sleep 1 + client_started = false + client_started_mu = Mutex.new + client_started_cv = ConditionVariable.new + received_rpc_callback = proc do + client_started_mu.synchronize do + client_started = true + client_started_cv.signal + end + end + client_started_service = ClientStartedService.new(received_rpc_callback) + server_runner = ServerRunner.new(client_started_service) + server_port = server_runner.run STDERR.puts 'start client' control_stub, client_pid = start_client('sig_handling_client.rb', server_port) - sleep 1 + client_started_mu.synchronize do + client_started_cv.wait(client_started_mu) until client_started + end count = 0 while count < 5 diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb index b054f0f5f3..2df22f48a2 100755 --- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb +++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb @@ -23,13 +23,9 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - sleep 1 - STDERR.puts 'start client' _, client_pid = start_client('sig_int_during_channel_watch_client.rb', server_port) - # give time for the client to get into the middle # of a channel state watch call sleep 1 diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 3a4c30d385..50713f900e 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -337,7 +337,7 @@ extern grpc_composite_call_credentials_create_type grpc_composite_call_credentia typedef grpc_call_credentials*(*grpc_google_compute_engine_credentials_create_type)(void* reserved); extern grpc_google_compute_engine_credentials_create_type grpc_google_compute_engine_credentials_create_import; #define grpc_google_compute_engine_credentials_create grpc_google_compute_engine_credentials_create_import -typedef gpr_timespec(*grpc_max_auth_token_lifetime_type)(); +typedef gpr_timespec(*grpc_max_auth_token_lifetime_type)(void); extern grpc_max_auth_token_lifetime_type grpc_max_auth_token_lifetime_import; #define grpc_max_auth_token_lifetime grpc_max_auth_token_lifetime_import typedef grpc_call_credentials*(*grpc_service_account_jwt_access_credentials_create_type)(const char* json_key, gpr_timespec token_lifetime, void* reserved); @@ -589,7 +589,7 @@ extern gpr_free_aligned_type gpr_free_aligned_import; typedef void(*gpr_set_allocation_functions_type)(gpr_allocation_functions functions); extern gpr_set_allocation_functions_type gpr_set_allocation_functions_import; #define gpr_set_allocation_functions gpr_set_allocation_functions_import -typedef gpr_allocation_functions(*gpr_get_allocation_functions_type)(); +typedef gpr_allocation_functions(*gpr_get_allocation_functions_type)(void); extern gpr_get_allocation_functions_type gpr_get_allocation_functions_import; #define gpr_get_allocation_functions gpr_get_allocation_functions_import typedef gpr_avl(*gpr_avl_create_type)(const gpr_avl_vtable* vtable); @@ -712,7 +712,7 @@ extern gpr_log_message_type gpr_log_message_import; typedef void(*gpr_set_log_verbosity_type)(gpr_log_severity min_severity_to_print); extern gpr_set_log_verbosity_type gpr_set_log_verbosity_import; #define gpr_set_log_verbosity gpr_set_log_verbosity_import -typedef void(*gpr_log_verbosity_init_type)(); +typedef void(*gpr_log_verbosity_init_type)(void); extern gpr_log_verbosity_init_type gpr_log_verbosity_init_import; #define gpr_log_verbosity_init gpr_log_verbosity_init_import typedef void(*gpr_set_log_function_type)(gpr_log_func func); diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index d5fc11dc1c..c80c7fcd32 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -92,9 +92,13 @@ module GRPC # Stops the jobs in the pool def stop GRPC.logger.info('stopping, will wait for all the workers to exit') - schedule { throw :exit } while ready_for_work? - @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stop_mutex.synchronize do # wait @keep_alive seconds for workers to stop @stopped = true + loop do + break unless ready_for_work? + worker_queue = @ready_workers.pop + worker_queue << [proc { throw :exit }, []] + end @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 end forcibly_stop_workers @@ -138,7 +142,10 @@ module GRPC end # there shouldn't be any work given to this thread while its busy fail('received a task while busy') unless worker_queue.empty? - @ready_workers << worker_queue + @stop_mutex.synchronize do + return if @stopped + @ready_workers << worker_queue + end end end end @@ -186,8 +193,13 @@ module GRPC # * max_waiting_requests: Deprecated due to internal changes to the thread # pool. This is still an argument for compatibility but is ignored. # - # * poll_period: when present, the server polls for new events with this - # period + # * poll_period: The amount of time in seconds to wait for + # currently-serviced RPC's to finish before cancelling them when shutting + # down the server. + # + # * pool_keep_alive: The amount of time in seconds to wait + # for currently busy thread-pool threads to finish before + # forcing an abrupt exit to each thread. # # * connect_md_proc: # when non-nil is a proc for determining metadata to to send back the client @@ -202,17 +214,18 @@ module GRPC # intercepting server handlers to provide extra functionality. # Interceptors are an EXPERIMENTAL API. # - def initialize(pool_size:DEFAULT_POOL_SIZE, - max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, - poll_period:DEFAULT_POLL_PERIOD, - connect_md_proc:nil, - server_args:{}, - interceptors:[]) + def initialize(pool_size: DEFAULT_POOL_SIZE, + max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, + poll_period: DEFAULT_POLL_PERIOD, + pool_keep_alive: GRPC::RpcServer::DEFAULT_POOL_SIZE, + connect_md_proc: nil, + server_args: {}, + interceptors: []) @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @max_waiting_requests = max_waiting_requests @poll_period = poll_period @pool_size = pool_size - @pool = Pool.new(@pool_size) + @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive) @run_cond = ConditionVariable.new @run_mutex = Mutex.new # running_state can take 4 values: :not_started, :running, :stopping, and diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index 3001579ce7..be1412511a 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -14,5 +14,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '1.8.0.dev' + VERSION = '1.9.0.dev' end diff --git a/src/ruby/pb/grpc/health/checker.rb b/src/ruby/pb/grpc/health/checker.rb index f23db39da5..c492455d8f 100644 --- a/src/ruby/pb/grpc/health/checker.rb +++ b/src/ruby/pb/grpc/health/checker.rb @@ -48,6 +48,20 @@ module Grpc @status_mutex.synchronize { @statuses["#{service}"] = status } end + # Adds given health status for all given services + def set_status_for_services(status, *services) + @status_mutex.synchronize do + services.each { |service| @statuses["#{service}"] = status } + end + end + + # Adds health status for each service given within hash + def add_statuses(service_statuses = {}) + @status_mutex.synchronize do + service_statuses.each_pair { |service, status| @statuses["#{service}"] = status } + end + end + # Clears the status for the given service. def clear_status(service) @status_mutex.synchronize { @statuses.delete("#{service}") } diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb index 21e8815890..8258487418 100755 --- a/src/ruby/qps/worker.rb +++ b/src/ruby/qps/worker.rb @@ -77,8 +77,7 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service Grpc::Testing::CoreResponse.new(cores: cpu_cores) end def quit_worker(_args, _call) - Thread.new { - sleep 3 + @shutdown_thread = Thread.new { @server.stop } Grpc::Testing::Void.new @@ -87,6 +86,9 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service @server = s @server_port = sp end + def join_shutdown_thread + @shutdown_thread.join + end end def main @@ -107,11 +109,13 @@ def main # Configure any errors with client or server child threads to surface Thread.abort_on_exception = true - s = GRPC::RpcServer.new + s = GRPC::RpcServer.new(poll_period: 3) s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s, :this_port_is_insecure) - s.handle(WorkerServiceImpl.new(s, options['server_port'].to_i)) + worker_service = WorkerServiceImpl.new(s, options['server_port'].to_i) + s.handle(worker_service) s.run + worker_service.join_shutdown_thread end main diff --git a/src/ruby/spec/pb/health/checker_spec.rb b/src/ruby/spec/pb/health/checker_spec.rb index 6c9e206c3f..c79ccfd2e0 100644 --- a/src/ruby/spec/pb/health/checker_spec.rb +++ b/src/ruby/spec/pb/health/checker_spec.rb @@ -99,6 +99,35 @@ describe Grpc::Health::Checker do end end + context 'method `add_statuses`' do + it 'should add status to each service' do + checker = Grpc::Health::Checker.new + checker.add_statuses( + 'service1' => ServingStatus::SERVING, + 'service2' => ServingStatus::NOT_SERVING + ) + service1_health = checker.check(HCReq.new(service: 'service1'), nil) + service2_health = checker.check(HCReq.new(service: 'service2'), nil) + expect(service1_health).to eq(HCResp.new(status: ServingStatus::SERVING)) + expect(service2_health).to eq(HCResp.new(status: ServingStatus::NOT_SERVING)) + end + end + + context 'method `set_status_for_services`' do + it 'should add given status to all given services' do + checker = Grpc::Health::Checker.new + checker.set_status_for_services( + ServingStatus::SERVING, + 'service1', + 'service2' + ) + service1_health = checker.check(HCReq.new(service: 'service1'), nil) + service2_health = checker.check(HCReq.new(service: 'service2'), nil) + expect(service1_health).to eq(HCResp.new(status: ServingStatus::SERVING)) + expect(service2_health).to eq(HCResp.new(status: ServingStatus::SERVING)) + end + end + context 'method `check`' do success_tests.each do |t| it "should fail with NOT_FOUND when #{t[:desc]}" do diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb index c584a7cf59..48aad39e08 100644 --- a/src/ruby/tools/version.rb +++ b/src/ruby/tools/version.rb @@ -14,6 +14,6 @@ module GRPC module Tools - VERSION = '1.8.0.dev' + VERSION = '1.9.0.dev' end end |