diff options
Diffstat (limited to 'src/cpp/client')
-rw-r--r-- | src/cpp/client/channel.cc | 115 | ||||
-rw-r--r-- | src/cpp/client/channel.h | 16 | ||||
-rw-r--r-- | src/cpp/client/client_context.cc | 8 | ||||
-rw-r--r-- | src/cpp/client/client_unary_call.cc | 89 |
4 files changed, 121 insertions, 107 deletions
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 3f39364bda..b2fc0c97ee 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -42,11 +42,12 @@ #include <grpc/support/slice.h> #include "src/cpp/proto/proto_utils.h" -#include "src/cpp/stream/stream_context.h" #include <grpc++/channel_arguments.h> #include <grpc++/client_context.h> +#include <grpc++/completion_queue.h> #include <grpc++/config.h> #include <grpc++/credentials.h> +#include <grpc++/impl/call.h> #include <grpc++/impl/rpc_method.h> #include <grpc++/status.h> #include <google/protobuf/message.h> @@ -77,103 +78,25 @@ Channel::Channel(const grpc::string &target, Channel::~Channel() { grpc_channel_destroy(c_channel_); } -namespace { -// Pluck the finished event and set to status when it is not nullptr. -void GetFinalStatus(grpc_completion_queue *cq, void *finished_tag, - Status *status) { - grpc_event *ev = - grpc_completion_queue_pluck(cq, finished_tag, gpr_inf_future); - if (status) { - StatusCode error_code = static_cast<StatusCode>(ev->data.finished.status); - grpc::string details(ev->data.finished.details ? ev->data.finished.details - : ""); - *status = Status(error_code, details); - } - grpc_event_finish(ev); +Call Channel::CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq) { + auto c_call = + grpc_channel_create_call( + c_channel_, cq->cq(), method.name(), + context->authority().empty() ? target_.c_str() + : context->authority().c_str(), + context->RawDeadline()); + context->set_call(c_call); + return Call(c_call, this, cq); } -} // namespace -// TODO(yangg) more error handling -Status Channel::StartBlockingRpc(const RpcMethod &method, - ClientContext *context, - const google::protobuf::Message &request, - google::protobuf::Message *result) { - Status status; - grpc_call *call = grpc_channel_create_call_old( - c_channel_, method.name(), target_.c_str(), context->RawDeadline()); - context->set_call(call); - - grpc_event *ev; - void *finished_tag = reinterpret_cast<char *>(call); - void *metadata_read_tag = reinterpret_cast<char *>(call) + 2; - void *write_tag = reinterpret_cast<char *>(call) + 3; - void *halfclose_tag = reinterpret_cast<char *>(call) + 4; - void *read_tag = reinterpret_cast<char *>(call) + 5; - - grpc_completion_queue *cq = grpc_completion_queue_create(); - context->set_cq(cq); - // add_metadata from context - // - // invoke - GPR_ASSERT(grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag, - GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); - // write request - grpc_byte_buffer *write_buffer = nullptr; - bool success = SerializeProto(request, &write_buffer); - if (!success) { - grpc_call_cancel(call); - status = - Status(StatusCode::DATA_LOSS, "Failed to serialize request proto."); - GetFinalStatus(cq, finished_tag, nullptr); - return status; - } - GPR_ASSERT(grpc_call_start_write_old(call, write_buffer, write_tag, - GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); - grpc_byte_buffer_destroy(write_buffer); - ev = grpc_completion_queue_pluck(cq, write_tag, gpr_inf_future); - - success = ev->data.write_accepted == GRPC_OP_OK; - grpc_event_finish(ev); - if (!success) { - GetFinalStatus(cq, finished_tag, &status); - return status; - } - // writes done - GPR_ASSERT(grpc_call_writes_done_old(call, halfclose_tag) == GRPC_CALL_OK); - ev = grpc_completion_queue_pluck(cq, halfclose_tag, gpr_inf_future); - grpc_event_finish(ev); - // start read metadata - // - ev = grpc_completion_queue_pluck(cq, metadata_read_tag, gpr_inf_future); - grpc_event_finish(ev); - // start read - GPR_ASSERT(grpc_call_start_read_old(call, read_tag) == GRPC_CALL_OK); - ev = grpc_completion_queue_pluck(cq, read_tag, gpr_inf_future); - if (ev->data.read) { - if (!DeserializeProto(ev->data.read, result)) { - grpc_event_finish(ev); - status = Status(StatusCode::DATA_LOSS, "Failed to parse response proto."); - GetFinalStatus(cq, finished_tag, nullptr); - return status; - } - } - grpc_event_finish(ev); - - // wait status - GetFinalStatus(cq, finished_tag, &status); - return status; -} - -StreamContextInterface *Channel::CreateStream( - const RpcMethod &method, ClientContext *context, - const google::protobuf::Message *request, - google::protobuf::Message *result) { - grpc_call *call = grpc_channel_create_call_old( - c_channel_, method.name(), target_.c_str(), context->RawDeadline()); - context->set_call(call); - grpc_completion_queue *cq = grpc_completion_queue_create(); - context->set_cq(cq); - return new StreamContext(method, context, request, result); +void Channel::PerformOpsOnCall(CallOpBuffer *buf, Call *call) { + static const size_t MAX_OPS = 8; + size_t nops = MAX_OPS; + grpc_op ops[MAX_OPS]; + buf->FillOps(ops, &nops); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call->call(), ops, nops, buf)); } } // namespace grpc diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h index 67d18bf4c8..c31adab723 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/client/channel.h @@ -42,11 +42,14 @@ struct grpc_channel; namespace grpc { +class Call; +class CallOpBuffer; class ChannelArguments; +class CompletionQueue; class Credentials; class StreamContextInterface; -class Channel : public ChannelInterface { +class Channel final : public ChannelInterface { public: Channel(const grpc::string &target, const ChannelArguments &args); Channel(const grpc::string &target, const std::unique_ptr<Credentials> &creds, @@ -54,14 +57,9 @@ class Channel : public ChannelInterface { ~Channel() override; - Status StartBlockingRpc(const RpcMethod &method, ClientContext *context, - const google::protobuf::Message &request, - google::protobuf::Message *result) override; - - StreamContextInterface *CreateStream( - const RpcMethod &method, ClientContext *context, - const google::protobuf::Message *request, - google::protobuf::Message *result) override; + virtual Call CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq) override; + virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) override; private: const grpc::string target_; diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 7bda2d07c3..64a829630d 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -72,9 +72,13 @@ system_clock::time_point ClientContext::absolute_deadline() { void ClientContext::AddMetadata(const grpc::string &meta_key, const grpc::string &meta_value) { - return; + send_initial_metadata_.insert(std::make_pair(meta_key, meta_value)); } -void ClientContext::StartCancel() {} +void ClientContext::TryCancel() { + if (call_) { + grpc_call_cancel(call_); + } +} } // namespace grpc diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc new file mode 100644 index 0000000000..284af33b43 --- /dev/null +++ b/src/cpp/client/client_unary_call.cc @@ -0,0 +1,89 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/impl/client_unary_call.h> +#include <grpc++/impl/call.h> +#include <grpc++/channel_interface.h> +#include <grpc++/client_context.h> +#include <grpc++/completion_queue.h> +#include <grpc++/status.h> +#include <grpc/support/log.h> + +namespace grpc { + +// Wrapper that performs a blocking unary call +Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result) { + CompletionQueue cq; + Call call(channel->CreateCall(method, context, &cq)); + CallOpBuffer buf; + Status status; + buf.AddSendInitialMetadata(context); + buf.AddSendMessage(request); + buf.AddRecvInitialMetadata(&context->recv_initial_metadata_); + buf.AddRecvMessage(result); + buf.AddClientSendClose(); + buf.AddClientRecvStatus(&context->trailing_metadata_, &status); + call.PerformOps(&buf); + GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk()); + return status; +} + +class ClientAsyncRequest final : public CallOpBuffer { + public: + void FinalizeResult(void **tag, bool *status) override { + CallOpBuffer::FinalizeResult(tag, status); + delete this; + } +}; + +void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result, Status *status, + CompletionQueue *cq, void *tag) { + ClientAsyncRequest *buf = new ClientAsyncRequest; + buf->Reset(tag); + Call call(channel->CreateCall(method, context, cq)); + buf->AddSendInitialMetadata(context); + buf->AddSendMessage(request); + buf->AddRecvInitialMetadata(&context->recv_initial_metadata_); + buf->AddRecvMessage(result); + buf->AddClientSendClose(); + buf->AddClientRecvStatus(&context->trailing_metadata_, status); + call.PerformOps(buf); +} + +} // namespace grpc |