aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/client/channel.cc12
-rw-r--r--src/cpp/client/channel.h10
-rw-r--r--src/cpp/client/channel_arguments.cc15
-rw-r--r--src/cpp/client/secure_channel_arguments.cc (renamed from src/cpp/proto/proto_utils.h)29
-rw-r--r--src/cpp/common/call.cc299
-rw-r--r--src/cpp/proto/proto_utils.cc45
-rw-r--r--src/cpp/server/create_default_thread_pool.cc (renamed from src/cpp/client/client_unary_call.cc)35
-rw-r--r--src/cpp/server/insecure_server_credentials.cc3
-rw-r--r--src/cpp/server/server.cc256
-rw-r--r--src/cpp/server/server_builder.cc4
-rw-r--r--src/cpp/server/server_context.cc22
-rw-r--r--src/cpp/server/server_credentials.cc2
-rw-r--r--src/cpp/server/thread_pool.h2
-rw-r--r--src/cpp/util/byte_buffer.cc15
-rw-r--r--src/cpp/util/status.cc2
-rw-r--r--src/cpp/util/time.cc15
16 files changed, 215 insertions, 551 deletions
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 475a20d883..72593f877e 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -36,12 +36,10 @@
#include <memory>
#include <grpc/grpc.h>
-#include <grpc/grpc_security.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
@@ -75,14 +73,14 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
return Call(c_call, this, cq);
}
-void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
+void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
- size_t nops = MAX_OPS;
- grpc_op ops[MAX_OPS];
+ size_t nops = 0;
+ grpc_op cops[MAX_OPS];
GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
- buf->FillOps(ops, &nops);
+ ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), ops, nops, buf));
+ grpc_call_start_batch(call->call(), cops, nops, ops));
GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
}
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index cd239247c8..9108713c58 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -44,22 +44,22 @@ struct grpc_channel;
namespace grpc {
class Call;
-class CallOpBuffer;
+class CallOpSetInterface;
class ChannelArguments;
class CompletionQueue;
class Credentials;
class StreamContextInterface;
-class Channel GRPC_FINAL : public GrpcLibrary,
- public ChannelInterface {
+class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
public:
Channel(const grpc::string& target, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE;
- virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE;
+ virtual void* RegisterMethod(const char* method) GRPC_OVERRIDE;
virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) GRPC_OVERRIDE;
- virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
+ virtual void PerformOpsOnCall(CallOpSetInterface* ops,
+ Call* call) GRPC_OVERRIDE;
private:
const grpc::string target_;
diff --git a/src/cpp/client/channel_arguments.cc b/src/cpp/client/channel_arguments.cc
index 87f8349eef..b271650673 100644
--- a/src/cpp/client/channel_arguments.cc
+++ b/src/cpp/client/channel_arguments.cc
@@ -33,21 +33,12 @@
#include <grpc++/channel_arguments.h>
-#include <grpc/grpc_security.h>
+#include "src/core/channel/channel_args.h"
namespace grpc {
-void ChannelArguments::SetSslTargetNameOverride(const grpc::string& name) {
- SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, name);
-}
-
-grpc::string ChannelArguments::GetSslTargetNameOverride() const {
- for (unsigned int i = 0; i < args_.size(); i++) {
- if (grpc::string(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG) == args_[i].key) {
- return args_[i].value.string;
- }
- }
- return "";
+void ChannelArguments::SetCompressionLevel(grpc_compression_level level) {
+ SetInt(GRPC_COMPRESSION_LEVEL_ARG, level);
}
void ChannelArguments::SetInt(const grpc::string& key, int value) {
diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/client/secure_channel_arguments.cc
index 67a775b3ca..d89df999ad 100644
--- a/src/cpp/proto/proto_utils.h
+++ b/src/cpp/client/secure_channel_arguments.cc
@@ -31,25 +31,24 @@
*
*/
-#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
-#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
+#include <grpc++/channel_arguments.h>
+#include <grpc/grpc_security.h>
-#include <grpc++/config.h>
-
-struct grpc_byte_buffer;
+#include "src/core/channel/channel_args.h"
namespace grpc {
-// Serialize the msg into a buffer created inside the function. The caller
-// should destroy the returned buffer when done with it. If serialization fails,
-// false is returned and buffer is left unchanged.
-bool SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** buffer);
+void ChannelArguments::SetSslTargetNameOverride(const grpc::string& name) {
+ SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, name);
+}
-// The caller keeps ownership of buffer and msg.
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
- int max_message_size);
+grpc::string ChannelArguments::GetSslTargetNameOverride() const {
+ for (unsigned int i = 0; i < args_.size(); i++) {
+ if (grpc::string(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG) == args_[i].key) {
+ return args_[i].value.string;
+ }
+ }
+ return "";
+}
} // namespace grpc
-
-#endif // GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 1068111e3f..0a5c976e01 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -39,107 +39,32 @@
#include <grpc++/channel_interface.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
namespace grpc {
-CallOpBuffer::CallOpBuffer()
- : return_tag_(this),
- send_initial_metadata_(false),
- initial_metadata_count_(0),
- initial_metadata_(nullptr),
- recv_initial_metadata_(nullptr),
- send_message_(nullptr),
- send_message_buffer_(nullptr),
- send_buf_(nullptr),
- recv_message_(nullptr),
- recv_message_buffer_(nullptr),
- recv_buf_(nullptr),
- max_message_size_(-1),
- client_send_close_(false),
- recv_trailing_metadata_(nullptr),
- recv_status_(nullptr),
- status_code_(GRPC_STATUS_OK),
- status_details_(nullptr),
- status_details_capacity_(0),
- send_status_available_(false),
- send_status_code_(GRPC_STATUS_OK),
- trailing_metadata_count_(0),
- trailing_metadata_(nullptr),
- cancelled_buf_(0),
- recv_closed_(nullptr) {
- memset(&recv_trailing_metadata_arr_, 0, sizeof(recv_trailing_metadata_arr_));
- memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_));
- recv_trailing_metadata_arr_.metadata = nullptr;
- recv_initial_metadata_arr_.metadata = nullptr;
-}
-
-void CallOpBuffer::Reset(void* next_return_tag) {
- return_tag_ = next_return_tag;
-
- send_initial_metadata_ = false;
- initial_metadata_count_ = 0;
- gpr_free(initial_metadata_);
-
- recv_initial_metadata_ = nullptr;
- recv_initial_metadata_arr_.count = 0;
-
- if (send_buf_ && send_message_) {
- grpc_byte_buffer_destroy(send_buf_);
- }
- send_message_ = nullptr;
- send_message_buffer_ = nullptr;
- send_buf_ = nullptr;
-
- got_message = false;
- if (recv_buf_ && recv_message_) {
- grpc_byte_buffer_destroy(recv_buf_);
- }
- recv_message_ = nullptr;
- recv_message_buffer_ = nullptr;
- recv_buf_ = nullptr;
-
- client_send_close_ = false;
-
- recv_trailing_metadata_ = nullptr;
- recv_status_ = nullptr;
- recv_trailing_metadata_arr_.count = 0;
-
- status_code_ = GRPC_STATUS_OK;
-
- send_status_available_ = false;
- send_status_code_ = GRPC_STATUS_OK;
- send_status_details_.clear();
- trailing_metadata_count_ = 0;
- trailing_metadata_ = nullptr;
-
- recv_closed_ = nullptr;
-}
-
-CallOpBuffer::~CallOpBuffer() {
- gpr_free(status_details_);
- gpr_free(recv_initial_metadata_arr_.metadata);
- gpr_free(recv_trailing_metadata_arr_.metadata);
- if (recv_buf_ && recv_message_) {
- grpc_byte_buffer_destroy(recv_buf_);
- }
- if (send_buf_ && send_message_) {
- grpc_byte_buffer_destroy(send_buf_);
+void FillMetadataMap(grpc_metadata_array* arr,
+ std::multimap<grpc::string, grpc::string>* metadata) {
+ for (size_t i = 0; i < arr->count; i++) {
+ // TODO(yangg) handle duplicates?
+ metadata->insert(std::pair<grpc::string, grpc::string>(
+ arr->metadata[i].key,
+ grpc::string(arr->metadata[i].value, arr->metadata[i].value_length)));
}
+ grpc_metadata_array_destroy(arr);
+ grpc_metadata_array_init(arr);
}
-namespace {
// TODO(yangg) if the map is changed before we send, the pointers will be a
// mess. Make sure it does not happen.
grpc_metadata* FillMetadataArray(
- std::multimap<grpc::string, grpc::string>* metadata) {
- if (metadata->empty()) {
+ const std::multimap<grpc::string, grpc::string>& metadata) {
+ if (metadata.empty()) {
return nullptr;
}
grpc_metadata* metadata_array =
- (grpc_metadata*)gpr_malloc(metadata->size() * sizeof(grpc_metadata));
+ (grpc_metadata*)gpr_malloc(metadata.size() * sizeof(grpc_metadata));
size_t i = 0;
- for (auto iter = metadata->cbegin(); iter != metadata->cend(); ++iter, ++i) {
+ for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
metadata_array[i].key = iter->first.c_str();
metadata_array[i].value = iter->second.c_str();
metadata_array[i].value_length = iter->second.size();
@@ -147,198 +72,6 @@ grpc_metadata* FillMetadataArray(
return metadata_array;
}
-void FillMetadataMap(grpc_metadata_array* arr,
- std::multimap<grpc::string, grpc::string>* metadata) {
- for (size_t i = 0; i < arr->count; i++) {
- // TODO(yangg) handle duplicates?
- metadata->insert(std::pair<grpc::string, grpc::string>(
- arr->metadata[i].key,
- grpc::string(arr->metadata[i].value, arr->metadata[i].value_length)));
- }
- grpc_metadata_array_destroy(arr);
- grpc_metadata_array_init(arr);
-}
-} // namespace
-
-void CallOpBuffer::AddSendInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) {
- send_initial_metadata_ = true;
- initial_metadata_count_ = metadata->size();
- initial_metadata_ = FillMetadataArray(metadata);
-}
-
-void CallOpBuffer::AddRecvInitialMetadata(ClientContext* ctx) {
- ctx->initial_metadata_received_ = true;
- recv_initial_metadata_ = &ctx->recv_initial_metadata_;
-}
-
-void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) {
- AddSendInitialMetadata(&ctx->send_initial_metadata_);
-}
-
-void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) {
- send_message_ = &message;
-}
-
-void CallOpBuffer::AddSendMessage(const ByteBuffer& message) {
- send_message_buffer_ = &message;
-}
-
-void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) {
- recv_message_ = message;
- recv_message_->Clear();
-}
-
-void CallOpBuffer::AddRecvMessage(ByteBuffer* message) {
- recv_message_buffer_ = message;
- recv_message_buffer_->Clear();
-}
-
-void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; }
-
-void CallOpBuffer::AddServerRecvClose(bool* cancelled) {
- recv_closed_ = cancelled;
-}
-
-void CallOpBuffer::AddClientRecvStatus(ClientContext* context, Status* status) {
- recv_trailing_metadata_ = &context->trailing_metadata_;
- recv_status_ = status;
-}
-
-void CallOpBuffer::AddServerSendStatus(
- std::multimap<grpc::string, grpc::string>* metadata, const Status& status) {
- if (metadata != NULL) {
- trailing_metadata_count_ = metadata->size();
- trailing_metadata_ = FillMetadataArray(metadata);
- } else {
- trailing_metadata_count_ = 0;
- }
- send_status_available_ = true;
- send_status_code_ = static_cast<grpc_status_code>(status.code());
- send_status_details_ = status.details();
-}
-
-void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
- *nops = 0;
- if (send_initial_metadata_) {
- ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA;
- ops[*nops].data.send_initial_metadata.count = initial_metadata_count_;
- ops[*nops].data.send_initial_metadata.metadata = initial_metadata_;
- (*nops)++;
- }
- if (recv_initial_metadata_) {
- ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA;
- ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_;
- (*nops)++;
- }
- if (send_message_ || send_message_buffer_) {
- if (send_message_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_SERIALIZE, 0);
- bool success = SerializeProto(*send_message_, &send_buf_);
- if (!success) {
- abort();
- // TODO handle parse failure
- }
- GRPC_TIMER_END(GRPC_PTAG_PROTO_SERIALIZE, 0);
- } else {
- send_buf_ = send_message_buffer_->buffer();
- }
- ops[*nops].op = GRPC_OP_SEND_MESSAGE;
- ops[*nops].data.send_message = send_buf_;
- (*nops)++;
- }
- if (recv_message_ || recv_message_buffer_) {
- ops[*nops].op = GRPC_OP_RECV_MESSAGE;
- ops[*nops].data.recv_message = &recv_buf_;
- (*nops)++;
- }
- if (client_send_close_) {
- ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
- (*nops)++;
- }
- if (recv_status_) {
- ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- ops[*nops].data.recv_status_on_client.trailing_metadata =
- &recv_trailing_metadata_arr_;
- ops[*nops].data.recv_status_on_client.status = &status_code_;
- ops[*nops].data.recv_status_on_client.status_details = &status_details_;
- ops[*nops].data.recv_status_on_client.status_details_capacity =
- &status_details_capacity_;
- (*nops)++;
- }
- if (send_status_available_) {
- ops[*nops].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
- ops[*nops].data.send_status_from_server.trailing_metadata_count =
- trailing_metadata_count_;
- ops[*nops].data.send_status_from_server.trailing_metadata =
- trailing_metadata_;
- ops[*nops].data.send_status_from_server.status = send_status_code_;
- ops[*nops].data.send_status_from_server.status_details =
- send_status_details_.empty() ? nullptr : send_status_details_.c_str();
- (*nops)++;
- }
- if (recv_closed_) {
- ops[*nops].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- ops[*nops].data.recv_close_on_server.cancelled = &cancelled_buf_;
- (*nops)++;
- }
-}
-
-bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
- // Release send buffers.
- if (send_buf_ && send_message_) {
- if (send_message_) {
- grpc_byte_buffer_destroy(send_buf_);
- }
- send_buf_ = nullptr;
- }
- if (initial_metadata_) {
- gpr_free(initial_metadata_);
- initial_metadata_ = nullptr;
- }
- if (trailing_metadata_count_) {
- gpr_free(trailing_metadata_);
- trailing_metadata_ = nullptr;
- }
- // Set user-facing tag.
- *tag = return_tag_;
- // Process received initial metadata
- if (recv_initial_metadata_) {
- FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
- }
- // Parse received message if any.
- if (recv_message_ || recv_message_buffer_) {
- if (recv_buf_) {
- got_message = *status;
- if (recv_message_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, 0);
- *status = *status &&
- DeserializeProto(recv_buf_, recv_message_, max_message_size_);
- grpc_byte_buffer_destroy(recv_buf_);
- GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, 0);
- } else {
- recv_message_buffer_->set_buffer(recv_buf_);
- }
- recv_buf_ = nullptr;
- } else {
- // Read failed
- got_message = false;
- *status = false;
- }
- }
- // Parse received status.
- if (recv_status_) {
- FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_);
- *recv_status_ = Status(
- static_cast<StatusCode>(status_code_),
- status_details_ ? grpc::string(status_details_) : grpc::string());
- }
- if (recv_closed_) {
- *recv_closed_ = cancelled_buf_ != 0;
- }
- return true;
-}
-
Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
: call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {}
@@ -349,11 +82,11 @@ Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
call_(call),
max_message_size_(max_message_size) {}
-void Call::PerformOps(CallOpBuffer* buffer) {
+void Call::PerformOps(CallOpSetInterface* ops) {
if (max_message_size_ > 0) {
- buffer->set_max_message_size(max_message_size_);
+ ops->set_max_message_size(max_message_size_);
}
- call_hook_->PerformOpsOnCall(buffer, this);
+ call_hook_->PerformOpsOnCall(ops, this);
}
} // namespace grpc
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index b9554c4bb7..7b2a65e99b 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -31,11 +31,12 @@
*
*/
-#include "src/cpp/proto/proto_utils.h"
+#include <grpc++/impl/proto_utils.h>
#include <grpc++/config.h>
#include <grpc/grpc.h>
#include <grpc/byte_buffer.h>
+#include <grpc/byte_buffer_reader.h>
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/port_platform.h>
@@ -48,8 +49,8 @@ class GrpcBufferWriter GRPC_FINAL
explicit GrpcBufferWriter(grpc_byte_buffer** bp,
int block_size = kMaxBufferLength)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
- *bp = grpc_byte_buffer_create(NULL, 0);
- slice_buffer_ = &(*bp)->data.slice_buffer;
+ *bp = grpc_raw_byte_buffer_create(NULL, 0);
+ slice_buffer_ = &(*bp)->data.raw.slice_buffer;
}
~GrpcBufferWriter() GRPC_OVERRIDE {
@@ -66,7 +67,7 @@ class GrpcBufferWriter GRPC_FINAL
slice_ = gpr_slice_malloc(block_size_);
}
*data = GPR_SLICE_START_PTR(slice_);
- byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
+ byte_count_ += * size = GPR_SLICE_LENGTH(slice_);
gpr_slice_buffer_add(slice_buffer_, slice_);
return true;
}
@@ -100,11 +101,9 @@ class GrpcBufferReader GRPC_FINAL
public:
explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0) {
- reader_ = grpc_byte_buffer_reader_create(buffer);
- }
- ~GrpcBufferReader() GRPC_OVERRIDE {
- grpc_byte_buffer_reader_destroy(reader_);
+ grpc_byte_buffer_reader_init(&reader_, buffer);
}
+ ~GrpcBufferReader() GRPC_OVERRIDE {}
bool Next(const void** data, int* size) GRPC_OVERRIDE {
if (backup_count_ > 0) {
@@ -114,12 +113,12 @@ class GrpcBufferReader GRPC_FINAL
backup_count_ = 0;
return true;
}
- if (!grpc_byte_buffer_reader_next(reader_, &slice_)) {
+ if (!grpc_byte_buffer_reader_next(&reader_, &slice_)) {
return false;
}
gpr_slice_unref(slice_);
*data = GPR_SLICE_START_PTR(slice_);
- byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
+ byte_count_ += * size = GPR_SLICE_LENGTH(slice_);
return true;
}
@@ -147,26 +146,38 @@ class GrpcBufferReader GRPC_FINAL
private:
gpr_int64 byte_count_;
gpr_int64 backup_count_;
- grpc_byte_buffer_reader* reader_;
+ grpc_byte_buffer_reader reader_;
gpr_slice slice_;
};
namespace grpc {
-bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
+Status SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
GrpcBufferWriter writer(bp);
- return msg.SerializeToZeroCopyStream(&writer);
+ return msg.SerializeToZeroCopyStream(&writer)
+ ? Status::OK
+ : Status(StatusCode::INVALID_ARGUMENT,
+ "Failed to serialize message");
}
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
- int max_message_size) {
- if (!buffer) return false;
+Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
+ int max_message_size) {
+ if (!buffer) {
+ return Status(StatusCode::INVALID_ARGUMENT, "No payload");
+ }
GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
if (max_message_size > 0) {
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
}
- return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage();
+ if (!msg->ParseFromCodedStream(&decoder)) {
+ return Status(StatusCode::INVALID_ARGUMENT,
+ msg->InitializationErrorString());
+ }
+ if (!decoder.ConsumedEntireMessage()) {
+ return Status(StatusCode::INVALID_ARGUMENT, "Did not read entire message");
+ }
+ return Status::OK;
}
} // namespace grpc
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/server/create_default_thread_pool.cc
index 7e7ea78bcd..89c1d7e929 100644
--- a/src/cpp/client/client_unary_call.cc
+++ b/src/cpp/server/create_default_thread_pool.cc
@@ -31,34 +31,19 @@
*
*/
-#include <grpc++/impl/client_unary_call.h>
-#include <grpc++/impl/call.h>
-#include <grpc++/channel_interface.h>
-#include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/status.h>
-#include <grpc/support/log.h>
+#include <grpc/support/cpu.h>
+#include "src/cpp/server/thread_pool.h"
+
+#ifndef GRPC_CUSTOM_DEFAULT_THREAD_POOL
namespace grpc {
-// Wrapper that performs a blocking unary call
-Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context,
- const grpc::protobuf::Message& request,
- grpc::protobuf::Message* result) {
- CompletionQueue cq;
- Call call(channel->CreateCall(method, context, &cq));
- CallOpBuffer buf;
- Status status;
- buf.AddSendInitialMetadata(context);
- buf.AddSendMessage(request);
- buf.AddRecvInitialMetadata(context);
- buf.AddRecvMessage(result);
- buf.AddClientSendClose();
- buf.AddClientRecvStatus(context, &status);
- call.PerformOps(&buf);
- GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk());
- return status;
+ThreadPoolInterface* CreateDefaultThreadPool() {
+ int cores = gpr_cpu_num_cores();
+ if (!cores) cores = 4;
+ return new ThreadPool(cores);
}
} // namespace grpc
+
+#endif // !GRPC_CUSTOM_DEFAULT_THREAD_POOL
diff --git a/src/cpp/server/insecure_server_credentials.cc b/src/cpp/server/insecure_server_credentials.cc
index 55dd90d7a7..aca3568e59 100644
--- a/src/cpp/server/insecure_server_credentials.cc
+++ b/src/cpp/server/insecure_server_credentials.cc
@@ -31,9 +31,10 @@
*
*/
-#include <grpc/grpc_security.h>
#include <grpc++/server_credentials.h>
+#include <grpc/grpc.h>
+
namespace grpc {
namespace {
class InsecureServerCredentialsImpl GRPC_FINAL : public ServerCredentials {
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index dbd88c5b8c..1437b2dea7 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -35,7 +35,6 @@
#include <utility>
#include <grpc/grpc.h>
-#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
@@ -48,7 +47,6 @@
#include <grpc++/time.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
namespace grpc {
@@ -69,15 +67,11 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
RpcMethod::SERVER_STREAMING),
- has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- RpcMethod::CLIENT_STREAMING) {
+ cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
}
- ~SyncRequest() {
- grpc_metadata_array_destroy(&request_metadata_);
- }
+ ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); }
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
@@ -90,10 +84,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
+ void SetupRequest() { cq_ = grpc_completion_queue_create(); }
+
+ void TeardownRequest() {
+ grpc_completion_queue_destroy(cq_);
+ cq_ = nullptr;
+ }
+
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
- GPR_ASSERT(!in_flight_);
+ GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
- cq_ = grpc_completion_queue_create();
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_registered_call(
server, tag_, &call_, &deadline_, &request_metadata_,
@@ -116,7 +116,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
- has_response_payload_(mrd->has_response_payload_),
request_payload_(mrd->request_payload_),
method_(mrd->method_) {
ctx_.call_ = mrd->call_;
@@ -133,35 +132,10 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
void Run() {
- std::unique_ptr<grpc::protobuf::Message> req;
- std::unique_ptr<grpc::protobuf::Message> res;
- if (has_request_payload_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
- req.reset(method_->AllocateRequestProto());
- if (!DeserializeProto(request_payload_, req.get(),
- call_.max_message_size())) {
- // FIXME(yangg) deal with deserialization failure
- cq_.Shutdown();
- return;
- }
- GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
- }
- if (has_response_payload_) {
- res.reset(method_->AllocateResponseProto());
- }
ctx_.BeginCompletionOp(&call_);
- auto status = method_->handler()->RunHandler(
- MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
- CallOpBuffer buf;
- if (!ctx_.sent_initial_metadata_) {
- buf.AddSendInitialMetadata(&ctx_.initial_metadata_);
- }
- if (has_response_payload_) {
- buf.AddSendMessage(*res);
- }
- buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf); /* status ignored */
+ method_->handler()->RunHandler(MethodHandler::HandlerParameter(
+ &call_, &ctx_, request_payload_, call_.max_message_size()));
+ request_payload_ = nullptr;
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
@@ -173,7 +147,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
Call call_;
ServerContext ctx_;
const bool has_request_payload_;
- const bool has_response_payload_;
grpc_byte_buffer* request_payload_;
RpcServiceMethod* const method_;
};
@@ -183,7 +156,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
- const bool has_response_payload_;
grpc_call* call_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
@@ -251,9 +223,9 @@ bool Server::RegisterService(RpcService* service) {
}
bool Server::RegisterAsyncService(AsynchronousService* service) {
- GPR_ASSERT(service->dispatch_impl_ == nullptr &&
+ GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
- service->dispatch_impl_ = this;
+ service->server_ = this;
service->request_args_ = new void*[service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
void* tag = grpc_server_register_method(server_, service->method_names_[i],
@@ -288,6 +260,7 @@ bool Server::Start() {
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
+ m->SetupRequest();
m->Request(server_, cq_.cq());
}
@@ -318,141 +291,90 @@ void Server::Wait() {
}
}
-void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
+void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
- size_t nops = MAX_OPS;
- grpc_op ops[MAX_OPS];
- buf->FillOps(ops, &nops);
+ size_t nops = 0;
+ grpc_op cops[MAX_OPS];
+ ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), ops, nops, buf));
+ grpc_call_start_batch(call->call(), cops, nops, ops));
}
-class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
- public:
- AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
- grpc::protobuf::Message* request,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag)
- : tag_(tag),
- request_(request),
- stream_(stream),
- call_cq_(call_cq),
- ctx_(ctx),
- generic_ctx_(nullptr),
- server_(server),
- call_(nullptr),
- payload_(nullptr) {
- memset(&array_, 0, sizeof(array_));
- grpc_call_details_init(&call_details_);
- GPR_ASSERT(notification_cq);
- GPR_ASSERT(call_cq);
- grpc_server_request_registered_call(
- server->server_, registered_method, &call_, &call_details_.deadline,
- &array_, request ? &payload_ : nullptr, call_cq->cq(),
- notification_cq->cq(), this);
- }
+Server::BaseAsyncRequest::BaseAsyncRequest(
+ Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ : server_(server),
+ context_(context),
+ stream_(stream),
+ call_cq_(call_cq),
+ tag_(tag),
+ call_(nullptr) {
+ memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
+}
- AsyncRequest(Server* server, GenericServerContext* ctx,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag)
- : tag_(tag),
- request_(nullptr),
- stream_(stream),
- call_cq_(call_cq),
- ctx_(nullptr),
- generic_ctx_(ctx),
- server_(server),
- call_(nullptr),
- payload_(nullptr) {
- memset(&array_, 0, sizeof(array_));
- grpc_call_details_init(&call_details_);
- GPR_ASSERT(notification_cq);
- GPR_ASSERT(call_cq);
- grpc_server_request_call(server->server_, &call_, &call_details_, &array_,
- call_cq->cq(), notification_cq->cq(), this);
- }
+Server::BaseAsyncRequest::~BaseAsyncRequest() {}
- ~AsyncRequest() {
- if (payload_) {
- grpc_byte_buffer_destroy(payload_);
+bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
+ if (*status) {
+ for (size_t i = 0; i < initial_metadata_array_.count; i++) {
+ context_->client_metadata_.insert(std::make_pair(
+ grpc::string(initial_metadata_array_.metadata[i].key),
+ grpc::string(initial_metadata_array_.metadata[i].value,
+ initial_metadata_array_.metadata[i].value +
+ initial_metadata_array_.metadata[i].value_length)));
}
- grpc_metadata_array_destroy(&array_);
}
-
- bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
- *tag = tag_;
- bool orig_status = *status;
- if (*status && request_) {
- if (payload_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_);
- *status =
- DeserializeProto(payload_, request_, server_->max_message_size_);
- GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_);
- } else {
- *status = false;
- }
- }
- ServerContext* ctx = ctx_ ? ctx_ : generic_ctx_;
- GPR_ASSERT(ctx);
- if (*status) {
- ctx->deadline_ = call_details_.deadline;
- for (size_t i = 0; i < array_.count; i++) {
- ctx->client_metadata_.insert(std::make_pair(
- grpc::string(array_.metadata[i].key),
- grpc::string(
- array_.metadata[i].value,
- array_.metadata[i].value + array_.metadata[i].value_length)));
- }
- if (generic_ctx_) {
- // TODO(yangg) remove the copy here.
- generic_ctx_->method_ = call_details_.method;
- generic_ctx_->host_ = call_details_.host;
- gpr_free(call_details_.method);
- gpr_free(call_details_.host);
- }
- }
- ctx->call_ = call_;
- ctx->cq_ = call_cq_;
- Call call(call_, server_, call_cq_, server_->max_message_size_);
- if (orig_status && call_) {
- ctx->BeginCompletionOp(&call);
- }
- // just the pointers inside call are copied here
- stream_->BindCall(&call);
- delete this;
- return true;
+ grpc_metadata_array_destroy(&initial_metadata_array_);
+ context_->call_ = call_;
+ context_->cq_ = call_cq_;
+ Call call(call_, server_, call_cq_, server_->max_message_size_);
+ if (*status && call_) {
+ context_->BeginCompletionOp(&call);
}
+ // just the pointers inside call are copied here
+ stream_->BindCall(&call);
+ *tag = tag_;
+ delete this;
+ return true;
+}
- private:
- void* const tag_;
- grpc::protobuf::Message* const request_;
- ServerAsyncStreamingInterface* const stream_;
- CompletionQueue* const call_cq_;
- ServerContext* const ctx_;
- GenericServerContext* const generic_ctx_;
- Server* const server_;
- grpc_call* call_;
- grpc_call_details call_details_;
- grpc_metadata_array array_;
- grpc_byte_buffer* payload_;
-};
+Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
+ Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ : BaseAsyncRequest(server, context, stream, call_cq, tag) {}
+
+void Server::RegisteredAsyncRequest::IssueRequest(
+ void* registered_method, grpc_byte_buffer** payload,
+ ServerCompletionQueue* notification_cq) {
+ grpc_server_request_registered_call(
+ server_->server_, registered_method, &call_, &context_->deadline_,
+ &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
+ this);
+}
-void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
- grpc::protobuf::Message* request,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
- new AsyncRequest(this, registered_method, context, request, stream, call_cq,
- notification_cq, tag);
+Server::GenericAsyncRequest::GenericAsyncRequest(
+ Server* server, GenericServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
+ : BaseAsyncRequest(server, context, stream, call_cq, tag) {
+ grpc_call_details_init(&call_details_);
+ GPR_ASSERT(notification_cq);
+ GPR_ASSERT(call_cq);
+ grpc_server_request_call(server->server_, &call_, &call_details_,
+ &initial_metadata_array_, call_cq->cq(),
+ notification_cq->cq(), this);
}
-void Server::RequestAsyncGenericCall(GenericServerContext* context,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
- new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
+bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
+ // TODO(yangg) remove the copy here.
+ if (*status) {
+ static_cast<GenericServerContext*>(context_)->method_ =
+ call_details_.method;
+ static_cast<GenericServerContext*>(context_)->host_ = call_details_.host;
+ }
+ gpr_free(call_details_.method);
+ gpr_free(call_details_.host);
+ return BaseAsyncRequest::FinalizeResult(tag, status);
}
void Server::ScheduleCallback() {
@@ -472,9 +394,13 @@ void Server::RunRpc() {
if (ok) {
SyncRequest::CallData cd(this, mrd);
{
+ mrd->SetupRequest();
grpc::unique_lock<grpc::mutex> lock(mu_);
if (!shutdown_) {
mrd->Request(server_, cq_.cq());
+ } else {
+ // destroy the structure that was created
+ mrd->TeardownRequest();
}
}
cd.Run();
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 4bcbd82952..3ee1d54e76 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -87,9 +87,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
if (!thread_pool_ && !services_.empty()) {
- int cores = gpr_cpu_num_cores();
- if (!cores) cores = 4;
- thread_pool_ = new ThreadPool(cores);
+ thread_pool_ = CreateDefaultThreadPool();
thread_pool_owned = true;
}
std::unique_ptr<Server> server(
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 6b5e41d0a8..699895a3cf 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -43,12 +43,12 @@ namespace grpc {
// CompletionOp
-class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
+class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp() : refs_(2), finalized_(false), cancelled_(false) {
- AddServerRecvClose(&cancelled_);
- }
+ CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {}
+
+ void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
bool CheckCancelled(CompletionQueue* cq);
@@ -59,7 +59,7 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
grpc::mutex mu_;
int refs_;
bool finalized_;
- bool cancelled_;
+ int cancelled_;
};
void ServerContext::CompletionOp::Unref() {
@@ -73,14 +73,20 @@ void ServerContext::CompletionOp::Unref() {
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
grpc::lock_guard<grpc::mutex> g(mu_);
- return finalized_ ? cancelled_ : false;
+ return finalized_ ? cancelled_ != 0 : false;
+}
+
+void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
+ ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops->data.recv_close_on_server.cancelled = &cancelled_;
+ ops->flags = 0;
+ *nops = 1;
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
- GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status));
grpc::unique_lock<grpc::mutex> lock(mu_);
finalized_ = true;
- if (!*status) cancelled_ = true;
+ if (!*status) cancelled_ = 1;
if (--refs_ == 0) {
lock.unlock();
delete this;
diff --git a/src/cpp/server/server_credentials.cc b/src/cpp/server/server_credentials.cc
index 6bdb465baa..be3a7425e0 100644
--- a/src/cpp/server/server_credentials.cc
+++ b/src/cpp/server/server_credentials.cc
@@ -31,8 +31,6 @@
*
*/
-#include <grpc/grpc_security.h>
-
#include <grpc++/server_credentials.h>
namespace grpc {
diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h
index 26f25611b5..3b70249bf9 100644
--- a/src/cpp/server/thread_pool.h
+++ b/src/cpp/server/thread_pool.h
@@ -62,6 +62,8 @@ class ThreadPool GRPC_FINAL : public ThreadPoolInterface {
void ThreadFunc();
};
+ThreadPoolInterface* CreateDefaultThreadPool();
+
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H
diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc
index ac2657472c..a66c92c3e1 100644
--- a/src/cpp/util/byte_buffer.cc
+++ b/src/cpp/util/byte_buffer.cc
@@ -31,17 +31,18 @@
*
*/
+#include <grpc/byte_buffer_reader.h>
#include <grpc++/byte_buffer.h>
namespace grpc {
-ByteBuffer::ByteBuffer(Slice* slices, size_t nslices) {
+ByteBuffer::ByteBuffer(const Slice* slices, size_t nslices) {
// TODO(yangg) maybe expose some core API to simplify this
std::vector<gpr_slice> c_slices(nslices);
for (size_t i = 0; i < nslices; i++) {
c_slices[i] = slices[i].slice_;
}
- buffer_ = grpc_byte_buffer_create(c_slices.data(), nslices);
+ buffer_ = grpc_raw_byte_buffer_create(c_slices.data(), nslices);
}
void ByteBuffer::Clear() {
@@ -51,20 +52,20 @@ void ByteBuffer::Clear() {
}
}
-void ByteBuffer::Dump(std::vector<Slice>* slices) {
+void ByteBuffer::Dump(std::vector<Slice>* slices) const {
slices->clear();
if (!buffer_) {
return;
}
- grpc_byte_buffer_reader* reader = grpc_byte_buffer_reader_create(buffer_);
+ grpc_byte_buffer_reader reader;
+ grpc_byte_buffer_reader_init(&reader, buffer_);
gpr_slice s;
- while (grpc_byte_buffer_reader_next(reader, &s)) {
+ while (grpc_byte_buffer_reader_next(&reader, &s)) {
slices->push_back(Slice(s, Slice::STEAL_REF));
}
- grpc_byte_buffer_reader_destroy(reader);
}
-size_t ByteBuffer::Length() {
+size_t ByteBuffer::Length() const {
if (buffer_) {
return grpc_byte_buffer_length(buffer_);
} else {
diff --git a/src/cpp/util/status.cc b/src/cpp/util/status.cc
index b694a513e7..5bb9eda3d9 100644
--- a/src/cpp/util/status.cc
+++ b/src/cpp/util/status.cc
@@ -36,6 +36,6 @@
namespace grpc {
const Status& Status::OK = Status();
-const Status& Status::Cancelled = Status(StatusCode::CANCELLED);
+const Status& Status::CANCELLED = Status(StatusCode::CANCELLED, "");
} // namespace grpc
diff --git a/src/cpp/util/time.cc b/src/cpp/util/time.cc
index 1fef2a56de..fd94d00b32 100644
--- a/src/cpp/util/time.cc
+++ b/src/cpp/util/time.cc
@@ -42,6 +42,7 @@ using std::chrono::duration_cast;
using std::chrono::nanoseconds;
using std::chrono::seconds;
using std::chrono::system_clock;
+using std::chrono::high_resolution_clock;
namespace grpc {
@@ -59,6 +60,20 @@ void Timepoint2Timespec(const system_clock::time_point& from,
to->tv_nsec = nsecs.count();
}
+void TimepointHR2Timespec(const high_resolution_clock::time_point& from,
+ gpr_timespec* to) {
+ high_resolution_clock::duration deadline = from.time_since_epoch();
+ seconds secs = duration_cast<seconds>(deadline);
+ if (from == high_resolution_clock::time_point::max() ||
+ secs.count() >= gpr_inf_future.tv_sec || secs.count() < 0) {
+ *to = gpr_inf_future;
+ return;
+ }
+ nanoseconds nsecs = duration_cast<nanoseconds>(deadline - secs);
+ to->tv_sec = secs.count();
+ to->tv_nsec = nsecs.count();
+}
+
system_clock::time_point Timespec2Timepoint(gpr_timespec t) {
if (gpr_time_cmp(t, gpr_inf_future) == 0) {
return system_clock::time_point::max();