diff options
author | Muxi Yan <mxyan@google.com> | 2017-09-22 15:24:44 -0700 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2017-09-22 15:24:44 -0700 |
commit | f7d8860f4ff688e966159f31e2c281a22e2aa3f4 (patch) | |
tree | 9367583b04c0e9464c6f091d699dfc64ba63a7f8 /src/cpp | |
parent | 76e0c1ddd536342bdc5059209da38d5406c7e717 (diff) | |
parent | 31c66c576ad00504b34182340f8ff21bc3f447fb (diff) |
Merge remote-tracking branch 'upstream/master' into fix-stream-compression-eos
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/client/channel_cc.cc | 202 | ||||
-rw-r--r-- | src/cpp/client/client_context.cc | 2 | ||||
-rw-r--r-- | src/cpp/client/generic_stub.cc | 33 | ||||
-rw-r--r-- | src/cpp/common/channel_filter.cc | 6 | ||||
-rw-r--r-- | src/cpp/common/channel_filter.h | 22 | ||||
-rw-r--r-- | src/cpp/common/core_codegen.cc | 4 | ||||
-rw-r--r-- | src/cpp/server/health/default_health_check_service.cc | 1 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 9 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 2 | ||||
-rw-r--r-- | src/cpp/util/byte_buffer_cc.cc | 31 | ||||
-rw-r--r-- | src/cpp/util/core_stats.cc | 90 | ||||
-rw-r--r-- | src/cpp/util/core_stats.h | 35 | ||||
-rw-r--r-- | src/cpp/util/slice_cc.cc | 2 |
13 files changed, 386 insertions, 53 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index f2d9bb07c9..19a25c838f 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -18,7 +18,10 @@ #include <grpc++/channel.h> +#include <chrono> +#include <condition_variable> #include <memory> +#include <mutex> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> @@ -35,17 +38,197 @@ #include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/env.h" +#include "src/core/lib/support/string.h" namespace grpc { +namespace { +int kConnectivityCheckIntervalMsec = 500; +void WatchStateChange(void* arg); + +class TagSaver final : public CompletionQueueTag { + public: + explicit TagSaver(void* tag) : tag_(tag) {} + ~TagSaver() override {} + bool FinalizeResult(void** tag, bool* status) override { + *tag = tag_; + delete this; + return true; + } + + private: + void* tag_; +}; + +// Constantly watches channel connectivity status to reconnect a transiently +// disconnected channel. This is a temporary work-around before we have retry +// support. +class ChannelConnectivityWatcher : private GrpcLibraryCodegen { + public: + static void StartWatching(grpc_channel* channel) { + if (!IsDisabled()) { + std::unique_lock<std::mutex> lock(g_watcher_mu_); + if (g_watcher_ == nullptr) { + g_watcher_ = new ChannelConnectivityWatcher(); + } + g_watcher_->StartWatchingLocked(channel); + } + } + + static void StopWatching() { + if (!IsDisabled()) { + std::unique_lock<std::mutex> lock(g_watcher_mu_); + if (g_watcher_->StopWatchingLocked()) { + delete g_watcher_; + g_watcher_ = nullptr; + } + } + } + + private: + ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) { + gpr_ref_init(&ref_, 0); + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); + } + + static bool IsDisabled() { + char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); + bool disabled = gpr_is_true(env); + gpr_free(env); + return disabled; + } + + void WatchStateChangeImpl() { + bool ok = false; + void* tag = NULL; + CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT; + while (true) { + { + std::unique_lock<std::mutex> lock(shutdown_mu_); + if (shutdown_) { + // Drain cq_ if the watcher is shutting down + status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)); + } else { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + // Make sure we've seen 2 TIMEOUTs before going to sleep + if (status == CompletionQueue::TIMEOUT) { + status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); + if (status == CompletionQueue::TIMEOUT) { + shutdown_cv_.wait_for(lock, std::chrono::milliseconds( + kConnectivityCheckIntervalMsec)); + continue; + } + } + } + } + ChannelState* channel_state = static_cast<ChannelState*>(tag); + channel_state->state = + grpc_channel_check_connectivity_state(channel_state->channel, false); + if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) { + void* shutdown_tag = NULL; + channel_state->shutdown_cq.Next(&shutdown_tag, &ok); + delete channel_state; + if (gpr_unref(&ref_)) { + break; + } + } else { + TagSaver* tag_saver = new TagSaver(channel_state); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver); + } + } + } + + void StartWatchingLocked(grpc_channel* channel) { + if (thd_id_ != 0) { + gpr_ref(&ref_); + ++channel_count_; + ChannelState* channel_state = new ChannelState(channel); + // The first grpc_channel_watch_connectivity_state() is not used to + // monitor the channel state change, but to hold a reference of the + // c channel. So that WatchStateChangeImpl() can observe state == + // GRPC_CHANNEL_SHUTDOWN before the channel gets destroyed. + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(), + new TagSaver(nullptr)); + grpc_channel_watch_connectivity_state( + channel_state->channel, channel_state->state, + gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), + new TagSaver(channel_state)); + } + } + + bool StopWatchingLocked() { + if (--channel_count_ == 0) { + { + std::unique_lock<std::mutex> lock(shutdown_mu_); + shutdown_ = true; + shutdown_cv_.notify_one(); + } + gpr_thd_join(thd_id_); + return true; + } + return false; + } + + friend void WatchStateChange(void* arg); + struct ChannelState { + explicit ChannelState(grpc_channel* channel) + : channel(channel), state(GRPC_CHANNEL_IDLE){}; + grpc_channel* channel; + grpc_connectivity_state state; + CompletionQueue shutdown_cq; + }; + gpr_thd_id thd_id_; + CompletionQueue cq_; + gpr_refcount ref_; + int channel_count_; + + std::mutex shutdown_mu_; + std::condition_variable shutdown_cv_; // protected by shutdown_mu_ + bool shutdown_; // protected by shutdown_mu_ + + static std::mutex g_watcher_mu_; + static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_ +}; + +std::mutex ChannelConnectivityWatcher::g_watcher_mu_; +ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr; + +void WatchStateChange(void* arg) { + ChannelConnectivityWatcher* watcher = + static_cast<ChannelConnectivityWatcher*>(arg); + watcher->WatchStateChangeImpl(); +} +} // namespace + static internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel(const grpc::string& host, grpc_channel* channel) : host_(host), c_channel_(channel) { g_gli_initializer.summon(); + if (grpc_channel_support_connectivity_watcher(channel)) { + ChannelConnectivityWatcher::StartWatching(channel); + } } -Channel::~Channel() { grpc_channel_destroy(c_channel_); } +Channel::~Channel() { + const bool stop_watching = + grpc_channel_support_connectivity_watcher(c_channel_); + grpc_channel_destroy(c_channel_); + if (stop_watching) { + ChannelConnectivityWatcher::StopWatching(); + } +} namespace { @@ -130,23 +313,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { return grpc_channel_check_connectivity_state(c_channel_, try_to_connect); } -namespace { -class TagSaver final : public CompletionQueueTag { - public: - explicit TagSaver(void* tag) : tag_(tag) {} - ~TagSaver() override {} - bool FinalizeResult(void** tag, bool* status) override { - *tag = tag_; - delete this; - return true; - } - - private: - void* tag_; -}; - -} // namespace - void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, CompletionQueue* cq, void* tag) { diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 3af8bdc11a..40e95f3c05 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -96,7 +96,7 @@ void ClientContext::set_call(grpc_call* call, void ClientContext::set_compression_algorithm( grpc_compression_algorithm algorithm) { - char* algorithm_name = nullptr; + const char* algorithm_name = nullptr; if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) { gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.", algorithm); diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 66b1ef0e39..693b8bea56 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -22,14 +22,39 @@ namespace grpc { +namespace { +std::unique_ptr<GenericClientAsyncReaderWriter> CallInternal( + ChannelInterface* channel, ClientContext* context, + const grpc::string& method, CompletionQueue* cq, bool start, void* tag) { + return std::unique_ptr<GenericClientAsyncReaderWriter>( + GenericClientAsyncReaderWriter::Create( + channel, cq, RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), + context, start, tag)); +} + +} // namespace + // begin a call to a named method std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call( ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag) { - return std::unique_ptr<GenericClientAsyncReaderWriter>( - GenericClientAsyncReaderWriter::Create( - channel_.get(), cq, - RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag)); + return CallInternal(channel_.get(), context, method, cq, true, tag); +} + +// setup a call to a named method +std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::PrepareCall( + ClientContext* context, const grpc::string& method, CompletionQueue* cq) { + return CallInternal(channel_.get(), context, method, cq, false, nullptr); +} + +// setup a unary call to a named method +std::unique_ptr<GenericClientAsyncResponseReader> GenericStub::PrepareUnaryCall( + ClientContext* context, const grpc::string& method, + const ByteBuffer& request, CompletionQueue* cq) { + return std::unique_ptr<GenericClientAsyncResponseReader>( + GenericClientAsyncResponseReader::Create( + channel_.get(), cq, RpcMethod(method.c_str(), RpcMethod::NORMAL_RPC), + context, request, false)); } } // namespace grpc diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index f870af0c67..ea44cff832 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -18,7 +18,9 @@ #include <string.h> +extern "C" { #include "src/core/lib/channel/channel_stack.h" +} #include "src/cpp/common/channel_filter.h" #include <grpc++/impl/codegen/slice.h> @@ -66,10 +68,6 @@ void CallData::SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, grpc_call_stack_ignore_set_pollset_or_pollset_set(exec_ctx, elem, pollent); } -char *CallData::GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - return grpc_call_next_get_peer(exec_ctx, elem); -} - // internal code used by RegisterChannelFilter() namespace internal { diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index 5d629f7c14..c1aeb3f724 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -26,9 +26,11 @@ #include <functional> #include <vector> +extern "C" { #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/metadata_batch.h" +} /// An interface to define filters. /// @@ -193,6 +195,15 @@ class TransportStreamOpBatch { op_->payload->send_message.send_message = send_message; } + grpc_byte_stream **recv_message() const { + return op_->recv_message ? op_->payload->recv_message.recv_message + : nullptr; + } + void set_recv_message(grpc_byte_stream **recv_message) { + op_->recv_message = true; + op_->payload->recv_message.recv_message = recv_message; + } + census_context *get_census_context() const { return (census_context *)op_->payload->context[GRPC_CONTEXT_TRACING].value; } @@ -257,9 +268,6 @@ class CallData { virtual void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_polling_entity *pollent); - - /// Gets the peer name. - virtual char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); }; namespace internal { @@ -338,11 +346,6 @@ class ChannelFilter final { CallDataType *call_data = reinterpret_cast<CallDataType *>(elem->call_data); call_data->SetPollsetOrPollsetSet(exec_ctx, elem, pollent); } - - static char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - CallDataType *call_data = reinterpret_cast<CallDataType *>(elem->call_data); - return call_data->GetPeer(exec_ctx, elem); - } }; struct FilterRecord { @@ -385,8 +388,7 @@ void RegisterChannelFilter( FilterType::call_data_size, FilterType::InitCallElement, FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement, FilterType::channel_data_size, FilterType::InitChannelElement, - FilterType::DestroyChannelElement, FilterType::GetPeer, - FilterType::GetChannelInfo, name}}; + FilterType::DestroyChannelElement, FilterType::GetChannelInfo, name}}; internal::channel_filters->push_back(filter_record); } diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index c7c6b6b13b..6ea5f1d3c7 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -89,6 +89,10 @@ int CoreCodegen::gpr_cv_wait(gpr_cv* cv, gpr_mu* mu, void CoreCodegen::gpr_cv_signal(gpr_cv* cv) { ::gpr_cv_signal(cv); } void CoreCodegen::gpr_cv_broadcast(gpr_cv* cv) { ::gpr_cv_broadcast(cv); } +grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer* bb) { + return ::grpc_byte_buffer_copy(bb); +} + void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { ::grpc_byte_buffer_destroy(bb); } diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index 815b607032..d2cba6d662 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -20,6 +20,7 @@ #include <mutex> #include <grpc++/impl/codegen/method_handler_impl.h> +#include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 2483300cb1..6bd3ecda32 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -17,6 +17,7 @@ #include <grpc++/server.h> +#include <cstdlib> #include <sstream> #include <utility> @@ -38,6 +39,7 @@ #include "src/core/ext/transport/inproc/inproc_transport.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/surface/call.h" #include "src/cpp/client/create_channel_internal.h" #include "src/cpp/server/health/default_health_check_service.h" #include "src/cpp/thread_manager/thread_manager.h" @@ -607,7 +609,12 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { grpc_op cops[MAX_OPS]; ops->FillOps(call->call(), cops, &nops); auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); - GPR_ASSERT(GRPC_CALL_OK == result); + if (result != GRPC_CALL_OK) { + gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result); + grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR, + call->call(), cops, nops, ops); + abort(); + } } ServerInterface::BaseAsyncRequest::BaseAsyncRequest( diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 4913682f1d..d7876a000b 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -190,7 +190,7 @@ bool ServerContext::IsCancelled() const { void ServerContext::set_compression_algorithm( grpc_compression_algorithm algorithm) { - char* algorithm_name = NULL; + const char* algorithm_name = NULL; if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) { gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.", algorithm); diff --git a/src/cpp/util/byte_buffer_cc.cc b/src/cpp/util/byte_buffer_cc.cc index b1ff25252a..180c813762 100644 --- a/src/cpp/util/byte_buffer_cc.cc +++ b/src/cpp/util/byte_buffer_cc.cc @@ -16,11 +16,15 @@ * */ +#include <grpc++/impl/grpc_library.h> #include <grpc++/support/byte_buffer.h> +#include <grpc/byte_buffer.h> #include <grpc/byte_buffer_reader.h> namespace grpc { +static internal::GrpcLibraryInitializer g_gli_initializer; + ByteBuffer::ByteBuffer(const Slice* slices, size_t nslices) { // The following assertions check that the representation of a grpc::Slice is // identical to that of a grpc_slice: it has a grpc_slice field, and nothing @@ -29,6 +33,16 @@ ByteBuffer::ByteBuffer(const Slice* slices, size_t nslices) { "Slice must have same representation as grpc_slice"); static_assert(sizeof(Slice) == sizeof(grpc_slice), "Slice must have same representation as grpc_slice"); + // The following assertions check that the representation of a ByteBuffer is + // identical to grpc_byte_buffer*: it has a grpc_byte_buffer* field, + // and nothing else. + static_assert(std::is_same<decltype(buffer_), grpc_byte_buffer*>::value, + "ByteBuffer must have same representation as " + "grpc_byte_buffer*"); + static_assert(sizeof(ByteBuffer) == sizeof(grpc_byte_buffer*), + "ByteBuffer must have same representation as " + "grpc_byte_buffer*"); + g_gli_initializer.summon(); // Make sure that initializer linked in // The const_cast is legal if grpc_raw_byte_buffer_create() does no more // than its advertised side effect of increasing the reference count of the // slices it processes, and such an increase does not affect the semantics @@ -37,19 +51,6 @@ ByteBuffer::ByteBuffer(const Slice* slices, size_t nslices) { reinterpret_cast<grpc_slice*>(const_cast<Slice*>(slices)), nslices); } -ByteBuffer::~ByteBuffer() { - if (buffer_) { - grpc_byte_buffer_destroy(buffer_); - } -} - -void ByteBuffer::Clear() { - if (buffer_) { - grpc_byte_buffer_destroy(buffer_); - buffer_ = nullptr; - } -} - Status ByteBuffer::Dump(std::vector<Slice>* slices) const { slices->clear(); if (!buffer_) { @@ -80,7 +81,9 @@ ByteBuffer::ByteBuffer(const ByteBuffer& buf) : buffer_(grpc_byte_buffer_copy(buf.buffer_)) {} ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) { - Clear(); // first remove existing data + if (this != &buf) { + Clear(); // first remove existing data + } if (buf.buffer_) { buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy } diff --git a/src/cpp/util/core_stats.cc b/src/cpp/util/core_stats.cc new file mode 100644 index 0000000000..edf0b1bb67 --- /dev/null +++ b/src/cpp/util/core_stats.cc @@ -0,0 +1,90 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/cpp/util/core_stats.h" + +#include <grpc/support/log.h> + +using grpc::core::Bucket; +using grpc::core::Histogram; +using grpc::core::Metric; +using grpc::core::Stats; + +namespace grpc { + +void CoreStatsToProto(const grpc_stats_data& core, Stats* proto) { + for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) { + Metric* m = proto->add_metrics(); + m->set_name(grpc_stats_counter_name[i]); + m->set_count(core.counters[i]); + } + for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) { + Metric* m = proto->add_metrics(); + m->set_name(grpc_stats_histogram_name[i]); + Histogram* h = m->mutable_histogram(); + for (int j = 0; j < grpc_stats_histo_buckets[i]; j++) { + Bucket* b = h->add_buckets(); + b->set_start(grpc_stats_histo_bucket_boundaries[i][j]); + b->set_count(core.histograms[grpc_stats_histo_start[i] + j]); + } + } +} + +void ProtoToCoreStats(const grpc::core::Stats& proto, grpc_stats_data* core) { + memset(core, 0, sizeof(*core)); + for (const auto& m : proto.metrics()) { + switch (m.value_case()) { + case Metric::VALUE_NOT_SET: + break; + case Metric::kCount: + for (int i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) { + if (m.name() == grpc_stats_counter_name[i]) { + core->counters[i] = m.count(); + break; + } + } + break; + case Metric::kHistogram: + for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) { + if (m.name() == grpc_stats_histogram_name[i]) { + const auto& h = m.histogram(); + bool valid = true; + if (grpc_stats_histo_buckets[i] != h.buckets_size()) valid = false; + for (int j = 0; valid && j < h.buckets_size(); j++) { + if (grpc_stats_histo_bucket_boundaries[i][j] != + h.buckets(j).start()) { + valid = false; + } + } + if (!valid) { + gpr_log(GPR_ERROR, + "Found histogram %s but shape is different from proto", + m.name().c_str()); + } + for (int j = 0; valid && j < h.buckets_size(); j++) { + core->histograms[grpc_stats_histo_start[i] + j] = + h.buckets(j).count(); + } + } + } + break; + } + } +} + +} // namespace grpc diff --git a/src/cpp/util/core_stats.h b/src/cpp/util/core_stats.h new file mode 100644 index 0000000000..00e38bf266 --- /dev/null +++ b/src/cpp/util/core_stats.h @@ -0,0 +1,35 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H +#define GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H + +#include "src/proto/grpc/core/stats.pb.h" + +extern "C" { +#include "src/core/lib/debug/stats.h" +} + +namespace grpc { + +void CoreStatsToProto(const grpc_stats_data& core, grpc::core::Stats* proto); +void ProtoToCoreStats(const grpc::core::Stats& proto, grpc_stats_data* core); + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_UTIL_CORE_STATS_H diff --git a/src/cpp/util/slice_cc.cc b/src/cpp/util/slice_cc.cc index 486d0cdf0e..3ae17e8052 100644 --- a/src/cpp/util/slice_cc.cc +++ b/src/cpp/util/slice_cc.cc @@ -50,4 +50,6 @@ Slice::Slice(void* buf, size_t len, void (*destroy)(void*), void* user_data) Slice::Slice(void* buf, size_t len, void (*destroy)(void*, size_t)) : slice_(grpc_slice_new_with_len(buf, len, destroy)) {} +grpc_slice Slice::c_slice() const { return grpc_slice_ref(slice_); } + } // namespace grpc |