aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-01-11 10:24:15 +0000
committerGravatar Muxi Yan <mxyan@google.com>2017-01-11 10:24:15 +0000
commite91344a36f5647c877474eb875471d814cc6f03f (patch)
tree47050dd2b9eed0eafd397dd0eefbc3797453608a /src/cpp
parent123d0dbd614f6d6626295dec93764549e0e650ff (diff)
parent88a352cd0b4236b5e7483adb2ff2c68582d7cfbb (diff)
Merge remote-tracking branch 'upstream/master' into advance-cronet-version
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/client/channel_cc.cc30
-rw-r--r--src/cpp/common/channel_arguments.cc12
-rw-r--r--src/cpp/common/channel_filter.cc8
-rw-r--r--src/cpp/common/channel_filter.h48
-rw-r--r--src/cpp/common/completion_queue_cc.cc14
-rw-r--r--src/cpp/server/server_cc.cc17
6 files changed, 91 insertions, 38 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index 5f1d00d2b4..357d8317ad 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -48,6 +48,7 @@
#include <grpc++/support/time.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/profiling/timers.h"
@@ -61,6 +62,35 @@ Channel::Channel(const grpc::string& host, grpc_channel* channel)
Channel::~Channel() { grpc_channel_destroy(c_channel_); }
+namespace {
+
+grpc::string GetChannelInfoField(grpc_channel* channel,
+ grpc_channel_info* channel_info,
+ char*** channel_info_field) {
+ char* value = NULL;
+ memset(channel_info, 0, sizeof(*channel_info));
+ *channel_info_field = &value;
+ grpc_channel_get_info(channel, channel_info);
+ if (value == NULL) return "";
+ grpc::string result = value;
+ gpr_free(value);
+ return result;
+}
+
+} // namespace
+
+grpc::string Channel::GetLoadBalancingPolicyName() const {
+ grpc_channel_info channel_info;
+ return GetChannelInfoField(c_channel_, &channel_info,
+ &channel_info.lb_policy_name);
+}
+
+grpc::string Channel::GetServiceConfigJSON() const {
+ grpc_channel_info channel_info;
+ return GetChannelInfoField(c_channel_, &channel_info,
+ &channel_info.service_config_json);
+}
+
Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) {
const bool kRegistered = method.channel_tag() && context->authority().empty();
diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc
index 0067d6c5e1..1fdd106130 100644
--- a/src/cpp/common/channel_arguments.cc
+++ b/src/cpp/common/channel_arguments.cc
@@ -38,8 +38,11 @@
#include <grpc++/resource_quota.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
+extern "C" {
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/socket_mutator.h"
+}
namespace grpc {
ChannelArguments::ChannelArguments() {
@@ -94,13 +97,15 @@ void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) {
}
grpc_arg mutator_arg = grpc_socket_mutator_to_arg(mutator);
bool replaced = false;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
for (auto it = args_.begin(); it != args_.end(); ++it) {
if (it->type == mutator_arg.type &&
grpc::string(it->key) == grpc::string(mutator_arg.key)) {
- it->value.pointer.vtable->destroy(it->value.pointer.p);
+ it->value.pointer.vtable->destroy(&exec_ctx, it->value.pointer.p);
it->value.pointer = mutator_arg.value.pointer;
}
}
+ grpc_exec_ctx_finish(&exec_ctx);
if (!replaced) {
args_.push_back(mutator_arg);
}
@@ -143,6 +148,11 @@ void ChannelArguments::SetLoadBalancingPolicyName(
SetString(GRPC_ARG_LB_POLICY_NAME, lb_policy_name);
}
+void ChannelArguments::SetServiceConfigJSON(
+ const grpc::string& service_config_json) {
+ SetString(GRPC_ARG_SERVICE_CONFIG, service_config_json);
+}
+
void ChannelArguments::SetInt(const grpc::string& key, int value) {
grpc_arg arg;
arg.type = GRPC_ARG_INTEGER;
diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc
index ad2c0f2295..c0dc9dd63e 100644
--- a/src/cpp/common/channel_filter.cc
+++ b/src/cpp/common/channel_filter.cc
@@ -40,11 +40,12 @@ namespace grpc {
// MetadataBatch
-grpc_linked_mdelem *MetadataBatch::AddMetadata(const string &key,
+grpc_linked_mdelem *MetadataBatch::AddMetadata(grpc_exec_ctx *exec_ctx,
+ const string &key,
const string &value) {
grpc_linked_mdelem *storage = new grpc_linked_mdelem;
memset(storage, 0, sizeof(grpc_linked_mdelem));
- storage->md = grpc_mdelem_from_strings(key.c_str(), value.c_str());
+ storage->md = grpc_mdelem_from_strings(exec_ctx, key.c_str(), value.c_str());
grpc_metadata_batch_link_head(batch_, storage);
return storage;
}
@@ -89,7 +90,8 @@ std::vector<FilterRecord> *channel_filters;
namespace {
-bool MaybeAddFilter(grpc_channel_stack_builder *builder, void *arg) {
+bool MaybeAddFilter(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder, void *arg) {
const FilterRecord &filter = *(FilterRecord *)arg;
if (filter.include_filter) {
const grpc_channel_args *args =
diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h
index e420efc71c..5de8f5e463 100644
--- a/src/cpp/common/channel_filter.h
+++ b/src/cpp/common/channel_filter.h
@@ -70,7 +70,8 @@ class MetadataBatch {
/// Adds metadata and returns the newly allocated storage.
/// The caller takes ownership of the result, which must exist for the
/// lifetime of the gRPC call.
- grpc_linked_mdelem *AddMetadata(const string &key, const string &value);
+ grpc_linked_mdelem *AddMetadata(grpc_exec_ctx *exec_ctx, const string &key,
+ const string &value);
class const_iterator : public std::iterator<std::bidirectional_iterator_tag,
const grpc_mdelem> {
@@ -216,12 +217,13 @@ class TransportStreamOp {
/// Represents channel data.
class ChannelData {
public:
- virtual ~ChannelData() {
- if (peer_) gpr_free((void *)peer_);
- }
+ virtual ~ChannelData() {}
- /// Caller does NOT take ownership of result.
- const char *peer() const { return peer_; }
+ /// Initializes the call data.
+ virtual grpc_error *Init(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element_args *args) {
+ return GRPC_ERROR_NONE;
+ }
// TODO(roth): Find a way to avoid passing elem into these methods.
@@ -232,11 +234,7 @@ class ChannelData {
const grpc_channel_info *channel_info);
protected:
- /// Takes ownership of \a peer.
- ChannelData(const grpc_channel_args &args, const char *peer) : peer_(peer) {}
-
- private:
- const char *peer_;
+ ChannelData() {}
};
/// Represents call data.
@@ -245,7 +243,10 @@ class CallData {
virtual ~CallData() {}
/// Initializes the call data.
- virtual grpc_error *Init() { return GRPC_ERROR_NONE; }
+ virtual grpc_error *Init(grpc_exec_ctx *exec_ctx, ChannelData *channel_data,
+ grpc_call_element_args *args) {
+ return GRPC_ERROR_NONE;
+ }
// TODO(roth): Find a way to avoid passing elem into these methods.
@@ -263,7 +264,7 @@ class CallData {
virtual char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
protected:
- explicit CallData(const ChannelData &) {}
+ CallData() {}
};
namespace internal {
@@ -276,15 +277,11 @@ class ChannelFilter final {
public:
static const size_t channel_data_size = sizeof(ChannelDataType);
- static void InitChannelElement(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_channel_element_args *args) {
- const char *peer =
- args->optional_transport
- ? grpc_transport_get_peer(exec_ctx, args->optional_transport)
- : nullptr;
- // Construct the object in the already-allocated memory.
- new (elem->channel_data) ChannelDataType(*args->channel_args, peer);
+ static grpc_error *InitChannelElement(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ ChannelDataType *channel_data = new (elem->channel_data) ChannelDataType();
+ return channel_data->Init(exec_ctx, args);
}
static void DestroyChannelElement(grpc_exec_ctx *exec_ctx,
@@ -312,11 +309,10 @@ class ChannelFilter final {
static grpc_error *InitCallElement(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
- const ChannelDataType &channel_data =
- *(ChannelDataType *)elem->channel_data;
+ ChannelDataType *channel_data = (ChannelDataType *)elem->channel_data;
// Construct the object in the already-allocated memory.
- CallDataType *call_data = new (elem->call_data) CallDataType(channel_data);
- return call_data->Init();
+ CallDataType *call_data = new (elem->call_data) CallDataType();
+ return call_data->Init(exec_ctx, channel_data, args);
}
static void DestroyCallElement(grpc_exec_ctx *exec_ctx,
diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc
index 00cc102f92..0408a41085 100644
--- a/src/cpp/common/completion_queue_cc.cc
+++ b/src/cpp/common/completion_queue_cc.cc
@@ -43,11 +43,21 @@ namespace grpc {
static internal::GrpcLibraryInitializer g_gli_initializer;
-CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
+CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {
+ InitialAvalanching();
+}
void CompletionQueue::Shutdown() {
g_gli_initializer.summon();
- grpc_completion_queue_shutdown(cq_);
+ CompleteAvalanching();
+}
+
+void CompletionQueue::CompleteAvalanching() {
+ // Check if this was the last avalanching operation
+ if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
+ static_cast<gpr_atm>(-1)) == 1) {
+ grpc_completion_queue_shutdown(cq_);
+ }
}
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index b7cfd6dbf1..817d85a81c 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -510,12 +510,6 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
- // Shutdown all ThreadManagers. This will try to gracefully stop all the
- // threads in the ThreadManagers (once they process any inflight requests)
- for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
- (*it)->Shutdown(); // ThreadManager's Shutdown()
- }
-
shutdown_cq.Shutdown();
void* tag;
@@ -531,6 +525,12 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
+ // Shutdown all ThreadManagers. This will try to gracefully stop all the
+ // threads in the ThreadManagers (once they process any inflight requests)
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Shutdown(); // ThreadManager's Shutdown()
+ }
+
// Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
@@ -575,9 +575,14 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
tag_(tag),
delete_on_finalize_(delete_on_finalize),
call_(nullptr) {
+ call_cq_->RegisterAvalanching(); // This op will trigger more ops
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
+ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
+ call_cq_->CompleteAvalanching();
+}
+
bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
bool* status) {
if (*status) {