aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2015-08-08 01:45:38 +0200
committerGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2015-08-08 01:45:38 +0200
commit9d72b149a9e3462c2fa13afa27a1e52bfe7bf186 (patch)
treeeedff1af6f56fc97e61c3bee236b109b6d007d69 /src/cpp
parentf75df57a8ffaddb11f064dfa5e54ec8404a81e08 (diff)
parent95a98ca768683f3864b1aefc9d6f266b22705b2a (diff)
Merge branch 'master' of github.com:grpc/grpc into the-ultimate-showdown
Conflicts: include/grpc/grpc.h src/core/surface/channel.c src/core/surface/channel_create.c src/core/surface/completion_queue.c src/cpp/client/channel.cc src/cpp/client/insecure_credentials.cc src/csharp/ext/grpc_csharp_ext.c src/node/ext/call.cc src/node/ext/channel.cc src/php/ext/grpc/call.c src/php/ext/grpc/channel.c src/python/grpcio/grpc/_adapter/_c/types/channel.c src/ruby/ext/grpc/rb_channel.c test/core/end2end/dualstack_socket_test.c test/core/end2end/fixtures/chttp2_fullstack.c test/core/end2end/fixtures/chttp2_fullstack_compression.c test/core/end2end/fixtures/chttp2_fullstack_uds_posix.c test/core/end2end/fixtures/chttp2_fullstack_with_poll.c test/core/end2end/multiple_server_queues_test.c test/core/end2end/no_server_test.c test/core/end2end/tests/bad_hostname.c test/core/end2end/tests/cancel_after_accept.c test/core/end2end/tests/cancel_after_accept_and_writes_closed.c test/core/end2end/tests/cancel_after_invoke.c test/core/end2end/tests/cancel_before_invoke.c test/core/end2end/tests/cancel_in_a_vacuum.c test/core/end2end/tests/census_simple_request.c test/core/end2end/tests/disappearing_server.c test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c test/core/end2end/tests/empty_batch.c test/core/end2end/tests/graceful_server_shutdown.c test/core/end2end/tests/invoke_large_request.c test/core/end2end/tests/max_concurrent_streams.c test/core/end2end/tests/max_message_length.c test/core/end2end/tests/ping_pong_streaming.c test/core/end2end/tests/registered_call.c test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c test/core/end2end/tests/request_response_with_metadata_and_payload.c test/core/end2end/tests/request_response_with_payload.c test/core/end2end/tests/request_response_with_payload_and_call_creds.c test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c test/core/end2end/tests/request_with_compressed_payload.c test/core/end2end/tests/request_with_flags.c test/core/end2end/tests/request_with_large_metadata.c test/core/end2end/tests/request_with_payload.c test/core/end2end/tests/server_finishes_request.c test/core/end2end/tests/simple_delayed_request.c test/core/end2end/tests/simple_request.c test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c test/core/fling/client.c test/core/fling/server.c test/core/surface/lame_client_test.c
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/client/channel.cc77
-rw-r--r--src/cpp/client/channel.h20
-rw-r--r--src/cpp/client/client_context.cc23
-rw-r--r--src/cpp/client/create_channel.cc2
-rw-r--r--src/cpp/client/insecure_credentials.cc2
-rw-r--r--src/cpp/client/secure_credentials.cc11
-rw-r--r--src/cpp/server/create_default_thread_pool.cc4
-rw-r--r--src/cpp/server/dynamic_thread_pool.cc131
-rw-r--r--src/cpp/server/insecure_server_credentials.cc2
-rw-r--r--src/cpp/server/secure_server_credentials.cc3
-rw-r--r--src/cpp/server/server_context.cc38
-rw-r--r--src/cpp/util/time.cc3
12 files changed, 279 insertions, 37 deletions
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 83733fdaae..c8e29bf562 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -39,7 +39,6 @@
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
-#include "src/core/census/grpc_context.h"
#include "src/core/profiling/timers.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
@@ -49,28 +48,34 @@
#include <grpc++/impl/call.h>
#include <grpc++/impl/rpc_method.h>
#include <grpc++/status.h>
+#include <grpc++/time.h>
namespace grpc {
-Channel::Channel(const grpc::string& target, grpc_channel* channel)
- : target_(target), c_channel_(channel) {}
+Channel::Channel(grpc_channel* channel) : c_channel_(channel) {}
+
+Channel::Channel(const grpc::string& host, grpc_channel* channel)
+ : host_(host), c_channel_(channel) {}
Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) {
- auto c_call =
- method.channel_tag() && context->authority().empty()
- ? grpc_channel_create_registered_call(c_channel_, cq->cq(),
- method.channel_tag(),
- context->raw_deadline(),
- nullptr)
- : grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
- context->authority().empty()
- ? target_.c_str()
- : context->authority().c_str(),
- context->raw_deadline(), nullptr);
- grpc_census_call_set_context(c_call, context->get_census_context());
+ const char* host_str = host_.empty() ? NULL : host_.c_str();
+ auto c_call = method.channel_tag() && context->authority().empty()
+ ? grpc_channel_create_registered_call(
+ c_channel_, context->propagate_from_call_,
+ context->propagation_options_.c_bitmask(), cq->cq(),
+ method.channel_tag(), context->raw_deadline(),
+ nullptr)
+ : grpc_channel_create_call(
+ c_channel_, context->propagate_from_call_,
+ context->propagation_options_.c_bitmask(), cq->cq(),
+ method.name(), context->authority().empty()
+ ? host_str
+ : context->authority().c_str(),
+ context->raw_deadline(), nullptr);
+ grpc_census_call_set_context(c_call, context->census_context());
GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call);
context->set_call(c_call, shared_from_this());
return Call(c_call, this, cq);
@@ -88,8 +93,48 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
}
void* Channel::RegisterMethod(const char* method) {
- return grpc_channel_register_call(c_channel_, method, target_.c_str(),
+ return grpc_channel_register_call(c_channel_, method,
+ host_.empty() ? NULL : host_.c_str(),
nullptr);
}
+grpc_connectivity_state Channel::GetState(bool try_to_connect) {
+ return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
+}
+
+namespace {
+class TagSaver GRPC_FINAL : public CompletionQueueTag {
+ public:
+ explicit TagSaver(void* tag) : tag_(tag) {}
+ ~TagSaver() GRPC_OVERRIDE {}
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+ *tag = tag_;
+ delete this;
+ return true;
+ }
+ private:
+ void* tag_;
+};
+
+} // namespace
+
+void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
+ gpr_timespec deadline,
+ CompletionQueue* cq, void* tag) {
+ TagSaver* tag_saver = new TagSaver(tag);
+ grpc_channel_watch_connectivity_state(c_channel_, last_observed, deadline,
+ cq->cq(), tag_saver);
+}
+
+bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
+ gpr_timespec deadline) {
+ CompletionQueue cq;
+ bool ok = false;
+ void* tag = NULL;
+ NotifyOnStateChangeImpl(last_observed, deadline, &cq, NULL);
+ cq.Next(&tag, &ok);
+ GPR_ASSERT(tag == NULL);
+ return ok;
+}
+
} // namespace grpc
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index 9108713c58..cb8e8d98d2 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -52,17 +52,27 @@ class StreamContextInterface;
class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
public:
- Channel(const grpc::string& target, grpc_channel* c_channel);
+ explicit Channel(grpc_channel* c_channel);
+ Channel(const grpc::string& host, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE;
- virtual void* RegisterMethod(const char* method) GRPC_OVERRIDE;
- virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
+ void* RegisterMethod(const char* method) GRPC_OVERRIDE;
+ Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) GRPC_OVERRIDE;
- virtual void PerformOpsOnCall(CallOpSetInterface* ops,
+ void PerformOpsOnCall(CallOpSetInterface* ops,
Call* call) GRPC_OVERRIDE;
+ grpc_connectivity_state GetState(bool try_to_connect) GRPC_OVERRIDE;
+
private:
- const grpc::string target_;
+ void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
+ gpr_timespec deadline, CompletionQueue* cq,
+ void* tag) GRPC_OVERRIDE;
+
+ bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
+ gpr_timespec deadline) GRPC_OVERRIDE;
+
+ const grpc::string host_;
grpc_channel* const c_channel_; // owned
};
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index d8e6fcbf52..c91ceea6c1 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -34,8 +34,10 @@
#include <grpc++/client_context.h>
#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc++/credentials.h>
+#include <grpc++/server_context.h>
#include <grpc++/time.h>
#include "src/core/channel/compress_filter.h"
@@ -47,7 +49,8 @@ ClientContext::ClientContext()
: initial_metadata_received_(false),
call_(nullptr),
cq_(nullptr),
- deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
+ deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
+ propagate_from_call_(nullptr) {}
ClientContext::~ClientContext() {
if (call_) {
@@ -65,6 +68,14 @@ ClientContext::~ClientContext() {
}
}
+std::unique_ptr<ClientContext> ClientContext::FromServerContext(
+ const ServerContext& context, PropagationOptions options) {
+ std::unique_ptr<ClientContext> ctx(new ClientContext);
+ ctx->propagate_from_call_ = context.call_;
+ ctx->propagation_options_ = options;
+ return ctx;
+}
+
void ClientContext::AddMetadata(const grpc::string& meta_key,
const grpc::string& meta_value) {
send_initial_metadata_.insert(std::make_pair(meta_key, meta_value));
@@ -106,4 +117,14 @@ void ClientContext::TryCancel() {
}
}
+grpc::string ClientContext::peer() const {
+ grpc::string peer;
+ if (call_) {
+ char* c_peer = grpc_call_get_peer(call_);
+ peer = c_peer;
+ gpr_free(c_peer);
+ }
+ return peer;
+}
+
} // namespace grpc
diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc
index 38eeda0dc0..21d01b739d 100644
--- a/src/cpp/client/create_channel.cc
+++ b/src/cpp/client/create_channel.cc
@@ -52,6 +52,6 @@ std::shared_ptr<ChannelInterface> CreateChannel(
user_agent_prefix.str());
return creds ? creds->CreateChannel(target, cp_args)
: std::shared_ptr<ChannelInterface>(
- new Channel(target, grpc_lame_client_channel_create()));
+ new Channel(grpc_lame_client_channel_create(NULL)));
}
} // namespace grpc
diff --git a/src/cpp/client/insecure_credentials.cc b/src/cpp/client/insecure_credentials.cc
index cc37c33c7b..2f9357b568 100644
--- a/src/cpp/client/insecure_credentials.cc
+++ b/src/cpp/client/insecure_credentials.cc
@@ -49,7 +49,7 @@ class InsecureCredentialsImpl GRPC_FINAL : public Credentials {
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
return std::shared_ptr<ChannelInterface>(new Channel(
- target, grpc_channel_create(target.c_str(), &channel_args, nullptr)));
+ grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr)));
}
// InsecureCredentials should not be applied to a call.
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index 01c7f14f1a..2d6114e06b 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -44,8 +44,7 @@ std::shared_ptr<grpc::ChannelInterface> SecureCredentials::CreateChannel(
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
return std::shared_ptr<ChannelInterface>(new Channel(
- args.GetSslTargetNameOverride().empty() ? target
- : args.GetSslTargetNameOverride(),
+ args.GetSslTargetNameOverride(),
grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args)));
}
@@ -99,8 +98,8 @@ std::shared_ptr<Credentials> ServiceAccountCredentials(
}
// Builds JWT credentials.
-std::shared_ptr<Credentials> JWTCredentials(const grpc::string& json_key,
- long token_lifetime_seconds) {
+std::shared_ptr<Credentials> ServiceAccountJWTAccessCredentials(
+ const grpc::string& json_key, long token_lifetime_seconds) {
if (token_lifetime_seconds <= 0) {
gpr_log(GPR_ERROR,
"Trying to create JWTCredentials with non-positive lifetime");
@@ -108,8 +107,8 @@ std::shared_ptr<Credentials> JWTCredentials(const grpc::string& json_key,
}
gpr_timespec lifetime =
gpr_time_from_seconds(token_lifetime_seconds, GPR_TIMESPAN);
- return WrapCredentials(
- grpc_jwt_credentials_create(json_key.c_str(), lifetime));
+ return WrapCredentials(grpc_service_account_jwt_access_credentials_create(
+ json_key.c_str(), lifetime));
}
// Builds refresh token credentials.
diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc
index cc182f59f4..81c84474d8 100644
--- a/src/cpp/server/create_default_thread_pool.cc
+++ b/src/cpp/server/create_default_thread_pool.cc
@@ -32,7 +32,7 @@
*/
#include <grpc/support/cpu.h>
-#include <grpc++/fixed_size_thread_pool.h>
+#include <grpc++/dynamic_thread_pool.h>
#ifndef GRPC_CUSTOM_DEFAULT_THREAD_POOL
@@ -41,7 +41,7 @@ namespace grpc {
ThreadPoolInterface* CreateDefaultThreadPool() {
int cores = gpr_cpu_num_cores();
if (!cores) cores = 4;
- return new FixedSizeThreadPool(cores);
+ return new DynamicThreadPool(cores);
}
} // namespace grpc
diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc
new file mode 100644
index 0000000000..f58d0420df
--- /dev/null
+++ b/src/cpp/server/dynamic_thread_pool.cc
@@ -0,0 +1,131 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+#include <grpc++/dynamic_thread_pool.h>
+
+namespace grpc {
+DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool *pool):
+ pool_(pool),
+ thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, this)) {
+}
+DynamicThreadPool::DynamicThread::~DynamicThread() {
+ thd_->join();
+ thd_.reset();
+}
+
+void DynamicThreadPool::DynamicThread::ThreadFunc() {
+ pool_->ThreadFunc();
+ // Now that we have killed ourselves, we should reduce the thread count
+ grpc::unique_lock<grpc::mutex> lock(pool_->mu_);
+ pool_->nthreads_--;
+ // Move ourselves to dead list
+ pool_->dead_threads_.push_back(this);
+
+ if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
+ pool_->shutdown_cv_.notify_one();
+ }
+}
+
+void DynamicThreadPool::ThreadFunc() {
+ for (;;) {
+ // Wait until work is available or we are shutting down.
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_ && callbacks_.empty()) {
+ // If there are too many threads waiting, then quit this thread
+ if (threads_waiting_ >= reserve_threads_) {
+ break;
+ }
+ threads_waiting_++;
+ cv_.wait(lock);
+ threads_waiting_--;
+ }
+ // Drain callbacks before considering shutdown to ensure all work
+ // gets completed.
+ if (!callbacks_.empty()) {
+ auto cb = callbacks_.front();
+ callbacks_.pop();
+ lock.unlock();
+ cb();
+ } else if (shutdown_) {
+ break;
+ }
+ }
+}
+
+DynamicThreadPool::DynamicThreadPool(int reserve_threads) :
+ shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0),
+ threads_waiting_(0) {
+ for (int i = 0; i < reserve_threads_; i++) {
+ grpc::lock_guard<grpc::mutex> lock(mu_);
+ nthreads_++;
+ new DynamicThread(this);
+ }
+}
+
+void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
+ for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
+ delete *t;
+ }
+}
+
+DynamicThreadPool::~DynamicThreadPool() {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ shutdown_ = true;
+ cv_.notify_all();
+ while (nthreads_ != 0) {
+ shutdown_cv_.wait(lock);
+ }
+ ReapThreads(&dead_threads_);
+}
+
+void DynamicThreadPool::Add(const std::function<void()>& callback) {
+ grpc::lock_guard<grpc::mutex> lock(mu_);
+ // Add works to the callbacks list
+ callbacks_.push(callback);
+ // Increase pool size or notify as needed
+ if (threads_waiting_ == 0) {
+ // Kick off a new thread
+ nthreads_++;
+ new DynamicThread(this);
+ } else {
+ cv_.notify_one();
+ }
+ // Also use this chance to harvest dead threads
+ if (!dead_threads_.empty()) {
+ ReapThreads(&dead_threads_);
+ }
+}
+
+} // namespace grpc
diff --git a/src/cpp/server/insecure_server_credentials.cc b/src/cpp/server/insecure_server_credentials.cc
index aca3568e59..800cd36caa 100644
--- a/src/cpp/server/insecure_server_credentials.cc
+++ b/src/cpp/server/insecure_server_credentials.cc
@@ -41,7 +41,7 @@ class InsecureServerCredentialsImpl GRPC_FINAL : public ServerCredentials {
public:
int AddPortToServer(const grpc::string& addr,
grpc_server* server) GRPC_OVERRIDE {
- return grpc_server_add_http2_port(server, addr.c_str());
+ return grpc_server_add_insecure_http2_port(server, addr.c_str());
}
};
} // namespace
diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc
index 3e262dd74f..32c45e2280 100644
--- a/src/cpp/server/secure_server_credentials.cc
+++ b/src/cpp/server/secure_server_credentials.cc
@@ -51,7 +51,8 @@ std::shared_ptr<ServerCredentials> SslServerCredentials(
}
grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create(
options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(),
- &pem_key_cert_pairs[0], pem_key_cert_pairs.size());
+ &pem_key_cert_pairs[0], pem_key_cert_pairs.size(),
+ options.force_client_auth);
return std::shared_ptr<ServerCredentials>(
new SecureServerCredentials(c_creds));
}
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index a178772548..bb34040a2f 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -34,6 +34,7 @@
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/sync.h>
@@ -49,16 +50,23 @@ namespace grpc {
class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {}
+ CompletionOp() : has_tag_(false), tag_(nullptr), refs_(2), finalized_(false), cancelled_(0) {}
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
bool CheckCancelled(CompletionQueue* cq);
+ void set_tag(void* tag) {
+ has_tag_ = true;
+ tag_ = tag;
+ }
+
void Unref();
private:
+ bool has_tag_;
+ void* tag_;
grpc::mutex mu_;
int refs_;
bool finalized_;
@@ -90,18 +98,25 @@ void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
grpc::unique_lock<grpc::mutex> lock(mu_);
finalized_ = true;
+ bool ret = false;
+ if (has_tag_) {
+ *tag = tag_;
+ ret = true;
+ }
if (!*status) cancelled_ = 1;
if (--refs_ == 0) {
lock.unlock();
delete this;
}
- return false;
+ return ret;
}
// ServerContext body
ServerContext::ServerContext()
: completion_op_(nullptr),
+ has_notify_when_done_tag_(false),
+ async_notify_when_done_tag_(nullptr),
call_(nullptr),
cq_(nullptr),
sent_initial_metadata_(false) {}
@@ -109,6 +124,8 @@ ServerContext::ServerContext()
ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
size_t metadata_count)
: completion_op_(nullptr),
+ has_notify_when_done_tag_(false),
+ async_notify_when_done_tag_(nullptr),
deadline_(deadline),
call_(nullptr),
cq_(nullptr),
@@ -133,6 +150,9 @@ ServerContext::~ServerContext() {
void ServerContext::BeginCompletionOp(Call* call) {
GPR_ASSERT(!completion_op_);
completion_op_ = new CompletionOp();
+ if (has_notify_when_done_tag_) {
+ completion_op_->set_tag(async_notify_when_done_tag_);
+ }
call->PerformOps(completion_op_);
}
@@ -180,4 +200,18 @@ std::shared_ptr<const AuthContext> ServerContext::auth_context() const {
return auth_context_;
}
+grpc::string ServerContext::peer() const {
+ grpc::string peer;
+ if (call_) {
+ char* c_peer = grpc_call_get_peer(call_);
+ peer = c_peer;
+ gpr_free(c_peer);
+ }
+ return peer;
+}
+
+const struct census_context* ServerContext::census_context() const {
+ return grpc_census_call_get_context(call_);
+}
+
} // namespace grpc
diff --git a/src/cpp/util/time.cc b/src/cpp/util/time.cc
index a814cad452..799c597e0b 100644
--- a/src/cpp/util/time.cc
+++ b/src/cpp/util/time.cc
@@ -79,9 +79,10 @@ void TimepointHR2Timespec(const high_resolution_clock::time_point& from,
}
system_clock::time_point Timespec2Timepoint(gpr_timespec t) {
- if (gpr_time_cmp(t, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) {
+ if (gpr_time_cmp(t, gpr_inf_future(t.clock_type)) == 0) {
return system_clock::time_point::max();
}
+ t = gpr_convert_clock_type(t, GPR_CLOCK_REALTIME);
system_clock::time_point tp;
tp += duration_cast<system_clock::time_point::duration>(seconds(t.tv_sec));
tp +=