aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--bazel/grpc_deps.bzl17
-rw-r--r--examples/BUILD19
-rw-r--r--examples/cpp/keyvaluestore/client.cc86
-rw-r--r--examples/cpp/keyvaluestore/server.cc97
-rw-r--r--examples/protos/keyvaluestore.proto33
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc2
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.cc12
-rw-r--r--src/core/lib/iomgr/combiner.cc9
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc3
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc3
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc3
-rw-r--r--src/core/lib/iomgr/ev_posix.cc4
-rw-r--r--src/core/lib/iomgr/ev_posix.h4
-rw-r--r--src/core/lib/iomgr/iomgr.cc4
-rw-r--r--src/core/lib/iomgr/iomgr.h3
-rw-r--r--src/core/lib/iomgr/iomgr_custom.cc6
-rw-r--r--src/core/lib/iomgr/iomgr_internal.cc4
-rw-r--r--src/core/lib/iomgr/iomgr_internal.h4
-rw-r--r--src/core/lib/iomgr/iomgr_posix.cc7
-rw-r--r--src/core/lib/iomgr/iomgr_posix_cfstream.cc7
-rw-r--r--src/core/lib/iomgr/iomgr_windows.cc7
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m371
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannelPool.m6
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.h3
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m21
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.h21
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.m102
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.h6
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.m55
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateSingleWriter.h2
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateSingleWriter.m30
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.h4
-rw-r--r--templates/test/cpp/naming/resolver_component_tests_defs.include1
-rw-r--r--test/core/client_channel/resolvers/dns_resolver_test.cc11
-rw-r--r--test/core/surface/concurrent_connectivity_test.cc2
-rw-r--r--test/cpp/microbenchmarks/bm_cq_multiple_threads.cc1
-rwxr-xr-xtest/cpp/naming/resolver_component_tests_runner.py1
-rw-r--r--third_party/toolchains/BUILD16
-rw-r--r--tools/internal_ci/helper_scripts/prepare_build_macos_rc2
-rwxr-xr-xtools/internal_ci/linux/grpc_bazel_on_foundry_base.sh2
-rw-r--r--tools/interop_matrix/README.md17
-rw-r--r--tools/interop_matrix/client_matrix.py57
-rwxr-xr-xtools/interop_matrix/create_matrix_images.py2
-rw-r--r--tools/remote_build/rbe_common.bazelrc8
44 files changed, 699 insertions, 376 deletions
diff --git a/bazel/grpc_deps.bzl b/bazel/grpc_deps.bzl
index ba0d72a266..30c5d2a484 100644
--- a/bazel/grpc_deps.bzl
+++ b/bazel/grpc_deps.bzl
@@ -93,17 +93,17 @@ def grpc_deps():
native.bind(
name = "opencensus-trace",
- actual = "@io_opencensus_cpp//opencensus/trace:trace"
+ actual = "@io_opencensus_cpp//opencensus/trace:trace",
)
native.bind(
name = "opencensus-stats",
- actual = "@io_opencensus_cpp//opencensus/stats:stats"
+ actual = "@io_opencensus_cpp//opencensus/stats:stats",
)
native.bind(
name = "opencensus-stats-test",
- actual = "@io_opencensus_cpp//opencensus/stats:test_utils"
+ actual = "@io_opencensus_cpp//opencensus/stats:test_utils",
)
if "boringssl" not in native.existing_rules():
@@ -177,16 +177,16 @@ def grpc_deps():
if "com_github_bazelbuild_bazeltoolchains" not in native.existing_rules():
http_archive(
name = "com_github_bazelbuild_bazeltoolchains",
- strip_prefix = "bazel-toolchains-280edaa6f93623074513d2b426068de42e62ea4d",
+ strip_prefix = "bazel-toolchains-37419a124bdb9af2fec5b99a973d359b6b899b61",
urls = [
- "https://mirror.bazel.build/github.com/bazelbuild/bazel-toolchains/archive/280edaa6f93623074513d2b426068de42e62ea4d.tar.gz",
- "https://github.com/bazelbuild/bazel-toolchains/archive/280edaa6f93623074513d2b426068de42e62ea4d.tar.gz",
+ "https://mirror.bazel.build/github.com/bazelbuild/bazel-toolchains/archive/37419a124bdb9af2fec5b99a973d359b6b899b61.tar.gz",
+ "https://github.com/bazelbuild/bazel-toolchains/archive/37419a124bdb9af2fec5b99a973d359b6b899b61.tar.gz",
],
- sha256 = "50c9df51f80cdf9ff8f2bc27620c155526b9ba67be95e8a686f32ff8898a06e2",
+ sha256 = "ee854b5de299138c1f4a2edb5573d22b21d975acfc7aa938f36d30b49ef97498",
)
if "io_opencensus_cpp" not in native.existing_rules():
- http_archive(
+ http_archive(
name = "io_opencensus_cpp",
strip_prefix = "opencensus-cpp-fdf0f308b1631bb4a942e32ba5d22536a6170274",
url = "https://github.com/census-instrumentation/opencensus-cpp/archive/fdf0f308b1631bb4a942e32ba5d22536a6170274.tar.gz",
@@ -199,7 +199,6 @@ def grpc_deps():
url = "https://github.com/google/upb/archive/9ce4a77f61c134bbed28bfd5be5cd7dc0e80f5e3.tar.gz",
)
-
# TODO: move some dependencies from "grpc_deps" here?
def grpc_test_only_deps():
"""Internal, not intended for use by packages that are consuming grpc.
diff --git a/examples/BUILD b/examples/BUILD
index c4f25d0de9..4fee663bd9 100644
--- a/examples/BUILD
+++ b/examples/BUILD
@@ -38,6 +38,11 @@ grpc_proto_library(
srcs = ["protos/route_guide.proto"],
)
+grpc_proto_library(
+ name = "keyvaluestore",
+ srcs = ["protos/keyvaluestore.proto"],
+)
+
cc_binary(
name = "greeter_client",
srcs = ["cpp/helloworld/greeter_client.cc"],
@@ -93,3 +98,17 @@ cc_binary(
defines = ["BAZEL_BUILD"],
deps = [":helloworld", "//:grpc++"],
)
+
+cc_binary(
+ name = "keyvaluestore_client",
+ srcs = ["cpp/keyvaluestore/client.cc"],
+ defines = ["BAZEL_BUILD"],
+ deps = [":keyvaluestore", "//:grpc++"],
+)
+
+cc_binary(
+ name = "keyvaluestore_server",
+ srcs = ["cpp/keyvaluestore/server.cc"],
+ defines = ["BAZEL_BUILD"],
+ deps = [":keyvaluestore", "//:grpc++"],
+) \ No newline at end of file
diff --git a/examples/cpp/keyvaluestore/client.cc b/examples/cpp/keyvaluestore/client.cc
new file mode 100644
index 0000000000..17e407c273
--- /dev/null
+++ b/examples/cpp/keyvaluestore/client.cc
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <grpcpp/grpcpp.h>
+
+#ifdef BAZEL_BUILD
+#include "examples/protos/keyvaluestore.grpc.pb.h"
+#else
+#include "keyvaluestore.grpc.pb.h"
+#endif
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::Status;
+using keyvaluestore::KeyValueStore;
+using keyvaluestore::Request;
+using keyvaluestore::Response;
+
+class KeyValueStoreClient {
+ public:
+ KeyValueStoreClient(std::shared_ptr<Channel> channel)
+ : stub_(KeyValueStore::NewStub(channel)) {}
+
+ // Requests each key in the vector and displays the key and its corresponding
+ // value as a pair
+ void GetValues(const std::vector<std::string>& keys) {
+ // Context for the client. It could be used to convey extra information to
+ // the server and/or tweak certain RPC behaviors.
+ ClientContext context;
+ auto stream = stub_->GetValues(&context);
+ for (const auto& key : keys) {
+ // Key we are sending to the server.
+ Request request;
+ request.set_key(key);
+ stream->Write(request);
+
+ // Get the value for the sent key
+ Response response;
+ stream->Read(&response);
+ std::cout << key << " : " << response.value() << "\n";
+ }
+ stream->WritesDone();
+ Status status = stream->Finish();
+ if (!status.ok()) {
+ std::cout << status.error_code() << ": " << status.error_message()
+ << std::endl;
+ std::cout << "RPC failed";
+ }
+ }
+
+ private:
+ std::unique_ptr<KeyValueStore::Stub> stub_;
+};
+
+int main(int argc, char** argv) {
+ // Instantiate the client. It requires a channel, out of which the actual RPCs
+ // are created. This channel models a connection to an endpoint (in this case,
+ // localhost at port 50051). We indicate that the channel isn't authenticated
+ // (use of InsecureChannelCredentials()).
+ KeyValueStoreClient client(grpc::CreateChannel(
+ "localhost:50051", grpc::InsecureChannelCredentials()));
+ std::vector<std::string> keys = {"key1", "key2", "key3", "key4", "key5"};
+ client.GetValues(keys);
+
+ return 0;
+}
diff --git a/examples/cpp/keyvaluestore/server.cc b/examples/cpp/keyvaluestore/server.cc
new file mode 100644
index 0000000000..e75da9c62d
--- /dev/null
+++ b/examples/cpp/keyvaluestore/server.cc
@@ -0,0 +1,97 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <grpcpp/grpcpp.h>
+
+#ifdef BAZEL_BUILD
+#include "examples/protos/keyvaluestore.grpc.pb.h"
+#else
+#include "keyvaluestore.grpc.pb.h"
+#endif
+
+using grpc::Server;
+using grpc::ServerBuilder;
+using grpc::ServerContext;
+using grpc::ServerReaderWriter;
+using grpc::Status;
+using keyvaluestore::KeyValueStore;
+using keyvaluestore::Request;
+using keyvaluestore::Response;
+
+struct kv_pair {
+ const char* key;
+ const char* value;
+};
+
+static const kv_pair kvs_map[] = {
+ {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"},
+ {"key4", "value4"}, {"key5", "value5"},
+};
+
+const char* get_value_from_map(const char* key) {
+ for (size_t i = 0; i < sizeof(kvs_map) / sizeof(kv_pair); ++i) {
+ if (strcmp(key, kvs_map[i].key) == 0) {
+ return kvs_map[i].value;
+ }
+ }
+ return "";
+}
+
+// Logic and data behind the server's behavior.
+class KeyValueStoreServiceImpl final : public KeyValueStore::Service {
+ Status GetValues(ServerContext* context,
+ ServerReaderWriter<Response, Request>* stream) override {
+ Request request;
+ while (stream->Read(&request)) {
+ Response response;
+ response.set_value(get_value_from_map(request.key().c_str()));
+ stream->Write(response);
+ }
+ return Status::OK;
+ }
+};
+
+void RunServer() {
+ std::string server_address("0.0.0.0:50051");
+ KeyValueStoreServiceImpl service;
+
+ ServerBuilder builder;
+ // Listen on the given address without any authentication mechanism.
+ builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
+ // Register "service" as the instance through which we'll communicate with
+ // clients. In this case, it corresponds to an *synchronous* service.
+ builder.RegisterService(&service);
+ // Finally assemble the server.
+ std::unique_ptr<Server> server(builder.BuildAndStart());
+ std::cout << "Server listening on " << server_address << std::endl;
+
+ // Wait for the server to shutdown. Note that some other thread must be
+ // responsible for shutting down the server for this call to ever return.
+ server->Wait();
+}
+
+int main(int argc, char** argv) {
+ RunServer();
+
+ return 0;
+}
diff --git a/examples/protos/keyvaluestore.proto b/examples/protos/keyvaluestore.proto
new file mode 100644
index 0000000000..74ad57e029
--- /dev/null
+++ b/examples/protos/keyvaluestore.proto
@@ -0,0 +1,33 @@
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package keyvaluestore;
+
+// A simple key-value storage service
+service KeyValueStore {
+ // Provides a value for each key request
+ rpc GetValues (stream Request) returns (stream Response) {}
+}
+
+// The request message containing the key
+message Request {
+ string key = 1;
+}
+
+// The response message containing the value associated with the key
+message Response {
+ string value = 1;
+}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index abacd0c960..fba20000ef 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -472,7 +472,7 @@ static grpc_address_resolver_vtable ares_resolver = {
grpc_resolve_address_ares, blocking_resolve_address_ares};
static bool should_use_ares(const char* resolver_env) {
- return resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0;
+ return resolver_env == nullptr || gpr_stricmp(resolver_env, "ares") == 0;
}
void grpc_resolver_dns_ares_init() {
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 349d8681d5..ade88da4cb 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -335,6 +335,9 @@ static void add_to_storage(struct stream_obj* s,
/* add new op at the beginning of the linked list. The memory is freed
in remove_from_storage */
op_and_state* new_op = grpc_core::New<op_and_state>(s, *op);
+ // Pontential fix to crash on GPR_ASSERT(!curr->done)
+ // TODO (mxyan): check if this is indeed necessary.
+ new_op->done = false;
gpr_mu_lock(&s->mu);
storage->head = new_op;
storage->num_pending_ops++;
@@ -391,7 +394,7 @@ static void execute_from_storage(stream_obj* s) {
gpr_mu_lock(&s->mu);
for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
- GPR_ASSERT(curr->done == 0);
+ GPR_ASSERT(!curr->done);
enum e_op_result result = execute_stream_op(curr);
CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
op_result_string(result));
@@ -400,13 +403,12 @@ static void execute_from_storage(stream_obj* s) {
struct op_and_state* next = curr->next;
remove_from_storage(s, curr);
curr = next;
- }
- /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
- if (result == NO_ACTION_POSSIBLE) {
+ } else if (result == NO_ACTION_POSSIBLE) {
curr = curr->next;
} else if (result == ACTION_TAKEN_WITH_CALLBACK) {
+ /* wait for the callback */
break;
- }
+ } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
}
gpr_mu_unlock(&s->mu);
}
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index 7c0062eb4e..402f8904ea 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -29,6 +29,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/executor.h"
+#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/profiling/timers.h"
grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner");
@@ -228,8 +229,14 @@ bool grpc_combiner_continue_exec_ctx() {
grpc_core::ExecCtx::Get()->IsReadyToFinish(),
lock->time_to_execute_final_list));
+ // offload only if all the following conditions are true:
+ // 1. the combiner is contended and has more than one closure to execute
+ // 2. the current execution context needs to finish as soon as possible
+ // 3. the DEFAULT executor is threaded
+ // 4. the current thread is not a worker for any background poller
if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() &&
- grpc_executor_is_threaded()) {
+ grpc_executor_is_threaded() &&
+ !grpc_iomgr_is_any_background_poller_thread()) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
// this execution context wants to move on: schedule remaining work to be
// picked up on the executor
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 4b8c891e9b..9eb4c089d8 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -1242,6 +1242,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
* Event engine binding
*/
+static bool is_any_background_poller_thread(void) { return false; }
+
static void shutdown_background_closure(void) {}
static void shutdown_engine(void) {
@@ -1287,6 +1289,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd,
pollset_set_del_fd,
+ is_any_background_poller_thread,
shutdown_background_closure,
shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 7a4870db78..0a0891013a 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -1604,6 +1604,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
* Event engine binding
*/
+static bool is_any_background_poller_thread(void) { return false; }
+
static void shutdown_background_closure(void) {}
static void shutdown_engine(void) {
@@ -1644,6 +1646,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd,
pollset_set_del_fd,
+ is_any_background_poller_thread,
shutdown_background_closure,
shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 67cbfbbd02..c479206410 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -1782,6 +1782,8 @@ static void global_cv_fd_table_shutdown() {
* event engine binding
*/
+static bool is_any_background_poller_thread(void) { return false; }
+
static void shutdown_background_closure(void) {}
static void shutdown_engine(void) {
@@ -1828,6 +1830,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd,
pollset_set_del_fd,
+ is_any_background_poller_thread,
shutdown_background_closure,
shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 32d1b6c43e..fb2e70eee4 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -399,6 +399,10 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
g_event_engine->pollset_set_del_fd(pollset_set, fd);
}
+bool grpc_is_any_background_poller_thread(void) {
+ return g_event_engine->is_any_background_poller_thread();
+}
+
void grpc_shutdown_background_closure(void) {
g_event_engine->shutdown_background_closure();
}
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 812c7a0f0f..94ac9fdba6 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -80,6 +80,7 @@ typedef struct grpc_event_engine_vtable {
void (*pollset_set_add_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd);
void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd);
+ bool (*is_any_background_poller_thread)(void);
void (*shutdown_background_closure)(void);
void (*shutdown_engine)(void);
} grpc_event_engine_vtable;
@@ -181,6 +182,9 @@ void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
+/* Returns true if the caller is a worker thread for any background poller. */
+bool grpc_is_any_background_poller_thread();
+
/* Shut down all the closures registered in the background poller. */
void grpc_shutdown_background_closure();
diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc
index eb29973514..a492146857 100644
--- a/src/core/lib/iomgr/iomgr.cc
+++ b/src/core/lib/iomgr/iomgr.cc
@@ -161,6 +161,10 @@ void grpc_iomgr_shutdown_background_closure() {
grpc_iomgr_platform_shutdown_background_closure();
}
+bool grpc_iomgr_is_any_background_poller_thread() {
+ return grpc_iomgr_platform_is_any_background_poller_thread();
+}
+
void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) {
obj->name = gpr_strdup(name);
gpr_mu_lock(&g_mu);
diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h
index 8ea9289e06..6261aa550c 100644
--- a/src/core/lib/iomgr/iomgr.h
+++ b/src/core/lib/iomgr/iomgr.h
@@ -39,6 +39,9 @@ void grpc_iomgr_shutdown();
* background poller. */
void grpc_iomgr_shutdown_background_closure();
+/** Returns true if the caller is a worker thread for any background poller. */
+bool grpc_iomgr_is_any_background_poller_thread();
+
/* Exposed only for testing */
size_t grpc_iomgr_count_objects_for_testing();
diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc
index 4b112c9097..e1cd8f7310 100644
--- a/src/core/lib/iomgr/iomgr_custom.cc
+++ b/src/core/lib/iomgr/iomgr_custom.cc
@@ -41,10 +41,14 @@ static void iomgr_platform_init(void) {
static void iomgr_platform_flush(void) {}
static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
static void iomgr_platform_shutdown_background_closure(void) {}
+static bool iomgr_platform_is_any_background_poller_thread(void) {
+ return false;
+}
static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
- iomgr_platform_shutdown_background_closure};
+ iomgr_platform_shutdown_background_closure,
+ iomgr_platform_is_any_background_poller_thread};
void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
grpc_custom_resolver_vtable* resolver,
diff --git a/src/core/lib/iomgr/iomgr_internal.cc b/src/core/lib/iomgr/iomgr_internal.cc
index b6c9211865..e68b1cf581 100644
--- a/src/core/lib/iomgr/iomgr_internal.cc
+++ b/src/core/lib/iomgr/iomgr_internal.cc
@@ -45,3 +45,7 @@ void grpc_iomgr_platform_shutdown() { iomgr_platform_vtable->shutdown(); }
void grpc_iomgr_platform_shutdown_background_closure() {
iomgr_platform_vtable->shutdown_background_closure();
}
+
+bool grpc_iomgr_platform_is_any_background_poller_thread() {
+ return iomgr_platform_vtable->is_any_background_poller_thread();
+}
diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h
index bca7409907..2250ad9a18 100644
--- a/src/core/lib/iomgr/iomgr_internal.h
+++ b/src/core/lib/iomgr/iomgr_internal.h
@@ -36,6 +36,7 @@ typedef struct grpc_iomgr_platform_vtable {
void (*flush)(void);
void (*shutdown)(void);
void (*shutdown_background_closure)(void);
+ bool (*is_any_background_poller_thread)(void);
} grpc_iomgr_platform_vtable;
void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name);
@@ -56,6 +57,9 @@ void grpc_iomgr_platform_shutdown(void);
/** shut down all the closures registered in the background poller */
void grpc_iomgr_platform_shutdown_background_closure(void);
+/** return true is the caller is a worker thread for any background poller */
+bool grpc_iomgr_platform_is_any_background_poller_thread(void);
+
bool grpc_iomgr_abort_on_leaks(void);
#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */
diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc
index 9386adf060..278c8de688 100644
--- a/src/core/lib/iomgr/iomgr_posix.cc
+++ b/src/core/lib/iomgr/iomgr_posix.cc
@@ -55,9 +55,14 @@ static void iomgr_platform_shutdown_background_closure(void) {
grpc_shutdown_background_closure();
}
+static bool iomgr_platform_is_any_background_poller_thread(void) {
+ return grpc_is_any_background_poller_thread();
+}
+
static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
- iomgr_platform_shutdown_background_closure};
+ iomgr_platform_shutdown_background_closure,
+ iomgr_platform_is_any_background_poller_thread};
void grpc_set_default_iomgr_platform() {
grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable);
diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc
index 552ef4309c..462ac41fcd 100644
--- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc
+++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc
@@ -58,9 +58,14 @@ static void iomgr_platform_shutdown_background_closure(void) {
grpc_shutdown_background_closure();
}
+static bool iomgr_platform_is_any_background_poller_thread(void) {
+ return grpc_is_any_background_poller_thread();
+}
+
static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
- iomgr_platform_shutdown_background_closure};
+ iomgr_platform_shutdown_background_closure,
+ iomgr_platform_is_any_background_poller_thread};
void grpc_set_default_iomgr_platform() {
char* enable_cfstream = getenv(grpc_cfstream_env_var);
diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc
index 24ef0dba7b..0579e16aa7 100644
--- a/src/core/lib/iomgr/iomgr_windows.cc
+++ b/src/core/lib/iomgr/iomgr_windows.cc
@@ -73,9 +73,14 @@ static void iomgr_platform_shutdown(void) {
static void iomgr_platform_shutdown_background_closure(void) {}
+static bool iomgr_platform_is_any_background_poller_thread(void) {
+ return false;
+}
+
static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
- iomgr_platform_shutdown_background_closure};
+ iomgr_platform_shutdown_background_closure,
+ iomgr_platform_is_any_background_poller_thread};
void grpc_set_default_iomgr_platform() {
grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable);
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 83c6edc6e3..e8fae09a1f 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -56,7 +56,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Make them read-write.
@property(atomic, strong) NSDictionary *responseHeaders;
@property(atomic, strong) NSDictionary *responseTrailers;
-@property(atomic) BOOL isWaitingForToken;
- (instancetype)initWithHost:(NSString *)host
path:(NSString *)path
@@ -425,9 +424,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
// queue
dispatch_queue_t _responseQueue;
- // Whether the call is finished. If it is, should not call finishWithError again.
- BOOL _finished;
-
// The OAuth2 token fetched from a token provider.
NSString *_fetchedOauth2AccessToken;
}
@@ -448,24 +444,28 @@ const char *kCFStreamVarName = "grpc_cfstream";
return;
}
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
- switch (callSafety) {
- case GRPCCallSafetyDefault:
- callFlags[hostAndPath] = @0;
- break;
- case GRPCCallSafetyIdempotentRequest:
- callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
- break;
- case GRPCCallSafetyCacheableRequest:
- callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
- break;
- default:
- break;
+ @synchronized(callFlags) {
+ switch (callSafety) {
+ case GRPCCallSafetyDefault:
+ callFlags[hostAndPath] = @0;
+ break;
+ case GRPCCallSafetyIdempotentRequest:
+ callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
+ break;
+ case GRPCCallSafetyCacheableRequest:
+ callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
+ break;
+ default:
+ break;
+ }
}
}
+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
- return [callFlags[hostAndPath] intValue];
+ @synchronized(callFlags) {
+ return [callFlags[hostAndPath] intValue];
+ }
}
// Designated initializer
@@ -506,7 +506,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
_callOptions = [callOptions copy];
// Serial queue to invoke the non-reentrant methods of the grpc_call object.
- _callQueue = dispatch_queue_create("io.grpc.call", NULL);
+ _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);
_requestWriter = requestWriter;
@@ -523,66 +523,48 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
- if (_state != GRXWriterStateNotStarted) {
- return;
+ @synchronized(self) {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
+ _responseQueue = queue;
}
- _responseQueue = queue;
}
#pragma mark Finish
+// This function should support being called within a @synchronized(self) block in another function
+// Should not manipulate _requestWriter for deadlock prevention.
- (void)finishWithError:(NSError *)errorOrNil {
@synchronized(self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
_state = GRXWriterStateFinished;
- }
-
- // If there were still request messages coming, stop them.
- @synchronized(_requestWriter) {
- _requestWriter.state = GRXWriterStateFinished;
- }
-
- if (errorOrNil) {
- [_responseWriteable cancelWithError:errorOrNil];
- } else {
- [_responseWriteable enqueueSuccessfulCompletion];
- }
-
- [GRPCConnectivityMonitor unregisterObserver:self];
- // If the call isn't retained anywhere else, it can be deallocated now.
- _retainSelf = nil;
-}
+ if (errorOrNil) {
+ [_responseWriteable cancelWithError:errorOrNil];
+ } else {
+ [_responseWriteable enqueueSuccessfulCompletion];
+ }
-- (void)cancelCall {
- // Can be called from any thread, any number of times.
- @synchronized(self) {
- [_wrappedCall cancel];
+ // If the call isn't retained anywhere else, it can be deallocated now.
+ _retainSelf = nil;
}
}
- (void)cancel {
@synchronized(self) {
- [self cancelCall];
- self.isWaitingForToken = NO;
- }
- [self
- maybeFinishWithError:[NSError
- errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeCancelled
- userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
-}
-
-- (void)maybeFinishWithError:(NSError *)errorOrNil {
- BOOL toFinish = NO;
- @synchronized(self) {
- if (_finished == NO) {
- _finished = YES;
- toFinish = YES;
+ if (_state == GRXWriterStateFinished) {
+ return;
}
+ [self finishWithError:[NSError
+ errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeCancelled
+ userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
+ [_wrappedCall cancel];
}
- if (toFinish == YES) {
- [self finishWithError:errorOrNil];
- }
+ _requestWriter.state = GRXWriterStateFinished;
}
- (void)dealloc {
@@ -609,21 +591,24 @@ const char *kCFStreamVarName = "grpc_cfstream";
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)startNextRead {
@synchronized(self) {
- if (self.state == GRXWriterStatePaused) {
+ if (_state != GRXWriterStateStarted) {
return;
}
}
dispatch_async(_callQueue, ^{
__weak GRPCCall *weakSelf = self;
- __weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable;
[self startReadWithHandler:^(grpc_byte_buffer *message) {
- __strong GRPCCall *strongSelf = weakSelf;
- __strong GRXConcurrentWriteable *strongWriteable = weakWriteable;
+ NSLog(@"message received");
if (message == NULL) {
// No more messages from the server
return;
}
+ __strong GRPCCall *strongSelf = weakSelf;
+ if (strongSelf == nil) {
+ grpc_byte_buffer_destroy(message);
+ return;
+ }
NSData *data = [NSData grpc_dataWithByteBuffer:message];
grpc_byte_buffer_destroy(message);
if (!data) {
@@ -631,21 +616,26 @@ const char *kCFStreamVarName = "grpc_cfstream";
// don't want to throw, because the app shouldn't crash for a behavior
// that's on the hands of any server to have. Instead we finish and ask
// the server to cancel.
- [strongSelf cancelCall];
- [strongSelf
- maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeResourceExhausted
- userInfo:@{
- NSLocalizedDescriptionKey :
- @"Client does not have enough memory to "
- @"hold the server response."
- }]];
- return;
+ @synchronized(strongSelf) {
+ [strongSelf
+ finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeResourceExhausted
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Client does not have enough memory to "
+ @"hold the server response."
+ }]];
+ [strongSelf->_wrappedCall cancel];
+ }
+ strongSelf->_requestWriter.state = GRXWriterStateFinished;
+ } else {
+ @synchronized(strongSelf) {
+ [strongSelf->_responseWriteable enqueueValue:data
+ completionHandler:^{
+ [strongSelf startNextRead];
+ }];
+ }
}
- [strongWriteable enqueueValue:data
- completionHandler:^{
- [strongSelf startNextRead];
- }];
}];
});
}
@@ -684,11 +674,13 @@ const char *kCFStreamVarName = "grpc_cfstream";
initWithMetadata:headers
flags:callSafetyFlags
handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA
- if (!_unaryCall) {
- [_wrappedCall startBatchWithOperations:@[ op ]];
- } else {
- [_unaryOpBatch addObject:op];
- }
+ dispatch_async(_callQueue, ^{
+ if (!self->_unaryCall) {
+ [self->_wrappedCall startBatchWithOperations:@[ op ]];
+ } else {
+ [self->_unaryOpBatch addObject:op];
+ }
+ });
}
#pragma mark GRXWriteable implementation
@@ -703,9 +695,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
- @synchronized(strongSelf->_requestWriter) {
- strongSelf->_requestWriter.state = GRXWriterStateStarted;
- }
+ strongSelf->_requestWriter.state = GRXWriterStateStarted;
}
};
@@ -721,13 +711,17 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)writeValue:(id)value {
- // TODO(jcanizales): Throw/assert if value isn't NSData.
+ NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData");
+
+ @synchronized(self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
+ }
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
- @synchronized(_requestWriter) {
- _requestWriter.state = GRXWriterStatePaused;
- }
+ _requestWriter.state = GRXWriterStatePaused;
dispatch_async(_callQueue, ^{
// Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
@@ -766,17 +760,20 @@ const char *kCFStreamVarName = "grpc_cfstream";
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
- // TODO(jcanizales): Add error handlers for async failures
- [_wrappedCall
- startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
- [_wrappedCall
- startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
+ dispatch_async(_callQueue, ^{
+ // TODO(jcanizales): Add error handlers for async failures
+ [self->_wrappedCall
+ startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
+ [self->_wrappedCall
+ startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
+ });
}
- (void)invokeCall {
__weak GRPCCall *weakSelf = self;
[self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
// Response headers received.
+ NSLog(@"response received");
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseHeaders = headers;
@@ -784,6 +781,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
}
completionHandler:^(NSError *error, NSDictionary *trailers) {
+ NSLog(@"completion received");
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseTrailers = trailers;
@@ -794,112 +792,114 @@ const char *kCFStreamVarName = "grpc_cfstream";
[userInfo addEntriesFromDictionary:error.userInfo];
}
userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
- // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
- // called before this one, so an error might end up with trailers but no headers. We
- // shouldn't call finishWithError until ater both blocks are called. It is also when
- // this is done that we can provide a merged view of response headers and trailers in a
- // thread-safe way.
- if (strongSelf.responseHeaders) {
- userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
- }
+ // Since gRPC core does not guarantee the headers block being called before this block,
+ // responseHeaders might be nil.
+ userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
}
- [strongSelf maybeFinishWithError:error];
+ [strongSelf finishWithError:error];
+ strongSelf->_requestWriter.state = GRXWriterStateFinished;
}
}];
- // Now that the RPC has been initiated, request writes can start.
- @synchronized(_requestWriter) {
- [_requestWriter startWithWriteable:self];
- }
}
#pragma mark GRXWriter implementation
+// Lock acquired inside startWithWriteable:
- (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
- _responseWriteable =
- [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
-
- GRPCPooledChannel *channel =
- [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
- GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:_path
- completionQueue:[GRPCCompletionQueue completionQueue]
- callOptions:_callOptions];
-
- if (wrappedCall == nil) {
- [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeUnavailable
- userInfo:@{
- NSLocalizedDescriptionKey :
- @"Failed to create call or channel."
- }]];
- return;
- }
-
@synchronized(self) {
- _wrappedCall = wrappedCall;
- }
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
+
+ _responseWriteable =
+ [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
+
+ GRPCPooledChannel *channel =
+ [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
+ _wrappedCall = [channel wrappedCallWithPath:_path
+ completionQueue:[GRPCCompletionQueue completionQueue]
+ callOptions:_callOptions];
+
+ if (_wrappedCall == nil) {
+ [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeUnavailable
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Failed to create call or channel."
+ }]];
+ return;
+ }
- [self sendHeaders];
- [self invokeCall];
+ [self sendHeaders];
+ [self invokeCall];
- // Connectivity monitor is not required for CFStream
- char *enableCFStream = getenv(kCFStreamVarName);
- if (enableCFStream == nil || enableCFStream[0] != '1') {
- [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
+ // Connectivity monitor is not required for CFStream
+ char *enableCFStream = getenv(kCFStreamVarName);
+ if (enableCFStream == nil || enableCFStream[0] != '1') {
+ [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
+ }
}
+
+ // Now that the RPC has been initiated, request writes can start.
+ [_requestWriter startWithWriteable:self];
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ id<GRPCAuthorizationProtocol> tokenProvider = nil;
@synchronized(self) {
_state = GRXWriterStateStarted;
- }
- // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
- // This makes RPCs in which the call isn't externally retained possible (as long as it is started
- // before being autoreleased).
- // Care is taken not to retain self strongly in any of the blocks used in this implementation, so
- // that the life of the instance is determined by this retain cycle.
- _retainSelf = self;
+ // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
+ // This makes RPCs in which the call isn't externally retained possible (as long as it is
+ // started before being autoreleased). Care is taken not to retain self strongly in any of the
+ // blocks used in this implementation, so that the life of the instance is determined by this
+ // retain cycle.
+ _retainSelf = self;
+
+ if (_callOptions == nil) {
+ GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
+ if (_serverName.length != 0) {
+ callOptions.serverAuthority = _serverName;
+ }
+ if (_timeout > 0) {
+ callOptions.timeout = _timeout;
+ }
+ uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
+ if (callFlags != 0) {
+ if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
+ _callSafety = GRPCCallSafetyIdempotentRequest;
+ } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
+ _callSafety = GRPCCallSafetyCacheableRequest;
+ }
+ }
- if (_callOptions == nil) {
- GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
- if (_serverName.length != 0) {
- callOptions.serverAuthority = _serverName;
- }
- if (_timeout > 0) {
- callOptions.timeout = _timeout;
- }
- uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
- if (callFlags != 0) {
- if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
- _callSafety = GRPCCallSafetyIdempotentRequest;
- } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
- _callSafety = GRPCCallSafetyCacheableRequest;
+ id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
+ if (tokenProvider != nil) {
+ callOptions.authTokenProvider = tokenProvider;
}
+ _callOptions = callOptions;
}
- id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
- if (tokenProvider != nil) {
- callOptions.authTokenProvider = tokenProvider;
- }
- _callOptions = callOptions;
+ NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
+ @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
+
+ tokenProvider = _callOptions.authTokenProvider;
}
- NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
- @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
- if (_callOptions.authTokenProvider != nil) {
- @synchronized(self) {
- self.isWaitingForToken = YES;
- }
- [_callOptions.authTokenProvider getTokenWithHandler:^(NSString *token) {
- @synchronized(self) {
- if (self.isWaitingForToken) {
- if (token) {
- self->_fetchedOauth2AccessToken = [token copy];
+ if (tokenProvider != nil) {
+ __weak typeof(self) weakSelf = self;
+ [tokenProvider getTokenWithHandler:^(NSString *token) {
+ __strong typeof(self) strongSelf = weakSelf;
+ if (strongSelf) {
+ @synchronized(strongSelf) {
+ if (strongSelf->_state == GRXWriterStateNotStarted) {
+ if (token) {
+ strongSelf->_fetchedOauth2AccessToken = [token copy];
+ }
}
- [self startCallWithWriteable:writeable];
- self.isWaitingForToken = NO;
}
+ [strongSelf startCallWithWriteable:writeable];
}
}];
} else {
@@ -938,16 +938,21 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)connectivityChanged:(NSNotification *)note {
- // Cancel underlying call upon this notification
+ // Cancel underlying call upon this notification.
+
+ // Retain because connectivity manager only keeps weak reference to GRPCCall.
__strong GRPCCall *strongSelf = self;
if (strongSelf) {
- [self cancelCall];
- [self
- maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeUnavailable
- userInfo:@{
- NSLocalizedDescriptionKey : @"Connectivity lost."
- }]];
+ @synchronized(strongSelf) {
+ [_wrappedCall cancel];
+ [strongSelf
+ finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeUnavailable
+ userInfo:@{
+ NSLocalizedDescriptionKey : @"Connectivity lost."
+ }]];
+ }
+ strongSelf->_requestWriter.state = GRXWriterStateFinished;
}
}
diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m
index a323f0490c..60a33eda82 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannelPool.m
+++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m
@@ -236,6 +236,12 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
return nil;
}
+ // remove trailing slash of hostname
+ NSURL *hostURL = [NSURL URLWithString:[@"https://" stringByAppendingString:host]];
+ if (hostURL.host && hostURL.port == nil) {
+ host = [hostURL.host stringByAppendingString:@":443"];
+ }
+
GRPCPooledChannel *pooledChannel = nil;
GRPCChannelConfiguration *configuration =
[[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions];
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index a871ea895a..ae08cc315b 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -36,8 +36,7 @@
* crash. If you want to react to flow control signals to prevent that, instead of using this class
* you can implement an object that conforms to GRXWriter.
*
- * Thread-safety:
- * The methods of an object of this class should not be called concurrently from different threads.
+ * Thread-safety: the methods of this class are thread-safe.
*/
@interface GRXBufferedPipe : GRXWriter<GRXWriteable>
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index 546d46cba3..74e2f03da6 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -51,16 +51,22 @@
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
value = [value copy];
}
- __weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
- [weakSelf.writeable writeValue:value];
+ @synchronized(self) {
+ if (self->_state == GRXWriterStateFinished) {
+ return;
+ }
+ [self.writeable writeValue:value];
+ }
});
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- __weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^{
- [weakSelf finishWithError:errorOrNil];
+ if (self->_state == GRXWriterStateFinished) {
+ return;
+ }
+ [self finishWithError:errorOrNil];
});
}
@@ -100,14 +106,15 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- self.writeable = writeable;
- _state = GRXWriterStateStarted;
+ @synchronized(self) {
+ self.writeable = writeable;
+ _state = GRXWriterStateStarted;
+ }
dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
[self.writeable writesFinishedWithError:errorOrNil];
- self.state = GRXWriterStateFinished;
}
- (void)dealloc {
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
index abb831e6fb..5beca9d41e 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
@@ -23,10 +23,10 @@
/**
* This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a
- * GRXWriteable instance for the main thread, guaranteeing that writesFinishedWithError: is the last
- * message sent to it (no matter what messages are sent to the wrapper, in what order, nor from
- * which thread). It also guarantees that, if cancelWithError: is called from the main thread (e.g.
- * by the app cancelling the writes), no further messages are sent to the writeable except
+ * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is
+ * the last message sent to it (no matter what messages are sent to the wrapper, in what order, nor
+ * from which thread). It also guarantees that, if cancelWithError: is called (e.g. by the app
+ * cancelling the writes), no further messages are sent to the writeable except
* writesFinishedWithError:.
*
* TODO(jcanizales): Let the user specify another queue for the writeable callbacks.
@@ -43,21 +43,22 @@
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable;
/**
- * Enqueues writeValue: to be sent to the writeable in the main thread.
- * The passed handler is invoked from the main thread after writeValue: returns.
+ * Enqueues writeValue: to be sent to the writeable from the designated dispatch queue.
+ * The passed handler is invoked from designated dispatch queue after writeValue: returns.
*/
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler;
/**
- * Enqueues writesFinishedWithError:nil to be sent to the writeable in the main thread. After that
- * message is sent to the writeable, all other methods of this object are effectively noops.
+ * Enqueues writesFinishedWithError:nil to be sent to the writeable in the designated dispatch
+ * queue. After that message is sent to the writeable, all other methods of this object are
+ * effectively noops.
*/
- (void)enqueueSuccessfulCompletion;
/**
* If the writeable has not yet received a writesFinishedWithError: message, this will enqueue one
- * to be sent to it in the main thread, and cancel all other pending messages to the writeable
- * enqueued by this object (both past and future).
+ * to be sent to it in the designated dispatch queue, and cancel all other pending messages to the
+ * writeable enqueued by this object (both past and future).
* The error argument cannot be nil.
*/
- (void)cancelWithError:(NSError *)error;
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
index 81ccc3fbce..115195463d 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
@@ -27,8 +27,15 @@
@implementation GRXConcurrentWriteable {
dispatch_queue_t _writeableQueue;
- // This ensures that writesFinishedWithError: is only sent once to the writeable.
+
+ // This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected
+ // by _writeableQueue.
BOOL _alreadyFinished;
+
+ // This ivar ensures that a cancelWithError: call prevents further values to be sent to
+ // self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be
+ // protected by self lock.
+ BOOL _cancelled;
}
- (instancetype)init {
@@ -41,6 +48,8 @@
if (self = [super init]) {
_writeableQueue = queue;
_writeable = writeable;
+ _alreadyFinished = NO;
+ _cancelled = NO;
}
return self;
}
@@ -51,78 +60,63 @@
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
dispatch_async(_writeableQueue, ^{
- // We're racing a possible cancellation performed by another thread. To turn all already-
- // enqueued messages into noops, cancellation nillifies the writeable property. If we get it
- // before it's nil, we won the race.
- id<GRXWriteable> writeable = self.writeable;
- if (writeable) {
- [writeable writeValue:value];
- handler();
+ if (self->_alreadyFinished) {
+ return;
+ }
+
+ @synchronized(self) {
+ if (self->_cancelled) {
+ return;
+ }
}
+
+ [self.writeable writeValue:value];
+ handler();
});
}
- (void)enqueueSuccessfulCompletion {
- __weak typeof(self) weakSelf = self;
dispatch_async(_writeableQueue, ^{
- typeof(self) strongSelf = weakSelf;
- if (strongSelf) {
- BOOL finished = NO;
- @synchronized(strongSelf) {
- if (!strongSelf->_alreadyFinished) {
- strongSelf->_alreadyFinished = YES;
- } else {
- finished = YES;
- }
- }
- if (!finished) {
- // Cancellation is now impossible. None of the other three blocks can run concurrently with
- // this one.
- [strongSelf.writeable writesFinishedWithError:nil];
- // Skip any possible message to the wrapped writeable enqueued after this one.
- strongSelf.writeable = nil;
+ if (self->_alreadyFinished) {
+ return;
+ }
+ @synchronized(self) {
+ if (self->_cancelled) {
+ return;
}
}
+ [self.writeable writesFinishedWithError:nil];
+
+ // Skip any possible message to the wrapped writeable enqueued after this one.
+ self->_alreadyFinished = YES;
+ self.writeable = nil;
});
}
- (void)cancelWithError:(NSError *)error {
- NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
- BOOL finished = NO;
+ NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion.");
@synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
- }
+ self->_cancelled = YES;
}
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
- id<GRXWriteable> writeable = self.writeable;
- self.writeable = nil;
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ // a cancel or a successful completion is already issued
+ return;
+ }
+ [self.writeable writesFinishedWithError:error];
- dispatch_async(_writeableQueue, ^{
- [writeable writesFinishedWithError:error];
- });
- }
+ // Skip any possible message to the wrapped writeable enqueued after this one.
+ self->_alreadyFinished = YES;
+ self.writeable = nil;
+ });
}
- (void)cancelSilently {
- BOOL finished = NO;
- @synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ return;
}
- }
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
self.writeable = nil;
- }
+ });
}
@end
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h
index 3814ff8831..00366b6416 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.h
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h
@@ -25,11 +25,7 @@
* input writer, and for classes that represent objects with input and
* output sequences of values, like an RPC.
*
- * Thread-safety:
- * All messages sent to this object need to be serialized. When it is started, the writer it wraps
- * is started in the same thread. Manual state changes are propagated to the wrapped writer in the
- * same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be
- * serialized with any message sent to this object.
+ * Thread-safety: the methods of this class are thread safe.
*/
@interface GRXForwardingWriter : GRXWriter
- (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER;
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m
index 3e522ef24e..27ac0acdff 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.m
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m
@@ -54,48 +54,65 @@
[writeable writesFinishedWithError:errorOrNil];
}
-// This is used to stop the input writer. It nillifies our reference to it
-// to release it.
-- (void)finishInput {
- GRXWriter *writer = _writer;
- _writer = nil;
- writer.state = GRXWriterStateFinished;
-}
-
#pragma mark GRXWriteable implementation
- (void)writeValue:(id)value {
- [_writeable writeValue:value];
+ @synchronized(self) {
+ [_writeable writeValue:value];
+ }
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _writer = nil;
- [self finishOutputWithError:errorOrNil];
+ @synchronized(self) {
+ _writer = nil;
+ [self finishOutputWithError:errorOrNil];
+ }
}
#pragma mark GRXWriter implementation
- (GRXWriterState)state {
- return _writer ? _writer.state : GRXWriterStateFinished;
+ GRXWriter *copiedWriter;
+ @synchronized(self) {
+ copiedWriter = _writer;
+ }
+ return copiedWriter ? copiedWriter.state : GRXWriterStateFinished;
}
- (void)setState:(GRXWriterState)state {
+ GRXWriter *copiedWriter = nil;
if (state == GRXWriterStateFinished) {
- _writeable = nil;
- [self finishInput];
+ @synchronized(self) {
+ _writeable = nil;
+ copiedWriter = _writer;
+ _writer = nil;
+ }
+ copiedWriter.state = GRXWriterStateFinished;
} else {
- _writer.state = state;
+ @synchronized(self) {
+ copiedWriter = _writer;
+ }
+ copiedWriter.state = state;
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- _writeable = writeable;
- [_writer startWithWriteable:self];
+ GRXWriter *copiedWriter = nil;
+ @synchronized(self) {
+ _writeable = writeable;
+ copiedWriter = _writer;
+ }
+ [copiedWriter startWithWriteable:self];
}
- (void)finishWithError:(NSError *)errorOrNil {
- [self finishOutputWithError:errorOrNil];
- [self finishInput];
+ GRXWriter *copiedWriter = nil;
+ @synchronized(self) {
+ [self finishOutputWithError:errorOrNil];
+ copiedWriter = _writer;
+ _writer = nil;
+ }
+ copiedWriter.state = GRXWriterStateFinished;
}
@end
diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h
index 601abdc6b9..2fa38b3dce 100644
--- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h
+++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h
@@ -23,6 +23,8 @@
/**
* Utility to construct GRXWriter instances from values that are immediately available when
* required.
+ *
+ * Thread safety: the methods of this class are thread safe.
*/
@interface GRXImmediateSingleWriter : GRXImmediateWriter
diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
index 3126ae4bd1..079c11b94f 100644
--- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
+++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
@@ -20,7 +20,6 @@
@implementation GRXImmediateSingleWriter {
id _value;
- id<GRXWriteable> _writeable;
}
@synthesize state = _state;
@@ -38,17 +37,16 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- _state = GRXWriterStateStarted;
- _writeable = writeable;
- [writeable writeValue:_value];
- [self finish];
-}
-
-- (void)finish {
- _state = GRXWriterStateFinished;
- _value = nil;
- id<GRXWriteable> writeable = _writeable;
- _writeable = nil;
+ id copiedValue = nil;
+ @synchronized(self) {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
+ copiedValue = _value;
+ _value = nil;
+ _state = GRXWriterStateFinished;
+ }
+ [writeable writeValue:copiedValue];
[writeable writesFinishedWithError:nil];
}
@@ -65,9 +63,11 @@
// the original \a map function returns a new Writer of another type. So we
// need to override this function here.
- (GRXWriter *)map:(id (^)(id))map {
- // Since _value is available when creating the object, we can simply
- // apply the map and store the output.
- _value = map(_value);
+ @synchronized(self) {
+ // Since _value is available when creating the object, we can simply
+ // apply the map and store the output.
+ _value = map(_value);
+ }
return self;
}
diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h
index 5d99583a92..df4c80c28d 100644
--- a/src/objective-c/RxLibrary/GRXWriter.h
+++ b/src/objective-c/RxLibrary/GRXWriter.h
@@ -80,9 +80,9 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
* This property can be used to query the current state of the writer, which determines how it might
* currently use its writeable. Some state transitions can be triggered by setting this property to
* the corresponding value, and that's useful for advanced use cases like pausing an writer. For
- * more details, see the documentation of the enum further down.
+ * more details, see the documentation of the enum further down. The property is thread safe.
*/
-@property(nonatomic) GRXWriterState state;
+@property GRXWriterState state;
/**
* Transition to the Started state, and start sending messages to the writeable (a reference to it
diff --git a/templates/test/cpp/naming/resolver_component_tests_defs.include b/templates/test/cpp/naming/resolver_component_tests_defs.include
index b34845e01a..d38316cbe6 100644
--- a/templates/test/cpp/naming/resolver_component_tests_defs.include
+++ b/templates/test/cpp/naming/resolver_component_tests_defs.include
@@ -55,7 +55,6 @@ if cur_resolver and cur_resolver != 'ares':
'needs to use GRPC_DNS_RESOLVER=ares.'))
test_runner_log('Exit 1 without running tests.')
sys.exit(1)
-os.environ.update({'GRPC_DNS_RESOLVER': 'ares'})
os.environ.update({'GRPC_TRACE': 'cares_resolver'})
def wait_until_dns_server_is_up(args,
diff --git a/test/core/client_channel/resolvers/dns_resolver_test.cc b/test/core/client_channel/resolvers/dns_resolver_test.cc
index 571746abe8..6f153cc9bf 100644
--- a/test/core/client_channel/resolvers/dns_resolver_test.cc
+++ b/test/core/client_channel/resolvers/dns_resolver_test.cc
@@ -22,6 +22,8 @@
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/combiner.h"
#include "test/core/util/test_config.h"
@@ -72,12 +74,13 @@ int main(int argc, char** argv) {
test_succeeds(dns, "dns:10.2.1.1:1234");
test_succeeds(dns, "dns:www.google.com");
test_succeeds(dns, "dns:///www.google.com");
- if (grpc_resolve_address == grpc_resolve_address_ares) {
- test_succeeds(dns, "dns://8.8.8.8/8.8.8.8:8888");
- } else {
+ char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER");
+ if (resolver_env != nullptr && gpr_stricmp(resolver_env, "native") == 0) {
test_fails(dns, "dns://8.8.8.8/8.8.8.8:8888");
+ } else {
+ test_succeeds(dns, "dns://8.8.8.8/8.8.8.8:8888");
}
-
+ gpr_free(resolver_env);
{
grpc_core::ExecCtx exec_ctx;
GRPC_COMBINER_UNREF(g_combiner, "test");
diff --git a/test/core/surface/concurrent_connectivity_test.cc b/test/core/surface/concurrent_connectivity_test.cc
index f606e89ac8..b201568f48 100644
--- a/test/core/surface/concurrent_connectivity_test.cc
+++ b/test/core/surface/concurrent_connectivity_test.cc
@@ -44,7 +44,7 @@
#define NUM_OUTER_LOOPS 10
#define NUM_INNER_LOOPS 10
#define DELAY_MILLIS 10
-#define POLL_MILLIS 3000
+#define POLL_MILLIS 15000
#define NUM_OUTER_LOOPS_SHORT_TIMEOUTS 10
#define NUM_INNER_LOOPS_SHORT_TIMEOUTS 100
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
index dca97c85b1..7aa197b597 100644
--- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -94,6 +94,7 @@ static const grpc_event_engine_vtable* init_engine_vtable(bool) {
g_vtable.pollset_destroy = pollset_destroy;
g_vtable.pollset_work = pollset_work;
g_vtable.pollset_kick = pollset_kick;
+ g_vtable.is_any_background_poller_thread = [] { return false; };
g_vtable.shutdown_background_closure = [] {};
g_vtable.shutdown_engine = [] {};
diff --git a/test/cpp/naming/resolver_component_tests_runner.py b/test/cpp/naming/resolver_component_tests_runner.py
index 1873eec35b..950a9d4897 100755
--- a/test/cpp/naming/resolver_component_tests_runner.py
+++ b/test/cpp/naming/resolver_component_tests_runner.py
@@ -55,7 +55,6 @@ if cur_resolver and cur_resolver != 'ares':
'needs to use GRPC_DNS_RESOLVER=ares.'))
test_runner_log('Exit 1 without running tests.')
sys.exit(1)
-os.environ.update({'GRPC_DNS_RESOLVER': 'ares'})
os.environ.update({'GRPC_TRACE': 'cares_resolver'})
def wait_until_dns_server_is_up(args,
diff --git a/third_party/toolchains/BUILD b/third_party/toolchains/BUILD
index e213461acc..a1bee7f1f6 100644
--- a/third_party/toolchains/BUILD
+++ b/third_party/toolchains/BUILD
@@ -20,17 +20,17 @@ package(default_visibility = ["//visibility:public"])
# Update every time when a new container is released.
alias(
name = "rbe_ubuntu1604",
- actual = ":rbe_ubuntu1604_r342117",
+ actual = ":rbe_ubuntu1604_r346485",
)
alias(
name = "rbe_ubuntu1604_large",
- actual = ":rbe_ubuntu1604_r342117_large",
+ actual = ":rbe_ubuntu1604_r346485_large",
)
-# RBE Ubuntu16_04 r342117
+# RBE Ubuntu16_04 r346485
platform(
- name = "rbe_ubuntu1604_r342117",
+ name = "rbe_ubuntu1604_r346485",
constraint_values = [
"@bazel_tools//platforms:x86_64",
"@bazel_tools//platforms:linux",
@@ -51,9 +51,9 @@ platform(
""",
)
-# RBE Ubuntu16_04 r342117 large
+# RBE Ubuntu16_04 r346485 large
platform(
- name = "rbe_ubuntu1604_r342117_large",
+ name = "rbe_ubuntu1604_r346485_large",
constraint_values = [
"@bazel_tools//platforms:x86_64",
"@bazel_tools//platforms:linux",
@@ -74,14 +74,12 @@ platform(
""",
)
-# This target is auto-generated from release/cpp.tpl and should not be
-# modified directly.
toolchain(
name = "cc-toolchain-clang-x86_64-default",
exec_compatible_with = [
],
target_compatible_with = [
],
- toolchain = "@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.1/bazel_0.16.1/default:cc-compiler-k8",
+ toolchain = "@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.1/bazel_0.20.0/default:cc-compiler-k8",
toolchain_type = "@bazel_tools//tools/cpp:toolchain_type",
)
diff --git a/tools/internal_ci/helper_scripts/prepare_build_macos_rc b/tools/internal_ci/helper_scripts/prepare_build_macos_rc
index 632db5ae14..5b6b256939 100644
--- a/tools/internal_ci/helper_scripts/prepare_build_macos_rc
+++ b/tools/internal_ci/helper_scripts/prepare_build_macos_rc
@@ -38,6 +38,8 @@ if [ -n "$KOKORO_GITHUB_PULL_REQUEST_NUMBER" ]; then
fi
set +ex # rvm script is very verbose and exits with errorcode
+# Advice from https://github.com/Homebrew/homebrew-cask/issues/8629#issuecomment-68641176
+brew update && brew upgrade brew-cask && brew cleanup && brew cask cleanup
source $HOME/.rvm/scripts/rvm
set -e # rvm commands are very verbose
time rvm install 2.5.0
diff --git a/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh b/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh
index 4d7d4271d6..863b43a172 100755
--- a/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh
+++ b/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh
@@ -23,7 +23,7 @@ cp ${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db331.json ${KOKORO_KEYSTORE_DIR}/4321
# Download bazel
temp_dir="$(mktemp -d)"
-wget -q https://github.com/bazelbuild/bazel/releases/download/0.17.1/bazel-0.17.1-linux-x86_64 -O "${temp_dir}/bazel"
+wget -q https://github.com/bazelbuild/bazel/releases/download/0.20.0/bazel-0.20.0-linux-x86_64 -O "${temp_dir}/bazel"
chmod 755 "${temp_dir}/bazel"
export PATH="${temp_dir}:${PATH}"
# This should show ${temp_dir}/bazel
diff --git a/tools/interop_matrix/README.md b/tools/interop_matrix/README.md
index 6676f5d470..ecd71be7f8 100644
--- a/tools/interop_matrix/README.md
+++ b/tools/interop_matrix/README.md
@@ -11,8 +11,9 @@ We have continuous nightly test setup to test gRPC backward compatibility betwee
- Build new client docker image(s). For example, for C and wrapper languages release `v1.9.9`, do
- `tools/interop_matrix/create_matrix_images.py --git_checkout --release=v1.9.9 --upload_images --language cxx csharp python ruby php`
- Verify that the new docker image was built successfully and uploaded to GCR. For example,
- - `gcloud beta container images list --repository gcr.io/grpc-testing` shows image repos.
- - `gcloud beta container images list-tags gcr.io/grpc-testing/grpc_interop_java_oracle8` should show an image entry with tag `v1.9.9`.
+ - `gcloud container images list --repository gcr.io/grpc-testing` lists available images.
+ - `gcloud container images list-tags gcr.io/grpc-testing/grpc_interop_java_oracle8` should show an image entry with tag `v1.9.9`.
+ - images can also be viewed in https://pantheon.corp.google.com/gcr/images/grpc-testing?project=grpc-testing
- Verify the just-created docker client image would pass backward compatibility test (it should). For example,
- `gcloud docker -- pull gcr.io/grpc-testing/grpc_interop_java_oracle8:v1.9.9` followed by
- `docker_image=gcr.io/grpc-testing/grpc_interop_java_oracle8:v1.9.9 tools/interop_matrix/testcases/java__master`
@@ -20,13 +21,11 @@ We have continuous nightly test setup to test gRPC backward compatibility betwee
- (Optional) clean up the tmp directory to where grpc source is cloned at `/export/hda3/tmp/grpc_matrix/`.
For more details on each step, refer to sections below.
-## Instructions for adding new language/runtimes*
+## Instructions for adding new language/runtimes
- Create new `Dockerfile.template`, `build_interop.sh.template` for the language/runtime under `template/tools/dockerfile/`.
- Run `tools/buildgen/generate_projects.sh` to create corresponding files under `tools/dockerfile/`.
- Add language/runtimes to `client_matrix.py` following existing language/runtimes examples.
-- Run `tools/interop_matrix/create_matrix_images.py` which will build and upload images to GCR. Unless you are also building images for a gRPC release, make sure not to set `--release` (the default release 'master' is used for testing).
-
-*: Please delete your docker images at https://pantheon.corp.google.com/gcr/images/grpc-testing?project=grpc-testing afterwards. Permissions to access GrpcTesting project is required for this step.
+- Run `tools/interop_matrix/create_matrix_images.py` which will build (and upload) images to GCR.
## Instructions for creating new test cases
- Create test cases by running `LANG=<lang> [RELEASE=<release>] ./create_testcases.sh`. For example,
@@ -39,13 +38,13 @@ For more details on each step, refer to sections below.
- `--release` specifies a git release tag. Defaults to `--release=all`. Make sure the GCR images with the tag have been created using `create_matrix_images.py` above.
- `--language` specifies a language. Defaults to `--language=all`.
For example, To test all languages for all gRPC releases across all runtimes, do `tools/interop_matrix/run_interop_matrix_test.py --release=all`.
-- The output for all the test cases is recorded in a junit style xml file (default to 'report.xml').
+- The output for all the test cases is recorded in a junit style xml file (defaults to 'report.xml').
## Instructions for running test cases against a GCR image manually
-- Download docker image from GCR. For example: `gcloud docker -- pull gcr.io/grpc-testing/grpc_interop_go1.7:master`.
+- Download docker image from GCR. For example: `gcloud docker -- pull gcr.io/grpc-testing/grpc_interop_go1.8:v1.16.0`.
- Run test cases by specifying `docker_image` variable inline with the test case script created above.
For example:
- - `docker_image=gcr.io/grpc-testing/grpc_interop_go1.7:master ./testcases/go__master` will run go__master test cases against `go1.7` with gRPC release `master` docker image in GCR.
+ - `docker_image=gcr.io/grpc-testing/grpc_interop_go1.8:v1.16.0 ./testcases/go__master` will run go__master test cases against `go1.8` with gRPC release `v1.16.0` docker image in GCR.
Note:
- File path starting with `tools/` or `template/` are relative to the grpc repo root dir. File path starting with `./` are relative to current directory (`tools/interop_matrix`).
diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py
index 12051e70a0..655c7c7b6b 100644
--- a/tools/interop_matrix/client_matrix.py
+++ b/tools/interop_matrix/client_matrix.py
@@ -35,14 +35,15 @@ def get_release_tags(lang):
def get_runtimes_for_lang_release(lang, release):
"""Get list of valid runtimes for given release of lang."""
- runtimes_to_skip = []
- release_info = LANG_RELEASE_MATRIX[lang][release]
- if release_info:
- runtimes_to_skip = release_info.skip_runtime
- return [
- runtime for runtime in LANG_RUNTIME_MATRIX[lang]
- if runtime not in runtimes_to_skip
- ]
+ runtimes = list(LANG_RUNTIME_MATRIX[lang])
+ release_info = LANG_RELEASE_MATRIX[lang].get(release)
+ if release_info and release_info.runtime_subset:
+ runtimes = list(release_info.runtime_subset)
+
+ # check that all selected runtimes are valid for given language
+ for runtime in runtimes:
+ assert runtime in LANG_RUNTIME_MATRIX[lang]
+ return runtimes
def should_build_docker_interop_image_from_release_tag(lang):
@@ -57,7 +58,7 @@ def should_build_docker_interop_image_from_release_tag(lang):
# Dictionary of runtimes per language
LANG_RUNTIME_MATRIX = {
'cxx': ['cxx'], # This is actually debian8.
- 'go': ['go1.7', 'go1.8', 'go1.11'],
+ 'go': ['go1.8', 'go1.11'],
'java': ['java_oracle8'],
'python': ['python'],
'node': ['node'],
@@ -70,9 +71,9 @@ LANG_RUNTIME_MATRIX = {
class ReleaseInfo:
"""Info about a single release of a language"""
- def __init__(self, patch=[], skip_runtime=[], testcases_file=None):
+ def __init__(self, patch=[], runtime_subset=[], testcases_file=None):
self.patch = patch
- self.skip_runtime = skip_runtime
+ self.runtime_subset = runtime_subset
self.testcases_file = None
@@ -100,23 +101,23 @@ LANG_RELEASE_MATRIX = {
]),
'go':
OrderedDict([
- ('v1.0.5', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.2.1', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.3.0', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.4.2', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.5.2', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.6.0', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.7.4', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.8.2', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.9.2', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.10.1', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.11.3', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.12.2', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.13.0', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.14.0', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.15.0', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.16.0', ReleaseInfo(skip_runtime=['go1.11'])),
- ('v1.17.0', ReleaseInfo(skip_runtime=['go1.7', 'go1.8'])),
+ ('v1.0.5', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.2.1', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.3.0', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.4.2', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.5.2', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.6.0', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.7.4', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.8.2', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.9.2', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.10.1', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.11.3', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.12.2', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.13.0', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.14.0', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.15.0', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.16.0', ReleaseInfo(runtime_subset=['go1.8'])),
+ ('v1.17.0', ReleaseInfo(runtime_subset=['go1.11'])),
]),
'java':
OrderedDict([
diff --git a/tools/interop_matrix/create_matrix_images.py b/tools/interop_matrix/create_matrix_images.py
index 31a0e1c7ba..28dc4be0f4 100755
--- a/tools/interop_matrix/create_matrix_images.py
+++ b/tools/interop_matrix/create_matrix_images.py
@@ -260,7 +260,7 @@ atexit.register(cleanup)
def maybe_apply_patches_on_git_tag(stack_base, lang, release):
files_to_patch = []
- release_info = client_matrix.LANG_RELEASE_MATRIX[lang][release]
+ release_info = client_matrix.LANG_RELEASE_MATRIX[lang].get(release)
if release_info:
files_to_patch = release_info.patch
if not files_to_patch:
diff --git a/tools/remote_build/rbe_common.bazelrc b/tools/remote_build/rbe_common.bazelrc
index aa3ddb050c..8cf17a3086 100644
--- a/tools/remote_build/rbe_common.bazelrc
+++ b/tools/remote_build/rbe_common.bazelrc
@@ -18,7 +18,7 @@
startup --host_jvm_args=-Dbazel.DigestFunction=SHA256
-build --crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.1/bazel_0.16.1/default:toolchain
+build --crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.1/bazel_0.20.0/default:toolchain
build --extra_toolchains=//third_party/toolchains:cc-toolchain-clang-x86_64-default
# Use custom execution platforms defined in third_party/toolchains
build --extra_execution_platforms=//third_party/toolchains:rbe_ubuntu1604,//third_party/toolchains:rbe_ubuntu1604_large
@@ -61,9 +61,9 @@ build:msan --cxxopt=--stdlib=libc++
# setting LD_LIBRARY_PATH is necessary
# to avoid "libc++.so.1: cannot open shared object file"
build:msan --action_env=LD_LIBRARY_PATH=/usr/local/lib
-build:msan --host_crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.1/bazel_0.16.1/default:toolchain
+build:msan --host_crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.1/bazel_0.20.0/default:toolchain
# override the config-agnostic crosstool_top
-build:msan --crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.1/bazel_0.16.1/msan:toolchain
+build:msan --crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.1/bazel_0.20.0/msan:toolchain
# thread sanitizer: most settings are already in %workspace%/.bazelrc
# we only need a few additional ones that are Foundry specific
@@ -79,7 +79,7 @@ build:ubsan --copt=-gmlt
# TODO(jtattermusch): use more reasonable test timeout
build:ubsan --test_timeout=3600
# override the config-agnostic crosstool_top
---crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/experimental/ubuntu16_04_clang/1.1/bazel_0.16.1/ubsan:toolchain
+--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/experimental/ubuntu16_04_clang/1.1/bazel_0.20.0/ubsan:toolchain
# TODO(jtattermusch): remove this once Foundry adds the env to the docker image.
# ubsan needs symbolizer to work properly, otherwise the suppression file doesn't work
# and we get test failures.