aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/client
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/client')
-rw-r--r--src/cpp/client/channel.cc115
-rw-r--r--src/cpp/client/channel.h16
-rw-r--r--src/cpp/client/client_context.cc8
-rw-r--r--src/cpp/client/client_unary_call.cc89
4 files changed, 121 insertions, 107 deletions
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 3f39364bda..b2fc0c97ee 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -42,11 +42,12 @@
#include <grpc/support/slice.h>
#include "src/cpp/proto/proto_utils.h"
-#include "src/cpp/stream/stream_context.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
+#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/credentials.h>
+#include <grpc++/impl/call.h>
#include <grpc++/impl/rpc_method.h>
#include <grpc++/status.h>
#include <google/protobuf/message.h>
@@ -77,103 +78,25 @@ Channel::Channel(const grpc::string &target,
Channel::~Channel() { grpc_channel_destroy(c_channel_); }
-namespace {
-// Pluck the finished event and set to status when it is not nullptr.
-void GetFinalStatus(grpc_completion_queue *cq, void *finished_tag,
- Status *status) {
- grpc_event *ev =
- grpc_completion_queue_pluck(cq, finished_tag, gpr_inf_future);
- if (status) {
- StatusCode error_code = static_cast<StatusCode>(ev->data.finished.status);
- grpc::string details(ev->data.finished.details ? ev->data.finished.details
- : "");
- *status = Status(error_code, details);
- }
- grpc_event_finish(ev);
+Call Channel::CreateCall(const RpcMethod &method, ClientContext *context,
+ CompletionQueue *cq) {
+ auto c_call =
+ grpc_channel_create_call(
+ c_channel_, cq->cq(), method.name(),
+ context->authority().empty() ? target_.c_str()
+ : context->authority().c_str(),
+ context->RawDeadline());
+ context->set_call(c_call);
+ return Call(c_call, this, cq);
}
-} // namespace
-// TODO(yangg) more error handling
-Status Channel::StartBlockingRpc(const RpcMethod &method,
- ClientContext *context,
- const google::protobuf::Message &request,
- google::protobuf::Message *result) {
- Status status;
- grpc_call *call = grpc_channel_create_call_old(
- c_channel_, method.name(), target_.c_str(), context->RawDeadline());
- context->set_call(call);
-
- grpc_event *ev;
- void *finished_tag = reinterpret_cast<char *>(call);
- void *metadata_read_tag = reinterpret_cast<char *>(call) + 2;
- void *write_tag = reinterpret_cast<char *>(call) + 3;
- void *halfclose_tag = reinterpret_cast<char *>(call) + 4;
- void *read_tag = reinterpret_cast<char *>(call) + 5;
-
- grpc_completion_queue *cq = grpc_completion_queue_create();
- context->set_cq(cq);
- // add_metadata from context
- //
- // invoke
- GPR_ASSERT(grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag,
- GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
- // write request
- grpc_byte_buffer *write_buffer = nullptr;
- bool success = SerializeProto(request, &write_buffer);
- if (!success) {
- grpc_call_cancel(call);
- status =
- Status(StatusCode::DATA_LOSS, "Failed to serialize request proto.");
- GetFinalStatus(cq, finished_tag, nullptr);
- return status;
- }
- GPR_ASSERT(grpc_call_start_write_old(call, write_buffer, write_tag,
- GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
- grpc_byte_buffer_destroy(write_buffer);
- ev = grpc_completion_queue_pluck(cq, write_tag, gpr_inf_future);
-
- success = ev->data.write_accepted == GRPC_OP_OK;
- grpc_event_finish(ev);
- if (!success) {
- GetFinalStatus(cq, finished_tag, &status);
- return status;
- }
- // writes done
- GPR_ASSERT(grpc_call_writes_done_old(call, halfclose_tag) == GRPC_CALL_OK);
- ev = grpc_completion_queue_pluck(cq, halfclose_tag, gpr_inf_future);
- grpc_event_finish(ev);
- // start read metadata
- //
- ev = grpc_completion_queue_pluck(cq, metadata_read_tag, gpr_inf_future);
- grpc_event_finish(ev);
- // start read
- GPR_ASSERT(grpc_call_start_read_old(call, read_tag) == GRPC_CALL_OK);
- ev = grpc_completion_queue_pluck(cq, read_tag, gpr_inf_future);
- if (ev->data.read) {
- if (!DeserializeProto(ev->data.read, result)) {
- grpc_event_finish(ev);
- status = Status(StatusCode::DATA_LOSS, "Failed to parse response proto.");
- GetFinalStatus(cq, finished_tag, nullptr);
- return status;
- }
- }
- grpc_event_finish(ev);
-
- // wait status
- GetFinalStatus(cq, finished_tag, &status);
- return status;
-}
-
-StreamContextInterface *Channel::CreateStream(
- const RpcMethod &method, ClientContext *context,
- const google::protobuf::Message *request,
- google::protobuf::Message *result) {
- grpc_call *call = grpc_channel_create_call_old(
- c_channel_, method.name(), target_.c_str(), context->RawDeadline());
- context->set_call(call);
- grpc_completion_queue *cq = grpc_completion_queue_create();
- context->set_cq(cq);
- return new StreamContext(method, context, request, result);
+void Channel::PerformOpsOnCall(CallOpBuffer *buf, Call *call) {
+ static const size_t MAX_OPS = 8;
+ size_t nops = MAX_OPS;
+ grpc_op ops[MAX_OPS];
+ buf->FillOps(ops, &nops);
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(call->call(), ops, nops, buf));
}
} // namespace grpc
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index 67d18bf4c8..c31adab723 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -42,11 +42,14 @@
struct grpc_channel;
namespace grpc {
+class Call;
+class CallOpBuffer;
class ChannelArguments;
+class CompletionQueue;
class Credentials;
class StreamContextInterface;
-class Channel : public ChannelInterface {
+class Channel final : public ChannelInterface {
public:
Channel(const grpc::string &target, const ChannelArguments &args);
Channel(const grpc::string &target, const std::unique_ptr<Credentials> &creds,
@@ -54,14 +57,9 @@ class Channel : public ChannelInterface {
~Channel() override;
- Status StartBlockingRpc(const RpcMethod &method, ClientContext *context,
- const google::protobuf::Message &request,
- google::protobuf::Message *result) override;
-
- StreamContextInterface *CreateStream(
- const RpcMethod &method, ClientContext *context,
- const google::protobuf::Message *request,
- google::protobuf::Message *result) override;
+ virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
+ CompletionQueue *cq) override;
+ virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) override;
private:
const grpc::string target_;
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 7bda2d07c3..64a829630d 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -72,9 +72,13 @@ system_clock::time_point ClientContext::absolute_deadline() {
void ClientContext::AddMetadata(const grpc::string &meta_key,
const grpc::string &meta_value) {
- return;
+ send_initial_metadata_.insert(std::make_pair(meta_key, meta_value));
}
-void ClientContext::StartCancel() {}
+void ClientContext::TryCancel() {
+ if (call_) {
+ grpc_call_cancel(call_);
+ }
+}
} // namespace grpc
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc
new file mode 100644
index 0000000000..284af33b43
--- /dev/null
+++ b/src/cpp/client/client_unary_call.cc
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/impl/client_unary_call.h>
+#include <grpc++/impl/call.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/client_context.h>
+#include <grpc++/completion_queue.h>
+#include <grpc++/status.h>
+#include <grpc/support/log.h>
+
+namespace grpc {
+
+// Wrapper that performs a blocking unary call
+Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result) {
+ CompletionQueue cq;
+ Call call(channel->CreateCall(method, context, &cq));
+ CallOpBuffer buf;
+ Status status;
+ buf.AddSendInitialMetadata(context);
+ buf.AddSendMessage(request);
+ buf.AddRecvInitialMetadata(&context->recv_initial_metadata_);
+ buf.AddRecvMessage(result);
+ buf.AddClientSendClose();
+ buf.AddClientRecvStatus(&context->trailing_metadata_, &status);
+ call.PerformOps(&buf);
+ GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk());
+ return status;
+}
+
+class ClientAsyncRequest final : public CallOpBuffer {
+ public:
+ void FinalizeResult(void **tag, bool *status) override {
+ CallOpBuffer::FinalizeResult(tag, status);
+ delete this;
+ }
+};
+
+void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result, Status *status,
+ CompletionQueue *cq, void *tag) {
+ ClientAsyncRequest *buf = new ClientAsyncRequest;
+ buf->Reset(tag);
+ Call call(channel->CreateCall(method, context, cq));
+ buf->AddSendInitialMetadata(context);
+ buf->AddSendMessage(request);
+ buf->AddRecvInitialMetadata(&context->recv_initial_metadata_);
+ buf->AddRecvMessage(result);
+ buf->AddClientSendClose();
+ buf->AddClientRecvStatus(&context->trailing_metadata_, status);
+ call.PerformOps(buf);
+}
+
+} // namespace grpc