aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/client/channel.h10
-rw-r--r--src/cpp/client/channel_arguments.cc18
-rw-r--r--src/cpp/client/client_context.cc8
-rw-r--r--src/cpp/client/client_unary_call.cc8
-rw-r--r--src/cpp/client/create_channel.cc9
-rw-r--r--src/cpp/client/secure_credentials.cc32
-rw-r--r--src/cpp/common/call.cc95
-rw-r--r--src/cpp/common/completion_queue.cc15
-rw-r--r--src/cpp/proto/proto_utils.cc22
-rw-r--r--src/cpp/proto/proto_utils.h6
-rw-r--r--src/cpp/server/async_generic_service.cc50
-rw-r--r--src/cpp/server/async_server_context.cc16
-rw-r--r--src/cpp/server/insecure_server_credentials.cc3
-rw-r--r--src/cpp/server/secure_server_credentials.cc12
-rw-r--r--src/cpp/server/server.cc73
-rw-r--r--src/cpp/server/server_builder.cc25
-rw-r--r--src/cpp/server/thread_pool.cc4
-rw-r--r--src/cpp/server/thread_pool.h2
-rw-r--r--src/cpp/util/byte_buffer.cc76
-rw-r--r--src/cpp/util/slice.cc48
-rw-r--r--src/cpp/util/status.cc4
-rw-r--r--src/cpp/util/time.cc4
-rw-r--r--src/cpp/util/time.h4
23 files changed, 420 insertions, 124 deletions
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index a1de3817e6..3980eba237 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -51,16 +51,16 @@ class StreamContextInterface;
class Channel GRPC_FINAL : public ChannelInterface {
public:
- Channel(const grpc::string &target, grpc_channel *c_channel);
+ Channel(const grpc::string& target, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE;
- virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
- CompletionQueue *cq) GRPC_OVERRIDE;
- virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) GRPC_OVERRIDE;
+ virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
+ CompletionQueue* cq) GRPC_OVERRIDE;
+ virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
private:
const grpc::string target_;
- grpc_channel *const c_channel_; // owned
+ grpc_channel* const c_channel_; // owned
};
} // namespace grpc
diff --git a/src/cpp/client/channel_arguments.cc b/src/cpp/client/channel_arguments.cc
index abf0fc1c0a..87f8349eef 100644
--- a/src/cpp/client/channel_arguments.cc
+++ b/src/cpp/client/channel_arguments.cc
@@ -37,7 +37,7 @@
namespace grpc {
-void ChannelArguments::SetSslTargetNameOverride(const grpc::string &name) {
+void ChannelArguments::SetSslTargetNameOverride(const grpc::string& name) {
SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, name);
}
@@ -50,32 +50,32 @@ grpc::string ChannelArguments::GetSslTargetNameOverride() const {
return "";
}
-void ChannelArguments::SetInt(const grpc::string &key, int value) {
+void ChannelArguments::SetInt(const grpc::string& key, int value) {
grpc_arg arg;
arg.type = GRPC_ARG_INTEGER;
strings_.push_back(key);
- arg.key = const_cast<char *>(strings_.back().c_str());
+ arg.key = const_cast<char*>(strings_.back().c_str());
arg.value.integer = value;
args_.push_back(arg);
}
-void ChannelArguments::SetString(const grpc::string &key,
- const grpc::string &value) {
+void ChannelArguments::SetString(const grpc::string& key,
+ const grpc::string& value) {
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
strings_.push_back(key);
- arg.key = const_cast<char *>(strings_.back().c_str());
+ arg.key = const_cast<char*>(strings_.back().c_str());
strings_.push_back(value);
- arg.value.string = const_cast<char *>(strings_.back().c_str());
+ arg.value.string = const_cast<char*>(strings_.back().c_str());
args_.push_back(arg);
}
-void ChannelArguments::SetChannelArgs(grpc_channel_args *channel_args) const {
+void ChannelArguments::SetChannelArgs(grpc_channel_args* channel_args) const {
channel_args->num_args = args_.size();
if (channel_args->num_args > 0) {
- channel_args->args = const_cast<grpc_arg *>(&args_[0]);
+ channel_args->args = const_cast<grpc_arg*>(&args_[0]);
}
}
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 9f99f7bcd5..de9f8c7201 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -53,7 +53,7 @@ ClientContext::~ClientContext() {
if (cq_) {
grpc_completion_queue_shutdown(cq_);
// Drain cq_.
- grpc_event *ev;
+ grpc_event* ev;
grpc_completion_type t;
do {
ev = grpc_completion_queue_next(cq_, gpr_inf_future);
@@ -65,7 +65,7 @@ ClientContext::~ClientContext() {
}
void ClientContext::set_absolute_deadline(
- const system_clock::time_point &deadline) {
+ const system_clock::time_point& deadline) {
Timepoint2Timespec(deadline, &absolute_deadline_);
}
@@ -73,8 +73,8 @@ system_clock::time_point ClientContext::absolute_deadline() {
return Timespec2Timepoint(absolute_deadline_);
}
-void ClientContext::AddMetadata(const grpc::string &meta_key,
- const grpc::string &meta_value) {
+void ClientContext::AddMetadata(const grpc::string& meta_key,
+ const grpc::string& meta_value) {
send_initial_metadata_.insert(std::make_pair(meta_key, meta_value));
}
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc
index 5c179de9d8..7e7ea78bcd 100644
--- a/src/cpp/client/client_unary_call.cc
+++ b/src/cpp/client/client_unary_call.cc
@@ -42,10 +42,10 @@
namespace grpc {
// Wrapper that performs a blocking unary call
-Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
- ClientContext *context,
- const grpc::protobuf::Message &request,
- grpc::protobuf::Message *result) {
+Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context,
+ const grpc::protobuf::Message& request,
+ grpc::protobuf::Message* result) {
CompletionQueue cq;
Call call(channel->CreateCall(method, context, &cq));
CallOpBuffer buf;
diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc
index 57d215d0f3..301430572a 100644
--- a/src/cpp/client/create_channel.cc
+++ b/src/cpp/client/create_channel.cc
@@ -41,9 +41,10 @@ namespace grpc {
class ChannelArguments;
std::shared_ptr<ChannelInterface> CreateChannel(
- const grpc::string &target, const std::unique_ptr<Credentials> &creds,
- const ChannelArguments &args) {
- return creds ? creds->CreateChannel(target, args) :
- std::shared_ptr<ChannelInterface>(new Channel(target, grpc_lame_client_channel_create()));
+ const grpc::string& target, const std::unique_ptr<Credentials>& creds,
+ const ChannelArguments& args) {
+ return creds ? creds->CreateChannel(target, args)
+ : std::shared_ptr<ChannelInterface>(
+ new Channel(target, grpc_lame_client_channel_create()));
}
} // namespace grpc
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index 6ca702eead..e3c6637623 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -55,7 +55,8 @@ class SecureCredentials GRPC_FINAL : public Credentials {
args.SetChannelArgs(&channel_args);
return std::shared_ptr<ChannelInterface>(new Channel(
args.GetSslTargetNameOverride().empty()
- ? target : args.GetSslTargetNameOverride(),
+ ? target
+ : args.GetSslTargetNameOverride(),
grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args)));
}
@@ -98,12 +99,37 @@ std::unique_ptr<Credentials> ComputeEngineCredentials() {
std::unique_ptr<Credentials> ServiceAccountCredentials(
const grpc::string& json_key, const grpc::string& scope,
std::chrono::seconds token_lifetime) {
- gpr_timespec lifetime = gpr_time_from_seconds(
- token_lifetime.count() > 0 ? token_lifetime.count() : 0);
+ if (token_lifetime.count() <= 0) {
+ gpr_log(GPR_ERROR,
+ "Trying to create ServiceAccountCredentials "
+ "with non-positive lifetime");
+ return WrapCredentials(nullptr);
+ }
+ gpr_timespec lifetime = gpr_time_from_seconds(token_lifetime.count());
return WrapCredentials(grpc_service_account_credentials_create(
json_key.c_str(), scope.c_str(), lifetime));
}
+// Builds JWT credentials.
+std::unique_ptr<Credentials> JWTCredentials(
+ const grpc::string& json_key, std::chrono::seconds token_lifetime) {
+ if (token_lifetime.count() <= 0) {
+ gpr_log(GPR_ERROR,
+ "Trying to create JWTCredentials with non-positive lifetime");
+ return WrapCredentials(nullptr);
+ }
+ gpr_timespec lifetime = gpr_time_from_seconds(token_lifetime.count());
+ return WrapCredentials(
+ grpc_jwt_credentials_create(json_key.c_str(), lifetime));
+}
+
+// Builds refresh token credentials.
+std::unique_ptr<Credentials> RefreshTokenCredentials(
+ const grpc::string& json_refresh_token) {
+ return WrapCredentials(
+ grpc_refresh_token_credentials_create(json_refresh_token.c_str()));
+}
+
// Builds IAM credentials.
std::unique_ptr<Credentials> IAMCredentials(
const grpc::string& authorization_token,
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 6ce1e8a7d5..5c26a1ad7c 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -31,8 +31,10 @@
*
*/
-#include <grpc/support/alloc.h>
#include <grpc++/impl/call.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc++/byte_buffer.h>
#include <grpc++/client_context.h>
#include <grpc++/channel_interface.h>
@@ -46,15 +48,15 @@ CallOpBuffer::CallOpBuffer()
initial_metadata_count_(0),
initial_metadata_(nullptr),
recv_initial_metadata_(nullptr),
- recv_initial_metadata_arr_{0, 0, nullptr},
send_message_(nullptr),
- send_message_buf_(nullptr),
+ send_message_buffer_(nullptr),
+ send_buf_(nullptr),
recv_message_(nullptr),
- recv_message_buf_(nullptr),
+ recv_message_buffer_(nullptr),
+ recv_buf_(nullptr),
client_send_close_(false),
recv_trailing_metadata_(nullptr),
recv_status_(nullptr),
- recv_trailing_metadata_arr_{0, 0, nullptr},
status_code_(GRPC_STATUS_OK),
status_details_(nullptr),
status_details_capacity_(0),
@@ -62,7 +64,12 @@ CallOpBuffer::CallOpBuffer()
trailing_metadata_count_(0),
trailing_metadata_(nullptr),
cancelled_buf_(0),
- recv_closed_(nullptr) {}
+ recv_closed_(nullptr) {
+ memset(&recv_trailing_metadata_arr_, 0, sizeof(recv_trailing_metadata_arr_));
+ memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_));
+ recv_trailing_metadata_arr_.metadata = nullptr;
+ recv_initial_metadata_arr_.metadata = nullptr;
+}
void CallOpBuffer::Reset(void* next_return_tag) {
return_tag_ = next_return_tag;
@@ -74,18 +81,20 @@ void CallOpBuffer::Reset(void* next_return_tag) {
recv_initial_metadata_ = nullptr;
recv_initial_metadata_arr_.count = 0;
- send_message_ = nullptr;
- if (send_message_buf_) {
- grpc_byte_buffer_destroy(send_message_buf_);
- send_message_buf_ = nullptr;
+ if (send_buf_ && send_message_) {
+ grpc_byte_buffer_destroy(send_buf_);
}
+ send_message_ = nullptr;
+ send_message_buffer_ = nullptr;
+ send_buf_ = nullptr;
- recv_message_ = nullptr;
got_message = false;
- if (recv_message_buf_) {
- grpc_byte_buffer_destroy(recv_message_buf_);
- recv_message_buf_ = nullptr;
+ if (recv_buf_ && recv_message_) {
+ grpc_byte_buffer_destroy(recv_buf_);
}
+ recv_message_ = nullptr;
+ recv_message_buffer_ = nullptr;
+ recv_buf_ = nullptr;
client_send_close_ = false;
@@ -106,11 +115,11 @@ CallOpBuffer::~CallOpBuffer() {
gpr_free(status_details_);
gpr_free(recv_initial_metadata_arr_.metadata);
gpr_free(recv_trailing_metadata_arr_.metadata);
- if (recv_message_buf_) {
- grpc_byte_buffer_destroy(recv_message_buf_);
+ if (recv_buf_ && recv_message_) {
+ grpc_byte_buffer_destroy(recv_buf_);
}
- if (send_message_buf_) {
- grpc_byte_buffer_destroy(send_message_buf_);
+ if (send_buf_ && send_message_) {
+ grpc_byte_buffer_destroy(send_buf_);
}
}
@@ -166,11 +175,19 @@ void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) {
send_message_ = &message;
}
+void CallOpBuffer::AddSendMessage(const ByteBuffer& message) {
+ send_message_buffer_ = &message;
+}
+
void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) {
recv_message_ = message;
recv_message_->Clear();
}
+void CallOpBuffer::AddRecvMessage(ByteBuffer* message) {
+ recv_message_buffer_ = message;
+}
+
void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; }
void CallOpBuffer::AddServerRecvClose(bool* cancelled) {
@@ -206,19 +223,23 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_;
(*nops)++;
}
- if (send_message_) {
- bool success = SerializeProto(*send_message_, &send_message_buf_);
- if (!success) {
- abort();
- // TODO handle parse failure
+ if (send_message_ || send_message_buffer_) {
+ if (send_message_) {
+ bool success = SerializeProto(*send_message_, &send_buf_);
+ if (!success) {
+ abort();
+ // TODO handle parse failure
+ }
+ } else {
+ send_buf_ = send_message_buffer_->buffer();
}
ops[*nops].op = GRPC_OP_SEND_MESSAGE;
- ops[*nops].data.send_message = send_message_buf_;
+ ops[*nops].data.send_message = send_buf_;
(*nops)++;
}
- if (recv_message_) {
+ if (recv_message_ || recv_message_buffer_) {
ops[*nops].op = GRPC_OP_RECV_MESSAGE;
- ops[*nops].data.recv_message = &recv_message_buf_;
+ ops[*nops].data.recv_message = &recv_buf_;
(*nops)++;
}
if (client_send_close_) {
@@ -256,9 +277,11 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
// Release send buffers.
- if (send_message_buf_) {
- grpc_byte_buffer_destroy(send_message_buf_);
- send_message_buf_ = nullptr;
+ if (send_buf_ && send_message_) {
+ if (send_message_) {
+ grpc_byte_buffer_destroy(send_buf_);
+ }
+ send_buf_ = nullptr;
}
if (initial_metadata_) {
gpr_free(initial_metadata_);
@@ -275,12 +298,16 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
}
// Parse received message if any.
- if (recv_message_) {
- if (recv_message_buf_) {
+ if (recv_message_ || recv_message_buffer_) {
+ if (recv_buf_) {
got_message = *status;
- *status = *status && DeserializeProto(recv_message_buf_, recv_message_);
- grpc_byte_buffer_destroy(recv_message_buf_);
- recv_message_buf_ = nullptr;
+ if (recv_message_) {
+ *status = *status && DeserializeProto(recv_buf_, recv_message_);
+ grpc_byte_buffer_destroy(recv_buf_);
+ } else {
+ recv_message_buffer_->set_buffer(recv_buf_);
+ }
+ recv_buf_ = nullptr;
} else {
// Read failed
got_message = false;
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index 414966c1cd..fede2da016 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -57,19 +57,26 @@ class EventDeleter {
}
};
-bool CompletionQueue::Next(void** tag, bool* ok) {
+CompletionQueue::NextStatus
+CompletionQueue::AsyncNext(void** tag, bool* ok,
+ std::chrono::system_clock::time_point deadline) {
std::unique_ptr<grpc_event, EventDeleter> ev;
+ gpr_timespec gpr_deadline;
+ Timepoint2Timespec(deadline, &gpr_deadline);
for (;;) {
- ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
+ ev.reset(grpc_completion_queue_next(cq_, gpr_deadline));
+ if (!ev) { /* got a NULL back because deadline passed */
+ return TIMEOUT;
+ }
if (ev->type == GRPC_QUEUE_SHUTDOWN) {
- return false;
+ return SHUTDOWN;
}
auto cq_tag = static_cast<CompletionQueueTag*>(ev->tag);
*ok = ev->data.op_complete == GRPC_OP_OK;
*tag = cq_tag;
if (cq_tag->FinalizeResult(tag, ok)) {
- return true;
+ return GOT_EVENT;
}
}
}
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 9254e5879f..b8de2ea173 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -45,7 +45,7 @@ const int kMaxBufferLength = 8192;
class GrpcBufferWriter GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyOutputStream {
public:
- explicit GrpcBufferWriter(grpc_byte_buffer **bp,
+ explicit GrpcBufferWriter(grpc_byte_buffer** bp,
int block_size = kMaxBufferLength)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
*bp = grpc_byte_buffer_create(NULL, 0);
@@ -58,7 +58,7 @@ class GrpcBufferWriter GRPC_FINAL
}
}
- bool Next(void **data, int *size) GRPC_OVERRIDE {
+ bool Next(void** data, int* size) GRPC_OVERRIDE {
if (have_backup_) {
slice_ = backup_slice_;
have_backup_ = false;
@@ -89,7 +89,7 @@ class GrpcBufferWriter GRPC_FINAL
private:
const int block_size_;
gpr_int64 byte_count_;
- gpr_slice_buffer *slice_buffer_;
+ gpr_slice_buffer* slice_buffer_;
bool have_backup_;
gpr_slice backup_slice_;
gpr_slice slice_;
@@ -98,7 +98,7 @@ class GrpcBufferWriter GRPC_FINAL
class GrpcBufferReader GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyInputStream {
public:
- explicit GrpcBufferReader(grpc_byte_buffer *buffer)
+ explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0) {
reader_ = grpc_byte_buffer_reader_create(buffer);
}
@@ -106,7 +106,7 @@ class GrpcBufferReader GRPC_FINAL
grpc_byte_buffer_reader_destroy(reader_);
}
- bool Next(const void **data, int *size) GRPC_OVERRIDE {
+ bool Next(const void** data, int* size) GRPC_OVERRIDE {
if (backup_count_ > 0) {
*data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
backup_count_;
@@ -123,12 +123,10 @@ class GrpcBufferReader GRPC_FINAL
return true;
}
- void BackUp(int count) GRPC_OVERRIDE {
- backup_count_ = count;
- }
+ void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; }
bool Skip(int count) GRPC_OVERRIDE {
- const void *data;
+ const void* data;
int size;
while (Next(&data, &size)) {
if (size >= count) {
@@ -149,18 +147,18 @@ class GrpcBufferReader GRPC_FINAL
private:
gpr_int64 byte_count_;
gpr_int64 backup_count_;
- grpc_byte_buffer_reader *reader_;
+ grpc_byte_buffer_reader* reader_;
gpr_slice slice_;
};
namespace grpc {
-bool SerializeProto(const grpc::protobuf::Message &msg, grpc_byte_buffer **bp) {
+bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
GrpcBufferWriter writer(bp);
return msg.SerializeToZeroCopyStream(&writer);
}
-bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg) {
+bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg) {
GrpcBufferReader reader(buffer);
return msg->ParseFromZeroCopyStream(&reader);
}
diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/proto/proto_utils.h
index 7a1b1f8b7c..bc60dc9929 100644
--- a/src/cpp/proto/proto_utils.h
+++ b/src/cpp/proto/proto_utils.h
@@ -43,11 +43,11 @@ namespace grpc {
// Serialize the msg into a buffer created inside the function. The caller
// should destroy the returned buffer when done with it. If serialization fails,
// false is returned and buffer is left unchanged.
-bool SerializeProto(const grpc::protobuf::Message &msg,
- grpc_byte_buffer **buffer);
+bool SerializeProto(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** buffer);
// The caller keeps ownership of buffer and msg.
-bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg);
+bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg);
} // namespace grpc
diff --git a/src/cpp/server/async_generic_service.cc b/src/cpp/server/async_generic_service.cc
new file mode 100644
index 0000000000..07cb933715
--- /dev/null
+++ b/src/cpp/server/async_generic_service.cc
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/async_generic_service.h>
+
+#include <grpc++/server.h>
+
+namespace grpc {
+
+void AsyncGenericService::RequestCall(
+ GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer,
+ CompletionQueue* cq, void* tag) {
+ server_->RequestAsyncGenericCall(ctx, reader_writer, cq, tag);
+}
+
+CompletionQueue* AsyncGenericService::completion_queue() {
+ return &server_->cq_;
+}
+
+} // namespace grpc
diff --git a/src/cpp/server/async_server_context.cc b/src/cpp/server/async_server_context.cc
index f21efcfb19..628822a338 100644
--- a/src/cpp/server/async_server_context.cc
+++ b/src/cpp/server/async_server_context.cc
@@ -42,7 +42,7 @@
namespace grpc {
AsyncServerContext::AsyncServerContext(
- grpc_call *call, const grpc::string &method, const grpc::string &host,
+ grpc_call* call, const grpc::string& method, const grpc::string& host,
system_clock::time_point absolute_deadline)
: method_(method),
host_(host),
@@ -52,22 +52,22 @@ AsyncServerContext::AsyncServerContext(
AsyncServerContext::~AsyncServerContext() { grpc_call_destroy(call_); }
-void AsyncServerContext::Accept(grpc_completion_queue *cq) {
+void AsyncServerContext::Accept(grpc_completion_queue* cq) {
GPR_ASSERT(grpc_call_server_accept_old(call_, cq, this) == GRPC_CALL_OK);
GPR_ASSERT(grpc_call_server_end_initial_metadata_old(
call_, GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
}
-bool AsyncServerContext::StartRead(grpc::protobuf::Message *request) {
+bool AsyncServerContext::StartRead(grpc::protobuf::Message* request) {
GPR_ASSERT(request);
request_ = request;
grpc_call_error err = grpc_call_start_read_old(call_, this);
return err == GRPC_CALL_OK;
}
-bool AsyncServerContext::StartWrite(const grpc::protobuf::Message &response,
+bool AsyncServerContext::StartWrite(const grpc::protobuf::Message& response,
int flags) {
- grpc_byte_buffer *buffer = nullptr;
+ grpc_byte_buffer* buffer = nullptr;
if (!SerializeProto(response, &buffer)) {
return false;
}
@@ -76,16 +76,16 @@ bool AsyncServerContext::StartWrite(const grpc::protobuf::Message &response,
return err == GRPC_CALL_OK;
}
-bool AsyncServerContext::StartWriteStatus(const Status &status) {
+bool AsyncServerContext::StartWriteStatus(const Status& status) {
grpc_call_error err = grpc_call_start_write_status_old(
call_, static_cast<grpc_status_code>(status.code()),
status.details().empty() ? nullptr
- : const_cast<char *>(status.details().c_str()),
+ : const_cast<char*>(status.details().c_str()),
this);
return err == GRPC_CALL_OK;
}
-bool AsyncServerContext::ParseRead(grpc_byte_buffer *read_buffer) {
+bool AsyncServerContext::ParseRead(grpc_byte_buffer* read_buffer) {
GPR_ASSERT(request_);
bool success = DeserializeProto(read_buffer, request_);
request_ = nullptr;
diff --git a/src/cpp/server/insecure_server_credentials.cc b/src/cpp/server/insecure_server_credentials.cc
index f5e4732f73..55dd90d7a7 100644
--- a/src/cpp/server/insecure_server_credentials.cc
+++ b/src/cpp/server/insecure_server_credentials.cc
@@ -46,7 +46,8 @@ class InsecureServerCredentialsImpl GRPC_FINAL : public ServerCredentials {
} // namespace
std::shared_ptr<ServerCredentials> InsecureServerCredentials() {
- return std::shared_ptr<ServerCredentials>(new InsecureServerCredentialsImpl());
+ return std::shared_ptr<ServerCredentials>(
+ new InsecureServerCredentialsImpl());
}
} // namespace grpc
diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc
index ff35638503..88f7a9b1a9 100644
--- a/src/cpp/server/secure_server_credentials.cc
+++ b/src/cpp/server/secure_server_credentials.cc
@@ -40,7 +40,8 @@ namespace grpc {
namespace {
class SecureServerCredentials GRPC_FINAL : public ServerCredentials {
public:
- explicit SecureServerCredentials(grpc_server_credentials* creds) : creds_(creds) {}
+ explicit SecureServerCredentials(grpc_server_credentials* creds)
+ : creds_(creds) {}
~SecureServerCredentials() GRPC_OVERRIDE {
grpc_server_credentials_release(creds_);
}
@@ -56,16 +57,17 @@ class SecureServerCredentials GRPC_FINAL : public ServerCredentials {
} // namespace
std::shared_ptr<ServerCredentials> SslServerCredentials(
- const SslServerCredentialsOptions &options) {
+ const SslServerCredentialsOptions& options) {
std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs;
- for (const auto &key_cert_pair : options.pem_key_cert_pairs) {
+ for (const auto& key_cert_pair : options.pem_key_cert_pairs) {
pem_key_cert_pairs.push_back(
{key_cert_pair.private_key.c_str(), key_cert_pair.cert_chain.c_str()});
}
- grpc_server_credentials *c_creds = grpc_ssl_server_credentials_create(
+ grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create(
options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(),
&pem_key_cert_pairs[0], pem_key_cert_pairs.size());
- return std::shared_ptr<ServerCredentials>(new SecureServerCredentials(c_creds));
+ return std::shared_ptr<ServerCredentials>(
+ new SecureServerCredentials(c_creds));
}
} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index e69032a657..5a4ca6915a 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -36,8 +36,10 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
+#include <grpc++/async_generic_service.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server_context.h>
@@ -179,12 +181,12 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned)
thread_pool_owned_(thread_pool_owned) {}
Server::~Server() {
- std::unique_lock<std::mutex> lock(mu_);
- if (started_ && !shutdown_) {
- lock.unlock();
- Shutdown();
- } else {
- lock.unlock();
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ if (started_ && !shutdown_) {
+ lock.unlock();
+ Shutdown();
+ }
}
grpc_server_destroy(server_);
if (thread_pool_owned_) {
@@ -226,7 +228,14 @@ bool Server::RegisterAsyncService(AsynchronousService* service) {
return true;
}
-int Server::AddPort(const grpc::string& addr, ServerCredentials* creds) {
+void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
+ GPR_ASSERT(service->server_ == nullptr &&
+ "Can only register an async generic service against one server.");
+ service->server_ = this;
+}
+
+int Server::AddListeningPort(const grpc::string& addr,
+ ServerCredentials* creds) {
GPR_ASSERT(!started_);
return creds->AddPortToServer(addr, server_);
}
@@ -289,13 +298,33 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
stream_(stream),
cq_(cq),
ctx_(ctx),
+ generic_ctx_(nullptr),
server_(server),
call_(nullptr),
payload_(nullptr) {
memset(&array_, 0, sizeof(array_));
+ grpc_call_details_init(&call_details_);
grpc_server_request_registered_call(
- server->server_, registered_method, &call_, &deadline_, &array_,
- request ? &payload_ : nullptr, cq->cq(), this);
+ server->server_, registered_method, &call_, &call_details_.deadline,
+ &array_, request ? &payload_ : nullptr, cq->cq(), this);
+ }
+
+ AsyncRequest(Server* server, GenericServerContext* ctx,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* cq,
+ void* tag)
+ : tag_(tag),
+ request_(nullptr),
+ stream_(stream),
+ cq_(cq),
+ ctx_(nullptr),
+ generic_ctx_(ctx),
+ server_(server),
+ call_(nullptr),
+ payload_(nullptr) {
+ memset(&array_, 0, sizeof(array_));
+ grpc_call_details_init(&call_details_);
+ grpc_server_request_call(server->server_, &call_, &call_details_, &array_,
+ cq->cq(), this);
}
~AsyncRequest() {
@@ -315,20 +344,29 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
*status = false;
}
}
+ ServerContext* ctx = ctx_ ? ctx_ : generic_ctx_;
+ GPR_ASSERT(ctx);
if (*status) {
- ctx_->deadline_ = Timespec2Timepoint(deadline_);
+ ctx->deadline_ = Timespec2Timepoint(call_details_.deadline);
for (size_t i = 0; i < array_.count; i++) {
- ctx_->client_metadata_.insert(std::make_pair(
+ ctx->client_metadata_.insert(std::make_pair(
grpc::string(array_.metadata[i].key),
grpc::string(
array_.metadata[i].value,
array_.metadata[i].value + array_.metadata[i].value_length)));
}
+ if (generic_ctx_) {
+ // TODO(yangg) remove the copy here.
+ generic_ctx_->method_ = call_details_.method;
+ generic_ctx_->host_ = call_details_.host;
+ gpr_free(call_details_.method);
+ gpr_free(call_details_.host);
+ }
}
- ctx_->call_ = call_;
+ ctx->call_ = call_;
Call call(call_, server_, cq_);
if (orig_status && call_) {
- ctx_->BeginCompletionOp(&call);
+ ctx->BeginCompletionOp(&call);
}
// just the pointers inside call are copied here
stream_->BindCall(&call);
@@ -342,9 +380,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const cq_;
ServerContext* const ctx_;
+ GenericServerContext* const generic_ctx_;
Server* const server_;
grpc_call* call_;
- gpr_timespec deadline_;
+ grpc_call_details call_details_;
grpc_metadata_array array_;
grpc_byte_buffer* payload_;
};
@@ -356,6 +395,12 @@ void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
new AsyncRequest(this, registered_method, context, request, stream, cq, tag);
}
+void Server::RequestAsyncGenericCall(GenericServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) {
+ new AsyncRequest(this, context, stream, cq, tag);
+}
+
void Server::ScheduleCallback() {
{
std::unique_lock<std::mutex> lock(mu_);
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 5de592334d..58bf9d937f 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -41,7 +41,8 @@
namespace grpc {
-ServerBuilder::ServerBuilder() : thread_pool_(nullptr) {}
+ServerBuilder::ServerBuilder()
+ : generic_service_(nullptr), thread_pool_(nullptr) {}
void ServerBuilder::RegisterService(SynchronousService* service) {
services_.push_back(service->service());
@@ -51,9 +52,20 @@ void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
async_services_.push_back(service);
}
-void ServerBuilder::AddPort(const grpc::string& addr,
- std::shared_ptr<ServerCredentials> creds,
- int* selected_port) {
+void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
+ if (generic_service_) {
+ gpr_log(GPR_ERROR,
+ "Adding multiple AsyncGenericService is unsupported for now. "
+ "Dropping the service %p",
+ service);
+ return;
+ }
+ generic_service_ = service;
+}
+
+void ServerBuilder::AddListeningPort(const grpc::string& addr,
+ std::shared_ptr<ServerCredentials> creds,
+ int* selected_port) {
ports_.push_back(Port{addr, creds, selected_port});
}
@@ -84,8 +96,11 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
+ if (generic_service_) {
+ server->RegisterAsyncGenericService(generic_service_);
+ }
for (auto& port : ports_) {
- int r = server->AddPort(port.addr, port.creds.get());
+ int r = server->AddListeningPort(port.addr, port.creds.get());
if (!r) return nullptr;
if (port.selected_port != nullptr) {
*port.selected_port = r;
diff --git a/src/cpp/server/thread_pool.cc b/src/cpp/server/thread_pool.cc
index 5dc9bcf916..d3013b806c 100644
--- a/src/cpp/server/thread_pool.cc
+++ b/src/cpp/server/thread_pool.cc
@@ -66,12 +66,12 @@ ThreadPool::~ThreadPool() {
shutdown_ = true;
cv_.notify_all();
}
- for (auto &t : threads_) {
+ for (auto& t : threads_) {
t.join();
}
}
-void ThreadPool::ScheduleCallback(const std::function<void()> &callback) {
+void ThreadPool::ScheduleCallback(const std::function<void()>& callback) {
std::lock_guard<std::mutex> lock(mu_);
callbacks_.push(callback);
cv_.notify_one();
diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h
index 6157e403e9..6225d82a0b 100644
--- a/src/cpp/server/thread_pool.h
+++ b/src/cpp/server/thread_pool.h
@@ -50,7 +50,7 @@ class ThreadPool GRPC_FINAL : public ThreadPoolInterface {
explicit ThreadPool(int num_threads);
~ThreadPool();
- void ScheduleCallback(const std::function<void()> &callback) GRPC_OVERRIDE;
+ void ScheduleCallback(const std::function<void()>& callback) GRPC_OVERRIDE;
private:
std::mutex mu_;
diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc
new file mode 100644
index 0000000000..f8d8eec065
--- /dev/null
+++ b/src/cpp/util/byte_buffer.cc
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/byte_buffer.h>
+
+namespace grpc {
+
+ByteBuffer::ByteBuffer(Slice* slices, size_t nslices) {
+ // TODO(yangg) maybe expose some core API to simplify this
+ std::vector<gpr_slice> c_slices(nslices);
+ for (size_t i = 0; i < nslices; i++) {
+ c_slices[i] = slices[i].slice_;
+ }
+ buffer_ = grpc_byte_buffer_create(c_slices.data(), nslices);
+}
+
+void ByteBuffer::Clear() {
+ if (buffer_) {
+ grpc_byte_buffer_destroy(buffer_);
+ buffer_ = nullptr;
+ }
+}
+
+void ByteBuffer::Dump(std::vector<Slice>* slices) {
+ slices->clear();
+ if (!buffer_) {
+ return;
+ }
+ grpc_byte_buffer_reader* reader = grpc_byte_buffer_reader_create(buffer_);
+ gpr_slice s;
+ while (grpc_byte_buffer_reader_next(reader, &s)) {
+ slices->push_back(Slice(s, Slice::STEAL_REF));
+ gpr_slice_unref(s);
+ }
+ grpc_byte_buffer_reader_destroy(reader);
+}
+
+size_t ByteBuffer::Length() {
+ if (buffer_) {
+ return grpc_byte_buffer_length(buffer_);
+ } else {
+ return 0;
+ }
+}
+
+} // namespace grpc
diff --git a/src/cpp/util/slice.cc b/src/cpp/util/slice.cc
new file mode 100644
index 0000000000..57370dabc6
--- /dev/null
+++ b/src/cpp/util/slice.cc
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/slice.h>
+
+namespace grpc {
+
+Slice::Slice() : slice_(gpr_empty_slice()) {}
+
+Slice::~Slice() { gpr_slice_unref(slice_); }
+
+Slice::Slice(gpr_slice slice, AddRef) : slice_(gpr_slice_ref(slice)) {}
+
+Slice::Slice(gpr_slice slice, StealRef) : slice_(slice) {}
+
+Slice::Slice(const Slice& other) : slice_(gpr_slice_ref(other.slice_)) {}
+
+} // namespace grpc
diff --git a/src/cpp/util/status.cc b/src/cpp/util/status.cc
index bbf8030668..b694a513e7 100644
--- a/src/cpp/util/status.cc
+++ b/src/cpp/util/status.cc
@@ -35,7 +35,7 @@
namespace grpc {
-const Status &Status::OK = Status();
-const Status &Status::Cancelled = Status(StatusCode::CANCELLED);
+const Status& Status::OK = Status();
+const Status& Status::Cancelled = Status(StatusCode::CANCELLED);
} // namespace grpc
diff --git a/src/cpp/util/time.cc b/src/cpp/util/time.cc
index 919e5623fa..44d2283e76 100644
--- a/src/cpp/util/time.cc
+++ b/src/cpp/util/time.cc
@@ -43,8 +43,8 @@ using std::chrono::system_clock;
namespace grpc {
// TODO(yangg) prevent potential overflow.
-void Timepoint2Timespec(const system_clock::time_point &from,
- gpr_timespec *to) {
+void Timepoint2Timespec(const system_clock::time_point& from,
+ gpr_timespec* to) {
system_clock::duration deadline = from.time_since_epoch();
seconds secs = duration_cast<seconds>(deadline);
nanoseconds nsecs = duration_cast<nanoseconds>(deadline - secs);
diff --git a/src/cpp/util/time.h b/src/cpp/util/time.h
index 1994848eb2..8b7fcf55f7 100644
--- a/src/cpp/util/time.h
+++ b/src/cpp/util/time.h
@@ -41,8 +41,8 @@
namespace grpc {
// from and to should be absolute time.
-void Timepoint2Timespec(const std::chrono::system_clock::time_point &from,
- gpr_timespec *to);
+void Timepoint2Timespec(const std::chrono::system_clock::time_point& from,
+ gpr_timespec* to);
std::chrono::system_clock::time_point Timespec2Timepoint(gpr_timespec t);