diff options
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/client/channel.h | 10 | ||||
-rw-r--r-- | src/cpp/client/channel_arguments.cc | 18 | ||||
-rw-r--r-- | src/cpp/client/client_context.cc | 8 | ||||
-rw-r--r-- | src/cpp/client/client_unary_call.cc | 8 | ||||
-rw-r--r-- | src/cpp/client/create_channel.cc | 9 | ||||
-rw-r--r-- | src/cpp/client/secure_credentials.cc | 32 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 95 | ||||
-rw-r--r-- | src/cpp/common/completion_queue.cc | 15 | ||||
-rw-r--r-- | src/cpp/proto/proto_utils.cc | 22 | ||||
-rw-r--r-- | src/cpp/proto/proto_utils.h | 6 | ||||
-rw-r--r-- | src/cpp/server/async_generic_service.cc | 50 | ||||
-rw-r--r-- | src/cpp/server/async_server_context.cc | 16 | ||||
-rw-r--r-- | src/cpp/server/insecure_server_credentials.cc | 3 | ||||
-rw-r--r-- | src/cpp/server/secure_server_credentials.cc | 12 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 73 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 25 | ||||
-rw-r--r-- | src/cpp/server/thread_pool.cc | 4 | ||||
-rw-r--r-- | src/cpp/server/thread_pool.h | 2 | ||||
-rw-r--r-- | src/cpp/util/byte_buffer.cc | 76 | ||||
-rw-r--r-- | src/cpp/util/slice.cc | 48 | ||||
-rw-r--r-- | src/cpp/util/status.cc | 4 | ||||
-rw-r--r-- | src/cpp/util/time.cc | 4 | ||||
-rw-r--r-- | src/cpp/util/time.h | 4 |
23 files changed, 420 insertions, 124 deletions
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h index a1de3817e6..3980eba237 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/client/channel.h @@ -51,16 +51,16 @@ class StreamContextInterface; class Channel GRPC_FINAL : public ChannelInterface { public: - Channel(const grpc::string &target, grpc_channel *c_channel); + Channel(const grpc::string& target, grpc_channel* c_channel); ~Channel() GRPC_OVERRIDE; - virtual Call CreateCall(const RpcMethod &method, ClientContext *context, - CompletionQueue *cq) GRPC_OVERRIDE; - virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) GRPC_OVERRIDE; + virtual Call CreateCall(const RpcMethod& method, ClientContext* context, + CompletionQueue* cq) GRPC_OVERRIDE; + virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE; private: const grpc::string target_; - grpc_channel *const c_channel_; // owned + grpc_channel* const c_channel_; // owned }; } // namespace grpc diff --git a/src/cpp/client/channel_arguments.cc b/src/cpp/client/channel_arguments.cc index abf0fc1c0a..87f8349eef 100644 --- a/src/cpp/client/channel_arguments.cc +++ b/src/cpp/client/channel_arguments.cc @@ -37,7 +37,7 @@ namespace grpc { -void ChannelArguments::SetSslTargetNameOverride(const grpc::string &name) { +void ChannelArguments::SetSslTargetNameOverride(const grpc::string& name) { SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, name); } @@ -50,32 +50,32 @@ grpc::string ChannelArguments::GetSslTargetNameOverride() const { return ""; } -void ChannelArguments::SetInt(const grpc::string &key, int value) { +void ChannelArguments::SetInt(const grpc::string& key, int value) { grpc_arg arg; arg.type = GRPC_ARG_INTEGER; strings_.push_back(key); - arg.key = const_cast<char *>(strings_.back().c_str()); + arg.key = const_cast<char*>(strings_.back().c_str()); arg.value.integer = value; args_.push_back(arg); } -void ChannelArguments::SetString(const grpc::string &key, - const grpc::string &value) { +void ChannelArguments::SetString(const grpc::string& key, + const grpc::string& value) { grpc_arg arg; arg.type = GRPC_ARG_STRING; strings_.push_back(key); - arg.key = const_cast<char *>(strings_.back().c_str()); + arg.key = const_cast<char*>(strings_.back().c_str()); strings_.push_back(value); - arg.value.string = const_cast<char *>(strings_.back().c_str()); + arg.value.string = const_cast<char*>(strings_.back().c_str()); args_.push_back(arg); } -void ChannelArguments::SetChannelArgs(grpc_channel_args *channel_args) const { +void ChannelArguments::SetChannelArgs(grpc_channel_args* channel_args) const { channel_args->num_args = args_.size(); if (channel_args->num_args > 0) { - channel_args->args = const_cast<grpc_arg *>(&args_[0]); + channel_args->args = const_cast<grpc_arg*>(&args_[0]); } } diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 9f99f7bcd5..de9f8c7201 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -53,7 +53,7 @@ ClientContext::~ClientContext() { if (cq_) { grpc_completion_queue_shutdown(cq_); // Drain cq_. - grpc_event *ev; + grpc_event* ev; grpc_completion_type t; do { ev = grpc_completion_queue_next(cq_, gpr_inf_future); @@ -65,7 +65,7 @@ ClientContext::~ClientContext() { } void ClientContext::set_absolute_deadline( - const system_clock::time_point &deadline) { + const system_clock::time_point& deadline) { Timepoint2Timespec(deadline, &absolute_deadline_); } @@ -73,8 +73,8 @@ system_clock::time_point ClientContext::absolute_deadline() { return Timespec2Timepoint(absolute_deadline_); } -void ClientContext::AddMetadata(const grpc::string &meta_key, - const grpc::string &meta_value) { +void ClientContext::AddMetadata(const grpc::string& meta_key, + const grpc::string& meta_value) { send_initial_metadata_.insert(std::make_pair(meta_key, meta_value)); } diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc index 5c179de9d8..7e7ea78bcd 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/src/cpp/client/client_unary_call.cc @@ -42,10 +42,10 @@ namespace grpc { // Wrapper that performs a blocking unary call -Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, - ClientContext *context, - const grpc::protobuf::Message &request, - grpc::protobuf::Message *result) { +Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, + const grpc::protobuf::Message& request, + grpc::protobuf::Message* result) { CompletionQueue cq; Call call(channel->CreateCall(method, context, &cq)); CallOpBuffer buf; diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc index 57d215d0f3..301430572a 100644 --- a/src/cpp/client/create_channel.cc +++ b/src/cpp/client/create_channel.cc @@ -41,9 +41,10 @@ namespace grpc { class ChannelArguments; std::shared_ptr<ChannelInterface> CreateChannel( - const grpc::string &target, const std::unique_ptr<Credentials> &creds, - const ChannelArguments &args) { - return creds ? creds->CreateChannel(target, args) : - std::shared_ptr<ChannelInterface>(new Channel(target, grpc_lame_client_channel_create())); + const grpc::string& target, const std::unique_ptr<Credentials>& creds, + const ChannelArguments& args) { + return creds ? creds->CreateChannel(target, args) + : std::shared_ptr<ChannelInterface>( + new Channel(target, grpc_lame_client_channel_create())); } } // namespace grpc diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 6ca702eead..e3c6637623 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -55,7 +55,8 @@ class SecureCredentials GRPC_FINAL : public Credentials { args.SetChannelArgs(&channel_args); return std::shared_ptr<ChannelInterface>(new Channel( args.GetSslTargetNameOverride().empty() - ? target : args.GetSslTargetNameOverride(), + ? target + : args.GetSslTargetNameOverride(), grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args))); } @@ -98,12 +99,37 @@ std::unique_ptr<Credentials> ComputeEngineCredentials() { std::unique_ptr<Credentials> ServiceAccountCredentials( const grpc::string& json_key, const grpc::string& scope, std::chrono::seconds token_lifetime) { - gpr_timespec lifetime = gpr_time_from_seconds( - token_lifetime.count() > 0 ? token_lifetime.count() : 0); + if (token_lifetime.count() <= 0) { + gpr_log(GPR_ERROR, + "Trying to create ServiceAccountCredentials " + "with non-positive lifetime"); + return WrapCredentials(nullptr); + } + gpr_timespec lifetime = gpr_time_from_seconds(token_lifetime.count()); return WrapCredentials(grpc_service_account_credentials_create( json_key.c_str(), scope.c_str(), lifetime)); } +// Builds JWT credentials. +std::unique_ptr<Credentials> JWTCredentials( + const grpc::string& json_key, std::chrono::seconds token_lifetime) { + if (token_lifetime.count() <= 0) { + gpr_log(GPR_ERROR, + "Trying to create JWTCredentials with non-positive lifetime"); + return WrapCredentials(nullptr); + } + gpr_timespec lifetime = gpr_time_from_seconds(token_lifetime.count()); + return WrapCredentials( + grpc_jwt_credentials_create(json_key.c_str(), lifetime)); +} + +// Builds refresh token credentials. +std::unique_ptr<Credentials> RefreshTokenCredentials( + const grpc::string& json_refresh_token) { + return WrapCredentials( + grpc_refresh_token_credentials_create(json_refresh_token.c_str())); +} + // Builds IAM credentials. std::unique_ptr<Credentials> IAMCredentials( const grpc::string& authorization_token, diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 6ce1e8a7d5..5c26a1ad7c 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -31,8 +31,10 @@ * */ -#include <grpc/support/alloc.h> #include <grpc++/impl/call.h> + +#include <grpc/support/alloc.h> +#include <grpc++/byte_buffer.h> #include <grpc++/client_context.h> #include <grpc++/channel_interface.h> @@ -46,15 +48,15 @@ CallOpBuffer::CallOpBuffer() initial_metadata_count_(0), initial_metadata_(nullptr), recv_initial_metadata_(nullptr), - recv_initial_metadata_arr_{0, 0, nullptr}, send_message_(nullptr), - send_message_buf_(nullptr), + send_message_buffer_(nullptr), + send_buf_(nullptr), recv_message_(nullptr), - recv_message_buf_(nullptr), + recv_message_buffer_(nullptr), + recv_buf_(nullptr), client_send_close_(false), recv_trailing_metadata_(nullptr), recv_status_(nullptr), - recv_trailing_metadata_arr_{0, 0, nullptr}, status_code_(GRPC_STATUS_OK), status_details_(nullptr), status_details_capacity_(0), @@ -62,7 +64,12 @@ CallOpBuffer::CallOpBuffer() trailing_metadata_count_(0), trailing_metadata_(nullptr), cancelled_buf_(0), - recv_closed_(nullptr) {} + recv_closed_(nullptr) { + memset(&recv_trailing_metadata_arr_, 0, sizeof(recv_trailing_metadata_arr_)); + memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_)); + recv_trailing_metadata_arr_.metadata = nullptr; + recv_initial_metadata_arr_.metadata = nullptr; +} void CallOpBuffer::Reset(void* next_return_tag) { return_tag_ = next_return_tag; @@ -74,18 +81,20 @@ void CallOpBuffer::Reset(void* next_return_tag) { recv_initial_metadata_ = nullptr; recv_initial_metadata_arr_.count = 0; - send_message_ = nullptr; - if (send_message_buf_) { - grpc_byte_buffer_destroy(send_message_buf_); - send_message_buf_ = nullptr; + if (send_buf_ && send_message_) { + grpc_byte_buffer_destroy(send_buf_); } + send_message_ = nullptr; + send_message_buffer_ = nullptr; + send_buf_ = nullptr; - recv_message_ = nullptr; got_message = false; - if (recv_message_buf_) { - grpc_byte_buffer_destroy(recv_message_buf_); - recv_message_buf_ = nullptr; + if (recv_buf_ && recv_message_) { + grpc_byte_buffer_destroy(recv_buf_); } + recv_message_ = nullptr; + recv_message_buffer_ = nullptr; + recv_buf_ = nullptr; client_send_close_ = false; @@ -106,11 +115,11 @@ CallOpBuffer::~CallOpBuffer() { gpr_free(status_details_); gpr_free(recv_initial_metadata_arr_.metadata); gpr_free(recv_trailing_metadata_arr_.metadata); - if (recv_message_buf_) { - grpc_byte_buffer_destroy(recv_message_buf_); + if (recv_buf_ && recv_message_) { + grpc_byte_buffer_destroy(recv_buf_); } - if (send_message_buf_) { - grpc_byte_buffer_destroy(send_message_buf_); + if (send_buf_ && send_message_) { + grpc_byte_buffer_destroy(send_buf_); } } @@ -166,11 +175,19 @@ void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) { send_message_ = &message; } +void CallOpBuffer::AddSendMessage(const ByteBuffer& message) { + send_message_buffer_ = &message; +} + void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) { recv_message_ = message; recv_message_->Clear(); } +void CallOpBuffer::AddRecvMessage(ByteBuffer* message) { + recv_message_buffer_ = message; +} + void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; } void CallOpBuffer::AddServerRecvClose(bool* cancelled) { @@ -206,19 +223,23 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) { ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_; (*nops)++; } - if (send_message_) { - bool success = SerializeProto(*send_message_, &send_message_buf_); - if (!success) { - abort(); - // TODO handle parse failure + if (send_message_ || send_message_buffer_) { + if (send_message_) { + bool success = SerializeProto(*send_message_, &send_buf_); + if (!success) { + abort(); + // TODO handle parse failure + } + } else { + send_buf_ = send_message_buffer_->buffer(); } ops[*nops].op = GRPC_OP_SEND_MESSAGE; - ops[*nops].data.send_message = send_message_buf_; + ops[*nops].data.send_message = send_buf_; (*nops)++; } - if (recv_message_) { + if (recv_message_ || recv_message_buffer_) { ops[*nops].op = GRPC_OP_RECV_MESSAGE; - ops[*nops].data.recv_message = &recv_message_buf_; + ops[*nops].data.recv_message = &recv_buf_; (*nops)++; } if (client_send_close_) { @@ -256,9 +277,11 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) { bool CallOpBuffer::FinalizeResult(void** tag, bool* status) { // Release send buffers. - if (send_message_buf_) { - grpc_byte_buffer_destroy(send_message_buf_); - send_message_buf_ = nullptr; + if (send_buf_ && send_message_) { + if (send_message_) { + grpc_byte_buffer_destroy(send_buf_); + } + send_buf_ = nullptr; } if (initial_metadata_) { gpr_free(initial_metadata_); @@ -275,12 +298,16 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) { FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_); } // Parse received message if any. - if (recv_message_) { - if (recv_message_buf_) { + if (recv_message_ || recv_message_buffer_) { + if (recv_buf_) { got_message = *status; - *status = *status && DeserializeProto(recv_message_buf_, recv_message_); - grpc_byte_buffer_destroy(recv_message_buf_); - recv_message_buf_ = nullptr; + if (recv_message_) { + *status = *status && DeserializeProto(recv_buf_, recv_message_); + grpc_byte_buffer_destroy(recv_buf_); + } else { + recv_message_buffer_->set_buffer(recv_buf_); + } + recv_buf_ = nullptr; } else { // Read failed got_message = false; diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index 414966c1cd..fede2da016 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -57,19 +57,26 @@ class EventDeleter { } }; -bool CompletionQueue::Next(void** tag, bool* ok) { +CompletionQueue::NextStatus +CompletionQueue::AsyncNext(void** tag, bool* ok, + std::chrono::system_clock::time_point deadline) { std::unique_ptr<grpc_event, EventDeleter> ev; + gpr_timespec gpr_deadline; + Timepoint2Timespec(deadline, &gpr_deadline); for (;;) { - ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future)); + ev.reset(grpc_completion_queue_next(cq_, gpr_deadline)); + if (!ev) { /* got a NULL back because deadline passed */ + return TIMEOUT; + } if (ev->type == GRPC_QUEUE_SHUTDOWN) { - return false; + return SHUTDOWN; } auto cq_tag = static_cast<CompletionQueueTag*>(ev->tag); *ok = ev->data.op_complete == GRPC_OP_OK; *tag = cq_tag; if (cq_tag->FinalizeResult(tag, ok)) { - return true; + return GOT_EVENT; } } } diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index 9254e5879f..b8de2ea173 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -45,7 +45,7 @@ const int kMaxBufferLength = 8192; class GrpcBufferWriter GRPC_FINAL : public ::grpc::protobuf::io::ZeroCopyOutputStream { public: - explicit GrpcBufferWriter(grpc_byte_buffer **bp, + explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size = kMaxBufferLength) : block_size_(block_size), byte_count_(0), have_backup_(false) { *bp = grpc_byte_buffer_create(NULL, 0); @@ -58,7 +58,7 @@ class GrpcBufferWriter GRPC_FINAL } } - bool Next(void **data, int *size) GRPC_OVERRIDE { + bool Next(void** data, int* size) GRPC_OVERRIDE { if (have_backup_) { slice_ = backup_slice_; have_backup_ = false; @@ -89,7 +89,7 @@ class GrpcBufferWriter GRPC_FINAL private: const int block_size_; gpr_int64 byte_count_; - gpr_slice_buffer *slice_buffer_; + gpr_slice_buffer* slice_buffer_; bool have_backup_; gpr_slice backup_slice_; gpr_slice slice_; @@ -98,7 +98,7 @@ class GrpcBufferWriter GRPC_FINAL class GrpcBufferReader GRPC_FINAL : public ::grpc::protobuf::io::ZeroCopyInputStream { public: - explicit GrpcBufferReader(grpc_byte_buffer *buffer) + explicit GrpcBufferReader(grpc_byte_buffer* buffer) : byte_count_(0), backup_count_(0) { reader_ = grpc_byte_buffer_reader_create(buffer); } @@ -106,7 +106,7 @@ class GrpcBufferReader GRPC_FINAL grpc_byte_buffer_reader_destroy(reader_); } - bool Next(const void **data, int *size) GRPC_OVERRIDE { + bool Next(const void** data, int* size) GRPC_OVERRIDE { if (backup_count_ > 0) { *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) - backup_count_; @@ -123,12 +123,10 @@ class GrpcBufferReader GRPC_FINAL return true; } - void BackUp(int count) GRPC_OVERRIDE { - backup_count_ = count; - } + void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; } bool Skip(int count) GRPC_OVERRIDE { - const void *data; + const void* data; int size; while (Next(&data, &size)) { if (size >= count) { @@ -149,18 +147,18 @@ class GrpcBufferReader GRPC_FINAL private: gpr_int64 byte_count_; gpr_int64 backup_count_; - grpc_byte_buffer_reader *reader_; + grpc_byte_buffer_reader* reader_; gpr_slice slice_; }; namespace grpc { -bool SerializeProto(const grpc::protobuf::Message &msg, grpc_byte_buffer **bp) { +bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) { GrpcBufferWriter writer(bp); return msg.SerializeToZeroCopyStream(&writer); } -bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg) { +bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg) { GrpcBufferReader reader(buffer); return msg->ParseFromZeroCopyStream(&reader); } diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/proto/proto_utils.h index 7a1b1f8b7c..bc60dc9929 100644 --- a/src/cpp/proto/proto_utils.h +++ b/src/cpp/proto/proto_utils.h @@ -43,11 +43,11 @@ namespace grpc { // Serialize the msg into a buffer created inside the function. The caller // should destroy the returned buffer when done with it. If serialization fails, // false is returned and buffer is left unchanged. -bool SerializeProto(const grpc::protobuf::Message &msg, - grpc_byte_buffer **buffer); +bool SerializeProto(const grpc::protobuf::Message& msg, + grpc_byte_buffer** buffer); // The caller keeps ownership of buffer and msg. -bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg); +bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg); } // namespace grpc diff --git a/src/cpp/server/async_generic_service.cc b/src/cpp/server/async_generic_service.cc new file mode 100644 index 0000000000..07cb933715 --- /dev/null +++ b/src/cpp/server/async_generic_service.cc @@ -0,0 +1,50 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/async_generic_service.h> + +#include <grpc++/server.h> + +namespace grpc { + +void AsyncGenericService::RequestCall( + GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer, + CompletionQueue* cq, void* tag) { + server_->RequestAsyncGenericCall(ctx, reader_writer, cq, tag); +} + +CompletionQueue* AsyncGenericService::completion_queue() { + return &server_->cq_; +} + +} // namespace grpc diff --git a/src/cpp/server/async_server_context.cc b/src/cpp/server/async_server_context.cc index f21efcfb19..628822a338 100644 --- a/src/cpp/server/async_server_context.cc +++ b/src/cpp/server/async_server_context.cc @@ -42,7 +42,7 @@ namespace grpc { AsyncServerContext::AsyncServerContext( - grpc_call *call, const grpc::string &method, const grpc::string &host, + grpc_call* call, const grpc::string& method, const grpc::string& host, system_clock::time_point absolute_deadline) : method_(method), host_(host), @@ -52,22 +52,22 @@ AsyncServerContext::AsyncServerContext( AsyncServerContext::~AsyncServerContext() { grpc_call_destroy(call_); } -void AsyncServerContext::Accept(grpc_completion_queue *cq) { +void AsyncServerContext::Accept(grpc_completion_queue* cq) { GPR_ASSERT(grpc_call_server_accept_old(call_, cq, this) == GRPC_CALL_OK); GPR_ASSERT(grpc_call_server_end_initial_metadata_old( call_, GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); } -bool AsyncServerContext::StartRead(grpc::protobuf::Message *request) { +bool AsyncServerContext::StartRead(grpc::protobuf::Message* request) { GPR_ASSERT(request); request_ = request; grpc_call_error err = grpc_call_start_read_old(call_, this); return err == GRPC_CALL_OK; } -bool AsyncServerContext::StartWrite(const grpc::protobuf::Message &response, +bool AsyncServerContext::StartWrite(const grpc::protobuf::Message& response, int flags) { - grpc_byte_buffer *buffer = nullptr; + grpc_byte_buffer* buffer = nullptr; if (!SerializeProto(response, &buffer)) { return false; } @@ -76,16 +76,16 @@ bool AsyncServerContext::StartWrite(const grpc::protobuf::Message &response, return err == GRPC_CALL_OK; } -bool AsyncServerContext::StartWriteStatus(const Status &status) { +bool AsyncServerContext::StartWriteStatus(const Status& status) { grpc_call_error err = grpc_call_start_write_status_old( call_, static_cast<grpc_status_code>(status.code()), status.details().empty() ? nullptr - : const_cast<char *>(status.details().c_str()), + : const_cast<char*>(status.details().c_str()), this); return err == GRPC_CALL_OK; } -bool AsyncServerContext::ParseRead(grpc_byte_buffer *read_buffer) { +bool AsyncServerContext::ParseRead(grpc_byte_buffer* read_buffer) { GPR_ASSERT(request_); bool success = DeserializeProto(read_buffer, request_); request_ = nullptr; diff --git a/src/cpp/server/insecure_server_credentials.cc b/src/cpp/server/insecure_server_credentials.cc index f5e4732f73..55dd90d7a7 100644 --- a/src/cpp/server/insecure_server_credentials.cc +++ b/src/cpp/server/insecure_server_credentials.cc @@ -46,7 +46,8 @@ class InsecureServerCredentialsImpl GRPC_FINAL : public ServerCredentials { } // namespace std::shared_ptr<ServerCredentials> InsecureServerCredentials() { - return std::shared_ptr<ServerCredentials>(new InsecureServerCredentialsImpl()); + return std::shared_ptr<ServerCredentials>( + new InsecureServerCredentialsImpl()); } } // namespace grpc diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc index ff35638503..88f7a9b1a9 100644 --- a/src/cpp/server/secure_server_credentials.cc +++ b/src/cpp/server/secure_server_credentials.cc @@ -40,7 +40,8 @@ namespace grpc { namespace { class SecureServerCredentials GRPC_FINAL : public ServerCredentials { public: - explicit SecureServerCredentials(grpc_server_credentials* creds) : creds_(creds) {} + explicit SecureServerCredentials(grpc_server_credentials* creds) + : creds_(creds) {} ~SecureServerCredentials() GRPC_OVERRIDE { grpc_server_credentials_release(creds_); } @@ -56,16 +57,17 @@ class SecureServerCredentials GRPC_FINAL : public ServerCredentials { } // namespace std::shared_ptr<ServerCredentials> SslServerCredentials( - const SslServerCredentialsOptions &options) { + const SslServerCredentialsOptions& options) { std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs; - for (const auto &key_cert_pair : options.pem_key_cert_pairs) { + for (const auto& key_cert_pair : options.pem_key_cert_pairs) { pem_key_cert_pairs.push_back( {key_cert_pair.private_key.c_str(), key_cert_pair.cert_chain.c_str()}); } - grpc_server_credentials *c_creds = grpc_ssl_server_credentials_create( + grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create( options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(), &pem_key_cert_pairs[0], pem_key_cert_pairs.size()); - return std::shared_ptr<ServerCredentials>(new SecureServerCredentials(c_creds)); + return std::shared_ptr<ServerCredentials>( + new SecureServerCredentials(c_creds)); } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index e69032a657..5a4ca6915a 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -36,8 +36,10 @@ #include <grpc/grpc.h> #include <grpc/grpc_security.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc++/completion_queue.h> +#include <grpc++/async_generic_service.h> #include <grpc++/impl/rpc_service_method.h> #include <grpc++/impl/service_type.h> #include <grpc++/server_context.h> @@ -179,12 +181,12 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned) thread_pool_owned_(thread_pool_owned) {} Server::~Server() { - std::unique_lock<std::mutex> lock(mu_); - if (started_ && !shutdown_) { - lock.unlock(); - Shutdown(); - } else { - lock.unlock(); + { + std::unique_lock<std::mutex> lock(mu_); + if (started_ && !shutdown_) { + lock.unlock(); + Shutdown(); + } } grpc_server_destroy(server_); if (thread_pool_owned_) { @@ -226,7 +228,14 @@ bool Server::RegisterAsyncService(AsynchronousService* service) { return true; } -int Server::AddPort(const grpc::string& addr, ServerCredentials* creds) { +void Server::RegisterAsyncGenericService(AsyncGenericService* service) { + GPR_ASSERT(service->server_ == nullptr && + "Can only register an async generic service against one server."); + service->server_ = this; +} + +int Server::AddListeningPort(const grpc::string& addr, + ServerCredentials* creds) { GPR_ASSERT(!started_); return creds->AddPortToServer(addr, server_); } @@ -289,13 +298,33 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { stream_(stream), cq_(cq), ctx_(ctx), + generic_ctx_(nullptr), server_(server), call_(nullptr), payload_(nullptr) { memset(&array_, 0, sizeof(array_)); + grpc_call_details_init(&call_details_); grpc_server_request_registered_call( - server->server_, registered_method, &call_, &deadline_, &array_, - request ? &payload_ : nullptr, cq->cq(), this); + server->server_, registered_method, &call_, &call_details_.deadline, + &array_, request ? &payload_ : nullptr, cq->cq(), this); + } + + AsyncRequest(Server* server, GenericServerContext* ctx, + ServerAsyncStreamingInterface* stream, CompletionQueue* cq, + void* tag) + : tag_(tag), + request_(nullptr), + stream_(stream), + cq_(cq), + ctx_(nullptr), + generic_ctx_(ctx), + server_(server), + call_(nullptr), + payload_(nullptr) { + memset(&array_, 0, sizeof(array_)); + grpc_call_details_init(&call_details_); + grpc_server_request_call(server->server_, &call_, &call_details_, &array_, + cq->cq(), this); } ~AsyncRequest() { @@ -315,20 +344,29 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { *status = false; } } + ServerContext* ctx = ctx_ ? ctx_ : generic_ctx_; + GPR_ASSERT(ctx); if (*status) { - ctx_->deadline_ = Timespec2Timepoint(deadline_); + ctx->deadline_ = Timespec2Timepoint(call_details_.deadline); for (size_t i = 0; i < array_.count; i++) { - ctx_->client_metadata_.insert(std::make_pair( + ctx->client_metadata_.insert(std::make_pair( grpc::string(array_.metadata[i].key), grpc::string( array_.metadata[i].value, array_.metadata[i].value + array_.metadata[i].value_length))); } + if (generic_ctx_) { + // TODO(yangg) remove the copy here. + generic_ctx_->method_ = call_details_.method; + generic_ctx_->host_ = call_details_.host; + gpr_free(call_details_.method); + gpr_free(call_details_.host); + } } - ctx_->call_ = call_; + ctx->call_ = call_; Call call(call_, server_, cq_); if (orig_status && call_) { - ctx_->BeginCompletionOp(&call); + ctx->BeginCompletionOp(&call); } // just the pointers inside call are copied here stream_->BindCall(&call); @@ -342,9 +380,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { ServerAsyncStreamingInterface* const stream_; CompletionQueue* const cq_; ServerContext* const ctx_; + GenericServerContext* const generic_ctx_; Server* const server_; grpc_call* call_; - gpr_timespec deadline_; + grpc_call_details call_details_; grpc_metadata_array array_; grpc_byte_buffer* payload_; }; @@ -356,6 +395,12 @@ void Server::RequestAsyncCall(void* registered_method, ServerContext* context, new AsyncRequest(this, registered_method, context, request, stream, cq, tag); } +void Server::RequestAsyncGenericCall(GenericServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) { + new AsyncRequest(this, context, stream, cq, tag); +} + void Server::ScheduleCallback() { { std::unique_lock<std::mutex> lock(mu_); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 5de592334d..58bf9d937f 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -41,7 +41,8 @@ namespace grpc { -ServerBuilder::ServerBuilder() : thread_pool_(nullptr) {} +ServerBuilder::ServerBuilder() + : generic_service_(nullptr), thread_pool_(nullptr) {} void ServerBuilder::RegisterService(SynchronousService* service) { services_.push_back(service->service()); @@ -51,9 +52,20 @@ void ServerBuilder::RegisterAsyncService(AsynchronousService* service) { async_services_.push_back(service); } -void ServerBuilder::AddPort(const grpc::string& addr, - std::shared_ptr<ServerCredentials> creds, - int* selected_port) { +void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) { + if (generic_service_) { + gpr_log(GPR_ERROR, + "Adding multiple AsyncGenericService is unsupported for now. " + "Dropping the service %p", + service); + return; + } + generic_service_ = service; +} + +void ServerBuilder::AddListeningPort(const grpc::string& addr, + std::shared_ptr<ServerCredentials> creds, + int* selected_port) { ports_.push_back(Port{addr, creds, selected_port}); } @@ -84,8 +96,11 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { return nullptr; } } + if (generic_service_) { + server->RegisterAsyncGenericService(generic_service_); + } for (auto& port : ports_) { - int r = server->AddPort(port.addr, port.creds.get()); + int r = server->AddListeningPort(port.addr, port.creds.get()); if (!r) return nullptr; if (port.selected_port != nullptr) { *port.selected_port = r; diff --git a/src/cpp/server/thread_pool.cc b/src/cpp/server/thread_pool.cc index 5dc9bcf916..d3013b806c 100644 --- a/src/cpp/server/thread_pool.cc +++ b/src/cpp/server/thread_pool.cc @@ -66,12 +66,12 @@ ThreadPool::~ThreadPool() { shutdown_ = true; cv_.notify_all(); } - for (auto &t : threads_) { + for (auto& t : threads_) { t.join(); } } -void ThreadPool::ScheduleCallback(const std::function<void()> &callback) { +void ThreadPool::ScheduleCallback(const std::function<void()>& callback) { std::lock_guard<std::mutex> lock(mu_); callbacks_.push(callback); cv_.notify_one(); diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h index 6157e403e9..6225d82a0b 100644 --- a/src/cpp/server/thread_pool.h +++ b/src/cpp/server/thread_pool.h @@ -50,7 +50,7 @@ class ThreadPool GRPC_FINAL : public ThreadPoolInterface { explicit ThreadPool(int num_threads); ~ThreadPool(); - void ScheduleCallback(const std::function<void()> &callback) GRPC_OVERRIDE; + void ScheduleCallback(const std::function<void()>& callback) GRPC_OVERRIDE; private: std::mutex mu_; diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc new file mode 100644 index 0000000000..f8d8eec065 --- /dev/null +++ b/src/cpp/util/byte_buffer.cc @@ -0,0 +1,76 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/byte_buffer.h> + +namespace grpc { + +ByteBuffer::ByteBuffer(Slice* slices, size_t nslices) { + // TODO(yangg) maybe expose some core API to simplify this + std::vector<gpr_slice> c_slices(nslices); + for (size_t i = 0; i < nslices; i++) { + c_slices[i] = slices[i].slice_; + } + buffer_ = grpc_byte_buffer_create(c_slices.data(), nslices); +} + +void ByteBuffer::Clear() { + if (buffer_) { + grpc_byte_buffer_destroy(buffer_); + buffer_ = nullptr; + } +} + +void ByteBuffer::Dump(std::vector<Slice>* slices) { + slices->clear(); + if (!buffer_) { + return; + } + grpc_byte_buffer_reader* reader = grpc_byte_buffer_reader_create(buffer_); + gpr_slice s; + while (grpc_byte_buffer_reader_next(reader, &s)) { + slices->push_back(Slice(s, Slice::STEAL_REF)); + gpr_slice_unref(s); + } + grpc_byte_buffer_reader_destroy(reader); +} + +size_t ByteBuffer::Length() { + if (buffer_) { + return grpc_byte_buffer_length(buffer_); + } else { + return 0; + } +} + +} // namespace grpc diff --git a/src/cpp/util/slice.cc b/src/cpp/util/slice.cc new file mode 100644 index 0000000000..57370dabc6 --- /dev/null +++ b/src/cpp/util/slice.cc @@ -0,0 +1,48 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/slice.h> + +namespace grpc { + +Slice::Slice() : slice_(gpr_empty_slice()) {} + +Slice::~Slice() { gpr_slice_unref(slice_); } + +Slice::Slice(gpr_slice slice, AddRef) : slice_(gpr_slice_ref(slice)) {} + +Slice::Slice(gpr_slice slice, StealRef) : slice_(slice) {} + +Slice::Slice(const Slice& other) : slice_(gpr_slice_ref(other.slice_)) {} + +} // namespace grpc diff --git a/src/cpp/util/status.cc b/src/cpp/util/status.cc index bbf8030668..b694a513e7 100644 --- a/src/cpp/util/status.cc +++ b/src/cpp/util/status.cc @@ -35,7 +35,7 @@ namespace grpc { -const Status &Status::OK = Status(); -const Status &Status::Cancelled = Status(StatusCode::CANCELLED); +const Status& Status::OK = Status(); +const Status& Status::Cancelled = Status(StatusCode::CANCELLED); } // namespace grpc diff --git a/src/cpp/util/time.cc b/src/cpp/util/time.cc index 919e5623fa..44d2283e76 100644 --- a/src/cpp/util/time.cc +++ b/src/cpp/util/time.cc @@ -43,8 +43,8 @@ using std::chrono::system_clock; namespace grpc { // TODO(yangg) prevent potential overflow. -void Timepoint2Timespec(const system_clock::time_point &from, - gpr_timespec *to) { +void Timepoint2Timespec(const system_clock::time_point& from, + gpr_timespec* to) { system_clock::duration deadline = from.time_since_epoch(); seconds secs = duration_cast<seconds>(deadline); nanoseconds nsecs = duration_cast<nanoseconds>(deadline - secs); diff --git a/src/cpp/util/time.h b/src/cpp/util/time.h index 1994848eb2..8b7fcf55f7 100644 --- a/src/cpp/util/time.h +++ b/src/cpp/util/time.h @@ -41,8 +41,8 @@ namespace grpc { // from and to should be absolute time. -void Timepoint2Timespec(const std::chrono::system_clock::time_point &from, - gpr_timespec *to); +void Timepoint2Timespec(const std::chrono::system_clock::time_point& from, + gpr_timespec* to); std::chrono::system_clock::time_point Timespec2Timepoint(gpr_timespec t); |