aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-09-22 15:24:44 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-09-22 15:24:44 -0700
commitf7d8860f4ff688e966159f31e2c281a22e2aa3f4 (patch)
tree9367583b04c0e9464c6f091d699dfc64ba63a7f8 /src/cpp
parent76e0c1ddd536342bdc5059209da38d5406c7e717 (diff)
parent31c66c576ad00504b34182340f8ff21bc3f447fb (diff)
Merge remote-tracking branch 'upstream/master' into fix-stream-compression-eos
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/client/channel_cc.cc202
-rw-r--r--src/cpp/client/client_context.cc2
-rw-r--r--src/cpp/client/generic_stub.cc33
-rw-r--r--src/cpp/common/channel_filter.cc6
-rw-r--r--src/cpp/common/channel_filter.h22
-rw-r--r--src/cpp/common/core_codegen.cc4
-rw-r--r--src/cpp/server/health/default_health_check_service.cc1
-rw-r--r--src/cpp/server/server_cc.cc9
-rw-r--r--src/cpp/server/server_context.cc2
-rw-r--r--src/cpp/util/byte_buffer_cc.cc31
-rw-r--r--src/cpp/util/core_stats.cc90
-rw-r--r--src/cpp/util/core_stats.h35
-rw-r--r--src/cpp/util/slice_cc.cc2
13 files changed, 386 insertions, 53 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index f2d9bb07c9..19a25c838f 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -18,7 +18,10 @@
#include <grpc++/channel.h>
+#include <chrono>
+#include <condition_variable>
#include <memory>
+#include <mutex>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
@@ -35,17 +38,197 @@
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/string.h"
namespace grpc {
+namespace {
+int kConnectivityCheckIntervalMsec = 500;
+void WatchStateChange(void* arg);
+
+class TagSaver final : public CompletionQueueTag {
+ public:
+ explicit TagSaver(void* tag) : tag_(tag) {}
+ ~TagSaver() override {}
+ bool FinalizeResult(void** tag, bool* status) override {
+ *tag = tag_;
+ delete this;
+ return true;
+ }
+
+ private:
+ void* tag_;
+};
+
+// Constantly watches channel connectivity status to reconnect a transiently
+// disconnected channel. This is a temporary work-around before we have retry
+// support.
+class ChannelConnectivityWatcher : private GrpcLibraryCodegen {
+ public:
+ static void StartWatching(grpc_channel* channel) {
+ if (!IsDisabled()) {
+ std::unique_lock<std::mutex> lock(g_watcher_mu_);
+ if (g_watcher_ == nullptr) {
+ g_watcher_ = new ChannelConnectivityWatcher();
+ }
+ g_watcher_->StartWatchingLocked(channel);
+ }
+ }
+
+ static void StopWatching() {
+ if (!IsDisabled()) {
+ std::unique_lock<std::mutex> lock(g_watcher_mu_);
+ if (g_watcher_->StopWatchingLocked()) {
+ delete g_watcher_;
+ g_watcher_ = nullptr;
+ }
+ }
+ }
+
+ private:
+ ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) {
+ gpr_ref_init(&ref_, 0);
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+ gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
+ }
+
+ static bool IsDisabled() {
+ char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
+ bool disabled = gpr_is_true(env);
+ gpr_free(env);
+ return disabled;
+ }
+
+ void WatchStateChangeImpl() {
+ bool ok = false;
+ void* tag = NULL;
+ CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT;
+ while (true) {
+ {
+ std::unique_lock<std::mutex> lock(shutdown_mu_);
+ if (shutdown_) {
+ // Drain cq_ if the watcher is shutting down
+ status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME));
+ } else {
+ status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+ // Make sure we've seen 2 TIMEOUTs before going to sleep
+ if (status == CompletionQueue::TIMEOUT) {
+ status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
+ if (status == CompletionQueue::TIMEOUT) {
+ shutdown_cv_.wait_for(lock, std::chrono::milliseconds(
+ kConnectivityCheckIntervalMsec));
+ continue;
+ }
+ }
+ }
+ }
+ ChannelState* channel_state = static_cast<ChannelState*>(tag);
+ channel_state->state =
+ grpc_channel_check_connectivity_state(channel_state->channel, false);
+ if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
+ void* shutdown_tag = NULL;
+ channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
+ delete channel_state;
+ if (gpr_unref(&ref_)) {
+ break;
+ }
+ } else {
+ TagSaver* tag_saver = new TagSaver(channel_state);
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
+ }
+ }
+ }
+
+ void StartWatchingLocked(grpc_channel* channel) {
+ if (thd_id_ != 0) {
+ gpr_ref(&ref_);
+ ++channel_count_;
+ ChannelState* channel_state = new ChannelState(channel);
+ // The first grpc_channel_watch_connectivity_state() is not used to
+ // monitor the channel state change, but to hold a reference of the
+ // c channel. So that WatchStateChangeImpl() can observe state ==
+ // GRPC_CHANNEL_SHUTDOWN before the channel gets destroyed.
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(),
+ new TagSaver(nullptr));
+ grpc_channel_watch_connectivity_state(
+ channel_state->channel, channel_state->state,
+ gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(),
+ new TagSaver(channel_state));
+ }
+ }
+
+ bool StopWatchingLocked() {
+ if (--channel_count_ == 0) {
+ {
+ std::unique_lock<std::mutex> lock(shutdown_mu_);
+ shutdown_ = true;
+ shutdown_cv_.notify_one();
+ }
+ gpr_thd_join(thd_id_);
+ return true;
+ }
+ return false;
+ }
+
+ friend void WatchStateChange(void* arg);
+ struct ChannelState {
+ explicit ChannelState(grpc_channel* channel)
+ : channel(channel), state(GRPC_CHANNEL_IDLE){};
+ grpc_channel* channel;
+ grpc_connectivity_state state;
+ CompletionQueue shutdown_cq;
+ };
+ gpr_thd_id thd_id_;
+ CompletionQueue cq_;
+ gpr_refcount ref_;
+ int channel_count_;
+
+ std::mutex shutdown_mu_;
+ std::condition_variable shutdown_cv_; // protected by shutdown_mu_
+ bool shutdown_; // protected by shutdown_mu_
+
+ static std::mutex g_watcher_mu_;
+ static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_
+};
+
+std::mutex ChannelConnectivityWatcher::g_watcher_mu_;
+ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr;
+
+void WatchStateChange(void* arg) {
+ ChannelConnectivityWatcher* watcher =
+ static_cast<ChannelConnectivityWatcher*>(arg);
+ watcher->WatchStateChangeImpl();
+}
+} // namespace
+
static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
: host_(host), c_channel_(channel) {
g_gli_initializer.summon();
+ if (grpc_channel_support_connectivity_watcher(channel)) {
+ ChannelConnectivityWatcher::StartWatching(channel);
+ }
}
-Channel::~Channel() { grpc_channel_destroy(c_channel_); }
+Channel::~Channel() {
+ const bool stop_watching =
+ grpc_channel_support_connectivity_watcher(c_channel_);
+ grpc_channel_destroy(c_channel_);
+ if (stop_watching) {
+ ChannelConnectivityWatcher::StopWatching();
+ }
+}
namespace {
@@ -130,23 +313,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
}
-namespace {
-class TagSaver final : public CompletionQueueTag {
- public:
- explicit TagSaver(void* tag) : tag_(tag) {}
- ~TagSaver() override {}
- bool FinalizeResult(void** tag, bool* status) override {
- *tag = tag_;
- delete this;
- return true;
- }
-
- private:
- void* tag_;
-};
-
-} // namespace
-
void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 3af8bdc11a..40e95f3c05 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -96,7 +96,7 @@ void ClientContext::set_call(grpc_call* call,
void ClientContext::set_compression_algorithm(
grpc_compression_algorithm algorithm) {
- char* algorithm_name = nullptr;
+ const char* algorithm_name = nullptr;
if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
algorithm);
diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc
index 66b1ef0e39..693b8bea56 100644
--- a/src/cpp/client/generic_stub.cc
+++ b/src/cpp/client/generic_stub.cc
@@ -22,14 +22,39 @@
namespace grpc {
+namespace {
+std::unique_ptr<GenericClientAsyncReaderWriter> CallInternal(
+ ChannelInterface* channel, ClientContext* context,
+ const grpc::string& method, CompletionQueue* cq, bool start, void* tag) {
+ return std::unique_ptr<GenericClientAsyncReaderWriter>(
+ GenericClientAsyncReaderWriter::Create(
+ channel, cq, RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING),
+ context, start, tag));
+}
+
+} // namespace
+
// begin a call to a named method
std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call(
ClientContext* context, const grpc::string& method, CompletionQueue* cq,
void* tag) {
- return std::unique_ptr<GenericClientAsyncReaderWriter>(
- GenericClientAsyncReaderWriter::Create(
- channel_.get(), cq,
- RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag));
+ return CallInternal(channel_.get(), context, method, cq, true, tag);
+}
+
+// setup a call to a named method
+std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::PrepareCall(
+ ClientContext* context, const grpc::string& method, CompletionQueue* cq) {
+ return CallInternal(channel_.get(), context, method, cq, false, nullptr);
+}
+
+// setup a unary call to a named method
+std::unique_ptr<GenericClientAsyncResponseReader> GenericStub::PrepareUnaryCall(
+ ClientContext* context, const grpc::string& method,
+ const ByteBuffer& request, CompletionQueue* cq) {
+ return std::unique_ptr<GenericClientAsyncResponseReader>(
+ GenericClientAsyncResponseReader::Create(
+ channel_.get(), cq, RpcMethod(method.c_str(), RpcMethod::NORMAL_RPC),
+ context, request, false));
}
} // namespace grpc
diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc
index f870af0c67..ea44cff832 100644
--- a/src/cpp/common/channel_filter.cc
+++ b/src/cpp/common/channel_filter.cc
@@ -18,7 +18,9 @@
#include <string.h>
+extern "C" {
#include "src/core/lib/channel/channel_stack.h"
+}
#include "src/cpp/common/channel_filter.h"
#include <grpc++/impl/codegen/slice.h>
@@ -66,10 +68,6 @@ void CallData::SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx,
grpc_call_stack_ignore_set_pollset_or_pollset_set(exec_ctx, elem, pollent);
}
-char *CallData::GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- return grpc_call_next_get_peer(exec_ctx, elem);
-}
-
// internal code used by RegisterChannelFilter()
namespace internal {
diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h
index 5d629f7c14..c1aeb3f724 100644
--- a/src/cpp/common/channel_filter.h
+++ b/src/cpp/common/channel_filter.h
@@ -26,9 +26,11 @@
#include <functional>
#include <vector>
+extern "C" {
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/metadata_batch.h"
+}
/// An interface to define filters.
///
@@ -193,6 +195,15 @@ class TransportStreamOpBatch {
op_->payload->send_message.send_message = send_message;
}
+ grpc_byte_stream **recv_message() const {
+ return op_->recv_message ? op_->payload->recv_message.recv_message
+ : nullptr;
+ }
+ void set_recv_message(grpc_byte_stream **recv_message) {
+ op_->recv_message = true;
+ op_->payload->recv_message.recv_message = recv_message;
+ }
+
census_context *get_census_context() const {
return (census_context *)op_->payload->context[GRPC_CONTEXT_TRACING].value;
}
@@ -257,9 +268,6 @@ class CallData {
virtual void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_polling_entity *pollent);
-
- /// Gets the peer name.
- virtual char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
};
namespace internal {
@@ -338,11 +346,6 @@ class ChannelFilter final {
CallDataType *call_data = reinterpret_cast<CallDataType *>(elem->call_data);
call_data->SetPollsetOrPollsetSet(exec_ctx, elem, pollent);
}
-
- static char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- CallDataType *call_data = reinterpret_cast<CallDataType *>(elem->call_data);
- return call_data->GetPeer(exec_ctx, elem);
- }
};
struct FilterRecord {
@@ -385,8 +388,7 @@ void RegisterChannelFilter(
FilterType::call_data_size, FilterType::InitCallElement,
FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement,
FilterType::channel_data_size, FilterType::InitChannelElement,
- FilterType::DestroyChannelElement, FilterType::GetPeer,
- FilterType::GetChannelInfo, name}};
+ FilterType::DestroyChannelElement, FilterType::GetChannelInfo, name}};
internal::channel_filters->push_back(filter_record);
}
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
index c7c6b6b13b..6ea5f1d3c7 100644
--- a/src/cpp/common/core_codegen.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -89,6 +89,10 @@ int CoreCodegen::gpr_cv_wait(gpr_cv* cv, gpr_mu* mu,
void CoreCodegen::gpr_cv_signal(gpr_cv* cv) { ::gpr_cv_signal(cv); }
void CoreCodegen::gpr_cv_broadcast(gpr_cv* cv) { ::gpr_cv_broadcast(cv); }
+grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer* bb) {
+ return ::grpc_byte_buffer_copy(bb);
+}
+
void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
::grpc_byte_buffer_destroy(bb);
}
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
index 815b607032..d2cba6d662 100644
--- a/src/cpp/server/health/default_health_check_service.cc
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -20,6 +20,7 @@
#include <mutex>
#include <grpc++/impl/codegen/method_handler_impl.h>
+#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 2483300cb1..6bd3ecda32 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -17,6 +17,7 @@
#include <grpc++/server.h>
+#include <cstdlib>
#include <sstream>
#include <utility>
@@ -38,6 +39,7 @@
#include "src/core/ext/transport/inproc/inproc_transport.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/surface/call.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/server/health/default_health_check_service.h"
#include "src/cpp/thread_manager/thread_manager.h"
@@ -607,7 +609,12 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
grpc_op cops[MAX_OPS];
ops->FillOps(call->call(), cops, &nops);
auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
- GPR_ASSERT(GRPC_CALL_OK == result);
+ if (result != GRPC_CALL_OK) {
+ gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result);
+ grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR,
+ call->call(), cops, nops, ops);
+ abort();
+ }
}
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 4913682f1d..d7876a000b 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -190,7 +190,7 @@ bool ServerContext::IsCancelled() const {
void ServerContext::set_compression_algorithm(
grpc_compression_algorithm algorithm) {
- char* algorithm_name = NULL;
+ const char* algorithm_name = NULL;
if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
algorithm);
diff --git a/src/cpp/util/byte_buffer_cc.cc b/src/cpp/util/byte_buffer_cc.cc
index b1ff25252a..180c813762 100644
--- a/src/cpp/util/byte_buffer_cc.cc
+++ b/src/cpp/util/byte_buffer_cc.cc
@@ -16,11 +16,15 @@
*
*/
+#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/byte_buffer.h>
+#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
namespace grpc {
+static internal::GrpcLibraryInitializer g_gli_initializer;
+
ByteBuffer::ByteBuffer(const Slice* slices, size_t nslices) {
// The following assertions check that the representation of a grpc::Slice is
// identical to that of a grpc_slice: it has a grpc_slice field, and nothing
@@ -29,6 +33,16 @@ ByteBuffer::ByteBuffer(const Slice* slices, size_t nslices) {
"Slice must have same representation as grpc_slice");
static_assert(sizeof(Slice) == sizeof(grpc_slice),
"Slice must have same representation as grpc_slice");
+ // The following assertions check that the representation of a ByteBuffer is
+ // identical to grpc_byte_buffer*: it has a grpc_byte_buffer* field,
+ // and nothing else.
+ static_assert(std::is_same<decltype(buffer_), grpc_byte_buffer*>::value,
+ "ByteBuffer must have same representation as "
+ "grpc_byte_buffer*");
+ static_assert(sizeof(ByteBuffer) == sizeof(grpc_byte_buffer*),
+ "ByteBuffer must have same representation as "
+ "grpc_byte_buffer*");
+ g_gli_initializer.summon(); // Make sure that initializer linked in
// The const_cast is legal if grpc_raw_byte_buffer_create() does no more
// than its advertised side effect of increasing the reference count of the
// slices it processes, and such an increase does not affect the semantics
@@ -37,19 +51,6 @@ ByteBuffer::ByteBuffer(const Slice* slices, size_t nslices) {
reinterpret_cast<grpc_slice*>(const_cast<Slice*>(slices)), nslices);
}
-ByteBuffer::~ByteBuffer() {
- if (buffer_) {
- grpc_byte_buffer_destroy(buffer_);
- }
-}
-
-void ByteBuffer::Clear() {
- if (buffer_) {
- grpc_byte_buffer_destroy(buffer_);
- buffer_ = nullptr;
- }
-}
-
Status ByteBuffer::Dump(std::vector<Slice>* slices) const {
slices->clear();
if (!buffer_) {
@@ -80,7 +81,9 @@ ByteBuffer::ByteBuffer(const ByteBuffer& buf)
: buffer_(grpc_byte_buffer_copy(buf.buffer_)) {}
ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) {
- Clear(); // first remove existing data
+ if (this != &buf) {
+ Clear(); // first remove existing data
+ }
if (buf.buffer_) {
buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy
}
diff --git a/src/cpp/util/core_stats.cc b/src/cpp/util/core_stats.cc
new file mode 100644
index 0000000000..edf0b1bb67
--- /dev/null
+++ b/src/cpp/util/core_stats.cc
@@ -0,0 +1,90 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/cpp/util/core_stats.h"
+
+#include <grpc/support/log.h>
+
+using grpc::core::Bucket;
+using grpc::core::Histogram;
+using grpc::core::Metric;
+using grpc::core::Stats;
+
+namespace grpc {
+
+void CoreStatsToProto(const grpc_stats_data& core, Stats* proto) {
+ for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) {
+ Metric* m = proto->add_metrics();
+ m->set_name(grpc_stats_counter_name[i]);
+ m->set_count(core.counters[i]);
+ }
+ for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) {
+ Metric* m = proto->add_metrics();
+ m->set_name(grpc_stats_histogram_name[i]);
+ Histogram* h = m->mutable_histogram();
+ for (int j = 0; j < grpc_stats_histo_buckets[i]; j++) {
+ Bucket* b = h->add_buckets();
+ b->set_start(grpc_stats_histo_bucket_boundaries[i][j]);
+ b->set_count(core.histograms[grpc_stats_histo_start[i] + j]);
+ }
+ }
+}
+
+void ProtoToCoreStats(const grpc::core::Stats& proto, grpc_stats_data* core) {
+ memset(core, 0, sizeof(*core));
+ for (const auto& m : proto.metrics()) {
+ switch (m.value_case()) {
+ case Metric::VALUE_NOT_SET:
+ break;
+ case Metric::kCount:
+ for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) {
+ if (m.name() == grpc_stats_counter_name[i]) {
+ core->counters[i] = m.count();
+ break;
+ }
+ }
+ break;
+ case Metric::kHistogram:
+ for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) {
+ if (m.name() == grpc_stats_histogram_name[i]) {
+ const auto& h = m.histogram();
+ bool valid = true;
+ if (grpc_stats_histo_buckets[i] != h.buckets_size()) valid = false;
+ for (int j = 0; valid && j < h.buckets_size(); j++) {
+ if (grpc_stats_histo_bucket_boundaries[i][j] !=
+ h.buckets(j).start()) {
+ valid = false;
+ }
+ }
+ if (!valid) {
+ gpr_log(GPR_ERROR,
+ "Found histogram %s but shape is different from proto",
+ m.name().c_str());
+ }
+ for (int j = 0; valid && j < h.buckets_size(); j++) {
+ core->histograms[grpc_stats_histo_start[i] + j] =
+ h.buckets(j).count();
+ }
+ }
+ }
+ break;
+ }
+ }
+}
+
+} // namespace grpc
diff --git a/src/cpp/util/core_stats.h b/src/cpp/util/core_stats.h
new file mode 100644
index 0000000000..00e38bf266
--- /dev/null
+++ b/src/cpp/util/core_stats.h
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H
+#define GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H
+
+#include "src/proto/grpc/core/stats.pb.h"
+
+extern "C" {
+#include "src/core/lib/debug/stats.h"
+}
+
+namespace grpc {
+
+void CoreStatsToProto(const grpc_stats_data& core, grpc::core::Stats* proto);
+void ProtoToCoreStats(const grpc::core::Stats& proto, grpc_stats_data* core);
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H
diff --git a/src/cpp/util/slice_cc.cc b/src/cpp/util/slice_cc.cc
index 486d0cdf0e..3ae17e8052 100644
--- a/src/cpp/util/slice_cc.cc
+++ b/src/cpp/util/slice_cc.cc
@@ -50,4 +50,6 @@ Slice::Slice(void* buf, size_t len, void (*destroy)(void*), void* user_data)
Slice::Slice(void* buf, size_t len, void (*destroy)(void*, size_t))
: slice_(grpc_slice_new_with_len(buf, len, destroy)) {}
+grpc_slice Slice::c_slice() const { return grpc_slice_ref(slice_); }
+
} // namespace grpc