aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Yuki Yugui Sonoda <yugui@yugui.jp>2015-04-17 08:03:20 +0900
committerGravatar Yuki Yugui Sonoda <yugui@yugui.jp>2015-04-17 08:03:20 +0900
commitdca359cc5aedc45779828ad4aa7df196ca48d94a (patch)
tree21a18bd31791064717eb840f3993fc57b865c8f4 /src
parent29ee1dbc93d1b97578d39c796bfc19b99e57545f (diff)
parentd35b7107f8c54196ba8ddd55a0760e5f559e2014 (diff)
Merge branch 'master' into fix/typed-struct
Conflicts: src/ruby/ext/grpc/rb_grpc.c
Diffstat (limited to 'src')
-rw-r--r--src/compiler/cpp_generator.cc149
-rw-r--r--src/core/surface/channel.c87
-rw-r--r--src/core/surface/init.c2
-rw-r--r--src/cpp/client/channel.cc21
-rw-r--r--src/cpp/client/channel.h1
-rw-r--r--src/cpp/client/generic_stub.cc12
-rw-r--r--src/node/examples/pubsub/empty.proto44
-rw-r--r--src/node/examples/pubsub/label.proto79
-rw-r--r--src/node/examples/pubsub/pubsub.proto734
-rw-r--r--src/node/examples/pubsub/pubsub_demo.js285
-rw-r--r--src/ruby/.rubocop_todo.yml4
-rwxr-xr-xsrc/ruby/bin/interop/interop_client.rb32
-rw-r--r--src/ruby/ext/grpc/rb_call.c38
-rw-r--r--src/ruby/ext/grpc/rb_call.h7
-rw-r--r--src/ruby/ext/grpc/rb_channel.c5
-rw-r--r--src/ruby/ext/grpc/rb_channel.h3
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c8
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.h4
-rw-r--r--src/ruby/ext/grpc/rb_credentials.c6
-rw-r--r--src/ruby/ext/grpc/rb_credentials.h4
-rw-r--r--src/ruby/ext/grpc/rb_grpc.c19
-rw-r--r--src/ruby/ext/grpc/rb_grpc.h9
-rw-r--r--src/ruby/ext/grpc/rb_server.c2
-rw-r--r--src/ruby/ext/grpc/rb_server.h4
-rw-r--r--src/ruby/ext/grpc/rb_server_credentials.c8
-rw-r--r--src/ruby/ext/grpc/rb_server_credentials.h4
-rwxr-xr-xsrc/ruby/grpc.gemspec12
-rw-r--r--src/ruby/lib/grpc/errors.rb4
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb94
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb32
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb17
31 files changed, 349 insertions, 1381 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index bd8bf65349..1324198847 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -110,7 +110,7 @@ bool HasBidiStreaming(const grpc::protobuf::FileDescriptor *file) {
return false;
}
-grpc::string FilenameIdentifier(const grpc::string& filename) {
+grpc::string FilenameIdentifier(const grpc::string &filename) {
grpc::string result;
for (unsigned i = 0; i < filename.size(); i++) {
char c = filename[i];
@@ -154,6 +154,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string temp =
"#include <grpc++/impl/internal_stub.h>\n"
+ "#include <grpc++/impl/rpc_method.h>\n"
"#include <grpc++/impl/service_type.h>\n"
"#include <grpc++/status.h>\n"
"\n"
@@ -172,7 +173,9 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
temp.append("template <class OutMessage> class ClientWriter;\n");
temp.append("template <class InMessage> class ServerReader;\n");
temp.append("template <class OutMessage> class ClientAsyncWriter;\n");
- temp.append("template <class OutMessage, class InMessage> class ServerAsyncReader;\n");
+ temp.append(
+ "template <class OutMessage, class InMessage> class "
+ "ServerAsyncReader;\n");
}
if (HasServerOnlyStreaming(file)) {
temp.append("template <class InMessage> class ClientReader;\n");
@@ -246,11 +249,11 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
*vars,
"std::unique_ptr< ::grpc::ClientReader< $Response$>> $Method$("
"::grpc::ClientContext* context, const $Request$& request);\n");
- printer->Print(
- *vars,
- "std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> Async$Method$("
- "::grpc::ClientContext* context, const $Request$& request, "
- "::grpc::CompletionQueue* cq, void* tag);\n");
+ printer->Print(*vars,
+ "std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> "
+ "Async$Method$("
+ "::grpc::ClientContext* context, const $Request$& request, "
+ "::grpc::CompletionQueue* cq, void* tag);\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
@@ -264,10 +267,16 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
}
}
-void PrintHeaderServerMethodSync(
- grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars) {
+void PrintHeaderClientMethodData(grpc::protobuf::io::Printer *printer,
+ const grpc::protobuf::MethodDescriptor *method,
+ std::map<grpc::string, grpc::string> *vars) {
+ (*vars)["Method"] = method->name();
+ printer->Print(*vars, "const ::grpc::RpcMethod rpcmethod_$Method$_;\n");
+}
+
+void PrintHeaderServerMethodSync(grpc::protobuf::io::Printer *printer,
+ const grpc::protobuf::MethodDescriptor *method,
+ std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
(*vars)["Request"] =
grpc_cpp_generator::ClassName(method->input_type(), true);
@@ -351,10 +360,18 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
"class Stub GRPC_FINAL : public ::grpc::InternalStub {\n"
" public:\n");
printer->Indent();
+ printer->Print(
+ "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i), vars);
}
printer->Outdent();
+ printer->Print(" private:\n");
+ printer->Indent();
+ for (int i = 0; i < service->method_count(); ++i) {
+ PrintHeaderClientMethodData(printer, service->method(i), vars);
+ }
+ printer->Outdent();
printer->Print("};\n");
printer->Print(
"static std::unique_ptr<Stub> NewStub(const std::shared_ptr< "
@@ -479,7 +496,6 @@ grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file,
printer.Print(vars, "#include <grpc++/async_unary_call.h>\n");
printer.Print(vars, "#include <grpc++/channel_interface.h>\n");
printer.Print(vars, "#include <grpc++/impl/client_unary_call.h>\n");
- printer.Print(vars, "#include <grpc++/impl/rpc_method.h>\n");
printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n");
printer.Print(vars, "#include <grpc++/impl/service_type.h>\n");
printer.Print(vars, "#include <grpc++/stream.h>\n");
@@ -513,8 +529,8 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) {\n");
printer->Print(*vars,
- " return ::grpc::BlockingUnaryCall(channel(),"
- "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$]), "
+ " return ::grpc::BlockingUnaryCall(channel(), "
+ "rpcmethod_$Method$_, "
"context, request, response);\n"
"}\n\n");
printer->Print(
@@ -528,7 +544,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ClientAsyncResponseReader< $Response$>>(new "
"::grpc::ClientAsyncResponseReader< $Response$>("
"channel(), cq, "
- "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$]), "
+ "rpcmethod_$Method$_, "
"context, request, tag));\n"
"}\n\n");
} else if (ClientOnlyStreaming(method)) {
@@ -540,8 +556,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
" return std::unique_ptr< ::grpc::ClientWriter< "
"$Request$>>(new ::grpc::ClientWriter< $Request$>("
"channel(),"
- "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
- "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
+ "rpcmethod_$Method$_, "
"context, response));\n"
"}\n\n");
printer->Print(*vars,
@@ -553,8 +568,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
" return std::unique_ptr< ::grpc::ClientAsyncWriter< "
"$Request$>>(new ::grpc::ClientAsyncWriter< $Request$>("
"channel(), cq, "
- "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
- "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
+ "rpcmethod_$Method$_, "
"context, response, tag));\n"
"}\n\n");
} else if (ServerOnlyStreaming(method)) {
@@ -567,8 +581,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
" return std::unique_ptr< ::grpc::ClientReader< "
"$Response$>>(new ::grpc::ClientReader< $Response$>("
"channel(),"
- "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
- "::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
+ "rpcmethod_$Method$_, "
"context, request));\n"
"}\n\n");
printer->Print(*vars,
@@ -580,8 +593,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
" return std::unique_ptr< ::grpc::ClientAsyncReader< "
"$Response$>>(new ::grpc::ClientAsyncReader< $Response$>("
"channel(), cq, "
- "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
- "::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
+ "rpcmethod_$Method$_, "
"context, request, tag));\n"
"}\n\n");
} else if (BidiStreaming(method)) {
@@ -594,22 +606,21 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
"$Request$, $Response$>>(new ::grpc::ClientReaderWriter< "
"$Request$, $Response$>("
"channel(),"
- "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
- "::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
+ "rpcmethod_$Method$_, "
"context));\n"
"}\n\n");
- printer->Print(*vars,
- "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
- "$Request$, $Response$>> "
- "$ns$$Service$::Stub::Async$Method$(::grpc::ClientContext* context, "
- "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print(
+ *vars,
+ "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
+ "$Request$, $Response$>> "
+ "$ns$$Service$::Stub::Async$Method$(::grpc::ClientContext* context, "
+ "::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
"$Request$, $Response$>>(new "
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>("
"channel(), cq, "
- "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
- "::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
+ "rpcmethod_$Method$_, "
"context, tag));\n"
"}\n\n");
}
@@ -681,9 +692,9 @@ void PrintSourceServerAsyncMethod(
"$Request$* request, "
"::grpc::ServerAsyncResponseWriter< $Response$>* response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
- printer->Print(
- *vars,
- " AsynchronousService::RequestAsyncUnary($Idx$, context, request, response, cq, tag);\n");
+ printer->Print(*vars,
+ " AsynchronousService::RequestAsyncUnary($Idx$, context, "
+ "request, response, cq, tag);\n");
printer->Print("}\n\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
@@ -691,9 +702,9 @@ void PrintSourceServerAsyncMethod(
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
- printer->Print(
- *vars,
- " AsynchronousService::RequestClientStreaming($Idx$, context, reader, cq, tag);\n");
+ printer->Print(*vars,
+ " AsynchronousService::RequestClientStreaming($Idx$, "
+ "context, reader, cq, tag);\n");
printer->Print("}\n\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(*vars,
@@ -702,9 +713,9 @@ void PrintSourceServerAsyncMethod(
"$Request$* request, "
"::grpc::ServerAsyncWriter< $Response$>* writer, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
- printer->Print(
- *vars,
- " AsynchronousService::RequestServerStreaming($Idx$, context, request, writer, cq, tag);\n");
+ printer->Print(*vars,
+ " AsynchronousService::RequestServerStreaming($Idx$, "
+ "context, request, writer, cq, tag);\n");
printer->Print("}\n\n");
} else if (BidiStreaming(method)) {
printer->Print(
@@ -713,9 +724,9 @@ void PrintSourceServerAsyncMethod(
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* cq, void *tag) {\n");
- printer->Print(
- *vars,
- " AsynchronousService::RequestBidiStreaming($Idx$, context, stream, cq, tag);\n");
+ printer->Print(*vars,
+ " AsynchronousService::RequestBidiStreaming($Idx$, "
+ "context, stream, cq, tag);\n");
printer->Print("}\n\n");
}
}
@@ -725,7 +736,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Service"] = service->name();
- printer->Print(*vars, "static const char* $prefix$$Service$_method_names[] = {\n");
+ printer->Print(*vars,
+ "static const char* $prefix$$Service$_method_names[] = {\n");
for (int i = 0; i < service->method_count(); ++i) {
(*vars)["Method"] = service->method(i)->name();
printer->Print(*vars, " \"/$Package$$Service$/$Method$\",\n");
@@ -736,21 +748,51 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
*vars,
"std::unique_ptr< $ns$$Service$::Stub> $ns$$Service$::NewStub("
"const std::shared_ptr< ::grpc::ChannelInterface>& channel) {\n"
- " std::unique_ptr< $ns$$Service$::Stub> stub(new $ns$$Service$::Stub());\n"
- " stub->set_channel(channel);\n"
+ " std::unique_ptr< $ns$$Service$::Stub> stub(new "
+ "$ns$$Service$::Stub(channel));\n"
" return stub;\n"
"}\n\n");
+ printer->Print(*vars,
+ "$ns$$Service$::Stub::Stub(const std::shared_ptr< "
+ "::grpc::ChannelInterface>& channel)\n");
+ printer->Indent();
+ printer->Print(": ::grpc::InternalStub(channel)");
+ for (int i = 0; i < service->method_count(); ++i) {
+ const grpc::protobuf::MethodDescriptor *method = service->method(i);
+ (*vars)["Method"] = method->name();
+ (*vars)["Idx"] = as_string(i);
+ if (NoStreaming(method)) {
+ (*vars)["StreamingType"] = "NORMAL_RPC";
+ } else if (ClientOnlyStreaming(method)) {
+ (*vars)["StreamingType"] = "CLIENT_STREAMING";
+ } else if (ServerOnlyStreaming(method)) {
+ (*vars)["StreamingType"] = "SERVER_STREAMING";
+ } else {
+ (*vars)["StreamingType"] = "BIDI_STREAMING";
+ }
+ printer->Print(
+ *vars,
+ ", rpcmethod_$Method$_("
+ "$prefix$$Service$_method_names[$Idx$], "
+ "::grpc::RpcMethod::$StreamingType$, "
+ "channel->RegisterMethod($prefix$$Service$_method_names[$Idx$])"
+ ")\n");
+ }
+ printer->Print("{}\n\n");
+ printer->Outdent();
+
for (int i = 0; i < service->method_count(); ++i) {
(*vars)["Idx"] = as_string(i);
PrintSourceClientMethod(printer, service->method(i), vars);
}
(*vars)["MethodCount"] = as_string(service->method_count());
- printer->Print(
- *vars,
- "$ns$$Service$::AsyncService::AsyncService(::grpc::CompletionQueue* cq) : "
- "::grpc::AsynchronousService(cq, $prefix$$Service$_method_names, $MethodCount$) "
- "{}\n\n");
+ printer->Print(*vars,
+ "$ns$$Service$::AsyncService::AsyncService(::grpc::"
+ "CompletionQueue* cq) : "
+ "::grpc::AsynchronousService(cq, "
+ "$prefix$$Service$_method_names, $MethodCount$) "
+ "{}\n\n");
printer->Print(*vars,
"$ns$$Service$::Service::~Service() {\n"
@@ -783,7 +825,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::NORMAL_RPC,\n"
- " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, $Request$, "
+ " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
+ "$Request$, "
"$Response$>(\n"
" std::function< ::grpc::Status($ns$$Service$::Service*, "
"::grpc::ServerContext*, const $Request$*, $Response$*)>("
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index e764a3b9af..d3962a00c4 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -43,6 +43,12 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+typedef struct registered_call {
+ grpc_mdelem *path;
+ grpc_mdelem *authority;
+ struct registered_call *next;
+} registered_call;
+
struct grpc_channel {
int is_client;
gpr_refcount refs;
@@ -51,10 +57,14 @@ struct grpc_channel {
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
+
+ gpr_mu registered_call_mu;
+ registered_call *registered_calls;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1))
-#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) (((grpc_channel *)(channel_stack)) - 1)
+#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \
+ (((grpc_channel *)(channel_stack)) - 1)
#define CHANNEL_FROM_TOP_ELEM(top_elem) \
CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
@@ -66,7 +76,8 @@ grpc_channel *grpc_channel_create_from_filters(
grpc_channel *channel = gpr_malloc(size);
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client;
- /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if is_client */
+ /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if
+ * is_client */
gpr_ref_init(&channel->refs, 1 + is_client);
channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
@@ -75,18 +86,17 @@ grpc_channel *grpc_channel_create_from_filters(
channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority");
grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context,
CHANNEL_STACK_FROM_CHANNEL(channel));
+ gpr_mu_init(&channel->registered_call_mu);
+ channel->registered_calls = NULL;
return channel;
}
static void do_nothing(void *ignored, grpc_op_error error) {}
-grpc_call *grpc_channel_create_call(grpc_channel *channel,
- grpc_completion_queue *cq,
- const char *method, const char *host,
- gpr_timespec absolute_deadline) {
+static grpc_call *grpc_channel_create_call_internal(
+ grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
+ grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
grpc_call *call;
- grpc_mdelem *path_mdelem;
- grpc_mdelem *authority_mdelem;
grpc_call_op op;
if (!channel->is_client) {
@@ -97,11 +107,6 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
call = grpc_call_create(channel, cq, NULL);
/* Add :path and :authority headers. */
- /* TODO(klempner): Consider optimizing this by stashing mdelems for common
- values of method and host. */
- path_mdelem = grpc_mdelem_from_metadata_strings(
- channel->metadata_context, grpc_mdstr_ref(channel->path_string),
- grpc_mdstr_from_string(channel->metadata_context, method));
op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
@@ -110,18 +115,14 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
op.user_data = NULL;
grpc_call_execute_op(call, &op);
- grpc_mdstr_ref(channel->authority_string);
- authority_mdelem = grpc_mdelem_from_metadata_strings(
- channel->metadata_context, channel->authority_string,
- grpc_mdstr_from_string(channel->metadata_context, host));
op.data.metadata = authority_mdelem;
grpc_call_execute_op(call, &op);
- if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) {
+ if (0 != gpr_time_cmp(deadline, gpr_inf_future)) {
op.type = GRPC_SEND_DEADLINE;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
- op.data.deadline = absolute_deadline;
+ op.data.deadline = deadline;
op.done_cb = do_nothing;
op.user_data = NULL;
grpc_call_execute_op(call, &op);
@@ -130,6 +131,21 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
return call;
}
+grpc_call *grpc_channel_create_call(grpc_channel *channel,
+ grpc_completion_queue *cq,
+ const char *method, const char *host,
+ gpr_timespec deadline) {
+ return grpc_channel_create_call_internal(
+ channel, cq,
+ grpc_mdelem_from_metadata_strings(
+ channel->metadata_context, grpc_mdstr_ref(channel->path_string),
+ grpc_mdstr_from_string(channel->metadata_context, method)),
+ grpc_mdelem_from_metadata_strings(
+ channel->metadata_context, grpc_mdstr_ref(channel->authority_string),
+ grpc_mdstr_from_string(channel->metadata_context, host)),
+ deadline);
+}
+
grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
const char *method, const char *host,
gpr_timespec absolute_deadline) {
@@ -137,6 +153,31 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
absolute_deadline);
}
+void *grpc_channel_register_call(grpc_channel *channel, const char *method,
+ const char *host) {
+ registered_call *rc = gpr_malloc(sizeof(registered_call));
+ rc->path = grpc_mdelem_from_metadata_strings(
+ channel->metadata_context, grpc_mdstr_ref(channel->path_string),
+ grpc_mdstr_from_string(channel->metadata_context, method));
+ rc->authority = grpc_mdelem_from_metadata_strings(
+ channel->metadata_context, grpc_mdstr_ref(channel->authority_string),
+ grpc_mdstr_from_string(channel->metadata_context, host));
+ gpr_mu_lock(&channel->registered_call_mu);
+ rc->next = channel->registered_calls;
+ channel->registered_calls = rc;
+ gpr_mu_unlock(&channel->registered_call_mu);
+ return rc;
+}
+
+grpc_call *grpc_channel_create_registered_call(
+ grpc_channel *channel, grpc_completion_queue *completion_queue,
+ void *registered_call_handle, gpr_timespec deadline) {
+ registered_call *rc = registered_call_handle;
+ return grpc_channel_create_call_internal(
+ channel, completion_queue, grpc_mdelem_ref(rc->path),
+ grpc_mdelem_ref(rc->authority), deadline);
+}
+
void grpc_channel_internal_ref(grpc_channel *channel) {
gpr_ref(&channel->refs);
}
@@ -148,7 +189,15 @@ static void destroy_channel(void *p, int ok) {
grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string);
grpc_mdstr_unref(channel->authority_string);
+ while (channel->registered_calls) {
+ registered_call *rc = channel->registered_calls;
+ channel->registered_calls = rc->next;
+ grpc_mdelem_unref(rc->path);
+ grpc_mdelem_unref(rc->authority);
+ gpr_free(rc);
+ }
grpc_mdctx_unref(channel->metadata_context);
+ gpr_mu_destroy(&channel->registered_call_mu);
gpr_free(channel);
}
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 4de51a666f..5a119a47cc 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -61,8 +61,8 @@ void grpc_init(void) {
grpc_register_tracer("http", &grpc_http_trace);
grpc_register_tracer("batch", &grpc_trace_batch);
grpc_security_pre_init();
- grpc_tracer_init("GRPC_TRACE");
grpc_iomgr_init();
+ grpc_tracer_init("GRPC_TRACE");
census_init();
grpc_timers_log_global_init();
}
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 72123abbc8..478f223322 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -61,12 +61,17 @@ Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) {
- auto c_call = grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
- context->authority().empty()
- ? target_.c_str()
- : context->authority().c_str(),
- context->RawDeadline());
- GRPC_TIMER_MARK(CALL_CREATED,c_call);
+ auto c_call =
+ method.channel_tag()
+ ? grpc_channel_create_registered_call(c_channel_, cq->cq(),
+ method.channel_tag(),
+ context->RawDeadline())
+ : grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
+ context->authority().empty()
+ ? target_.c_str()
+ : context->authority().c_str(),
+ context->RawDeadline());
+ GRPC_TIMER_MARK(CALL_CREATED, c_call);
context->set_call(c_call);
return Call(c_call, this, cq);
}
@@ -82,4 +87,8 @@ void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
GRPC_TIMER_MARK(PERFORM_OPS_END, call->call());
}
+void* Channel::RegisterMethod(const char* method) {
+ return grpc_channel_register_call(c_channel_, method, target_.c_str());
+}
+
} // namespace grpc
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index 3980eba237..aaf4dbe10d 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -54,6 +54,7 @@ class Channel GRPC_FINAL : public ChannelInterface {
Channel(const grpc::string& target, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE;
+ virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE;
virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) GRPC_OVERRIDE;
virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc
index 3bf7bdf45f..0c90578ae5 100644
--- a/src/cpp/client/generic_stub.cc
+++ b/src/cpp/client/generic_stub.cc
@@ -39,13 +39,13 @@ namespace grpc {
// begin a call to a named method
std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call(
- ClientContext* context, const grpc::string& method,
- CompletionQueue* cq, void* tag) {
+ ClientContext* context, const grpc::string& method, CompletionQueue* cq,
+ void* tag) {
return std::unique_ptr<GenericClientAsyncReaderWriter>(
new GenericClientAsyncReaderWriter(
- channel_.get(), cq, RpcMethod(method.c_str()), context, tag));
+ channel_.get(), cq,
+ RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING, nullptr),
+ context, tag));
}
-
-} // namespace grpc
-
+} // namespace grpc
diff --git a/src/node/examples/pubsub/empty.proto b/src/node/examples/pubsub/empty.proto
deleted file mode 100644
index 5d6eb10841..0000000000
--- a/src/node/examples/pubsub/empty.proto
+++ /dev/null
@@ -1,44 +0,0 @@
-// This file will be moved to a new location.
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-syntax = "proto2";
-
-package proto2;
-
-// An empty message that you can re-use to avoid defining duplicated empty
-// messages in your project. A typical example is to use it as argument or the
-// return value of a service API. For instance:
-//
-// service Foo {
-// rpc Bar (proto2.Empty) returns (proto2.Empty) { };
-// };
-//
-message Empty {}
diff --git a/src/node/examples/pubsub/label.proto b/src/node/examples/pubsub/label.proto
deleted file mode 100644
index 0af15a25a6..0000000000
--- a/src/node/examples/pubsub/label.proto
+++ /dev/null
@@ -1,79 +0,0 @@
-// This file will be moved to a new location.
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-// Labels provide a way to associate user-defined metadata with various
-// objects. Labels may be used to organize objects into non-hierarchical
-// groups; think metadata tags attached to mp3s.
-
-syntax = "proto2";
-
-package tech.label;
-
-// A key-value pair applied to a given object.
-message Label {
- // The key of a label is a syntactically valid URL (as per RFC 1738) with
- // the "scheme" and initial slashes omitted and with the additional
- // restrictions noted below. Each key should be globally unique. The
- // "host" portion is called the "namespace" and is not necessarily
- // resolvable to a network endpoint. Instead, the namespace indicates what
- // system or entity defines the semantics of the label. Namespaces do not
- // restrict the set of objects to which a label may be associated.
- //
- // Keys are defined by the following grammar:
- //
- // key = hostname "/" kpath
- // kpath = ksegment *[ "/" ksegment ]
- // ksegment = alphadigit | *[ alphadigit | "-" | "_" | "." ]
- //
- // where "hostname" and "alphadigit" are defined as in RFC 1738.
- //
- // Example key:
- // spanner.google.com/universe
- required string key = 1;
-
- // The value of the label.
- oneof value {
- // A string value.
- string str_value = 2;
- // An integer value.
- int64 num_value = 3;
- }
-}
-
-// A collection of labels, such as the set of all labels attached to an
-// object. Each label in the set must have a different key.
-//
-// Users should prefer to embed "repeated Label" directly when possible.
-// This message should only be used in cases where that isn't possible (e.g.
-// with oneof).
-message Labels {
- repeated Label label = 1;
-}
diff --git a/src/node/examples/pubsub/pubsub.proto b/src/node/examples/pubsub/pubsub.proto
deleted file mode 100644
index 41a354773f..0000000000
--- a/src/node/examples/pubsub/pubsub.proto
+++ /dev/null
@@ -1,734 +0,0 @@
-// This file will be moved to a new location.
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
-// Specification of the Pubsub API.
-
-syntax = "proto2";
-
-import "empty.proto";
-import "label.proto";
-
-package tech.pubsub;
-
-// -----------------------------------------------------------------------------
-// Overview of the Pubsub API
-// -----------------------------------------------------------------------------
-
-// This file describes an API for a Pubsub system. This system provides a
-// reliable many-to-many communication mechanism between independently written
-// publishers and subscribers where the publisher publishes messages to "topics"
-// and each subscriber creates a "subscription" and consumes messages from it.
-//
-// (a) The pubsub system maintains bindings between topics and subscriptions.
-// (b) A publisher publishes messages into a topic.
-// (c) The pubsub system delivers messages from topics into relevant
-// subscriptions.
-// (d) A subscriber receives pending messages from its subscription and
-// acknowledges or nacks each one to the pubsub system.
-// (e) The pubsub system removes acknowledged messages from that subscription.
-
-// -----------------------------------------------------------------------------
-// Data Model
-// -----------------------------------------------------------------------------
-
-// The data model consists of the following:
-//
-// * Topic: A topic is a resource to which messages are published by publishers.
-// Topics are named, and the name of the topic is unique within the pubsub
-// system.
-//
-// * Subscription: A subscription records the subscriber's interest in a topic.
-// It can optionally include a query to select a subset of interesting
-// messages. The pubsub system maintains a logical cursor tracking the
-// matching messages which still need to be delivered and acked so that
-// they can retried as needed. The set of messages that have not been
-// acknowledged is called the subscription backlog.
-//
-// * Message: A message is a unit of data that flows in the system. It contains
-// opaque data from the publisher along with its labels.
-//
-// * Message Labels (optional): A set of opaque key, value pairs assigned
-// by the publisher which the subscriber can use for filtering out messages
-// in the topic. For example, a label with key "foo.com/device_type" and
-// value "mobile" may be added for messages that are only relevant for a
-// mobile subscriber; a subscriber on a phone may decide to create a
-// subscription only for messages that have this label.
-
-// -----------------------------------------------------------------------------
-// Publisher Flow
-// -----------------------------------------------------------------------------
-
-// A publisher publishes messages to the topic using the Publish request:
-//
-// PubsubMessage message;
-// message.set_data("....");
-// Label label;
-// label.set_key("foo.com/key1");
-// label.set_str_value("value1");
-// message.add_label(label);
-// PublishRequest request;
-// request.set_topic("topicName");
-// request.set_message(message);
-// PublisherService.Publish(request);
-
-// -----------------------------------------------------------------------------
-// Subscriber Flow
-// -----------------------------------------------------------------------------
-
-// The subscriber part of the API is richer than the publisher part and has a
-// number of concepts w.r.t. subscription creation and monitoring:
-//
-// (1) A subscriber creates a subscription using the CreateSubscription call.
-// It may specify an optional "query" to indicate that it wants to receive
-// only messages with a certain set of labels using the label query syntax.
-// It may also specify an optional truncation policy to indicate when old
-// messages from the subcription can be removed.
-//
-// (2) A subscriber receives messages in one of two ways: via push or pull.
-//
-// (a) To receive messages via push, the PushConfig field must be specified in
-// the Subscription parameter when creating a subscription. The PushConfig
-// specifies an endpoint at which the subscriber must expose the
-// PushEndpointService. Messages are received via the HandlePubsubEvent
-// method. The push subscriber responds to the HandlePubsubEvent method
-// with a result code that indicates one of three things: Ack (the message
-// has been successfully processed and the Pubsub system may delete it),
-// Nack (the message has been rejected, the Pubsub system should resend it
-// at a later time), or Push-Back (this is a Nack with the additional
-// semantics that the subscriber is overloaded and the pubsub system should
-// back off on the rate at which it is invoking HandlePubsubEvent). The
-// endpoint may be a load balancer for better scalability.
-//
-// (b) To receive messages via pull a subscriber calls the Pull method on the
-// SubscriberService to get messages from the subscription. For each
-// individual message, the subscriber may use the ack_id received in the
-// PullResponse to Ack the message, Nack the message, or modify the ack
-// deadline with ModifyAckDeadline. See the
-// Subscription.ack_deadline_seconds field documentation for details on the
-// ack deadline behavior.
-//
-// Note: Messages may be consumed in parallel by multiple subscribers making
-// Pull calls to the same subscription; this will result in the set of
-// messages from the subscription being shared and each subscriber
-// receiving a subset of the messages.
-//
-// (4) The subscriber can explicitly truncate the current subscription.
-//
-// (5) "Truncated" events are delivered when a subscription is
-// truncated, whether due to the subscription's truncation policy
-// or an explicit request from the subscriber.
-//
-// Subscription creation:
-//
-// Subscription subscription;
-// subscription.set_topic("topicName");
-// subscription.set_name("subscriptionName");
-// subscription.push_config().set_push_endpoint("machinename:8888");
-// SubscriberService.CreateSubscription(subscription);
-//
-// Consuming messages via push:
-//
-// TODO(eschapira): Add HTTP push example.
-//
-// The port 'machinename:8888' must be bound to a stubby server that implements
-// the PushEndpointService with the following method:
-//
-// int HandlePubsubEvent(PubsubEvent event) {
-// if (event.subscription().equals("subscriptionName")) {
-// if (event.has_message()) {
-// Process(event.message().data());
-// } else if (event.truncated()) {
-// ProcessTruncatedEvent();
-// }
-// }
-// return OK; // This return code implies an acknowledgment
-// }
-//
-// Consuming messages via pull:
-//
-// The subscription must be created without setting the push_config field.
-//
-// PullRequest pull_request;
-// pull_request.set_subscription("subscriptionName");
-// pull_request.set_return_immediately(false);
-// while (true) {
-// PullResponse pull_response;
-// if (SubscriberService.Pull(pull_request, pull_response) == OK) {
-// PubsubEvent event = pull_response.pubsub_event();
-// if (event.has_message()) {
-// Process(event.message().data());
-// } else if (event.truncated()) {
-// ProcessTruncatedEvent();
-// }
-// AcknowledgeRequest ack_request;
-// ackRequest.set_subscription("subscriptionName");
-// ackRequest.set_ack_id(pull_response.ack_id());
-// SubscriberService.Acknowledge(ack_request);
-// }
-// }
-
-// -----------------------------------------------------------------------------
-// Reliability Semantics
-// -----------------------------------------------------------------------------
-
-// When a subscriber successfully creates a subscription using
-// Subscriber.CreateSubscription, it establishes a "subscription point" with
-// respect to that subscription - the subscriber is guaranteed to receive any
-// message published after this subscription point that matches the
-// subscription's query. Note that messages published before the Subscription
-// point may or may not be delivered.
-//
-// If the system truncates the subscription according to the specified
-// truncation policy, the system delivers a subscription status event with the
-// "truncated" field set to true. We refer to such events as "truncation
-// events". A truncation event:
-//
-// * Informs the subscriber that part of the subscription messages have been
-// discarded. The subscriber may want to recover from the message loss, e.g.,
-// by resyncing its state with its backend.
-// * Establishes a new subscription point, i.e., the subscriber is guaranteed to
-// receive all changes published after the trunction event is received (or
-// until another truncation event is received).
-//
-// Note that messages are not delivered in any particular order by the pubsub
-// system. Furthermore, the system guarantees at-least-once delivery
-// of each message or truncation events until acked.
-
-// -----------------------------------------------------------------------------
-// Deletion
-// -----------------------------------------------------------------------------
-
-// Both topics and subscriptions may be deleted. Deletion of a topic implies
-// deletion of all attached subscriptions.
-//
-// When a subscription is deleted directly by calling DeleteSubscription, all
-// messages are immediately dropped. If it is a pull subscriber, future pull
-// requests will return NOT_FOUND.
-//
-// When a topic is deleted all corresponding subscriptions are immediately
-// deleted, and subscribers experience the same behavior as directly deleting
-// the subscription.
-
-// -----------------------------------------------------------------------------
-// The Publisher service and its protos.
-// -----------------------------------------------------------------------------
-
-// The service that an application uses to manipulate topics, and to send
-// messages to a topic.
-service PublisherService {
-
- // Creates the given topic with the given name.
- rpc CreateTopic(Topic) returns (Topic) {
- }
-
- // Adds a message to the topic. Returns NOT_FOUND if the topic does not
- // exist.
- // (-- For different error code values returned via Stubby, see
- // util/task/codes.proto. --)
- rpc Publish(PublishRequest) returns (proto2.Empty) {
- }
-
- // Adds one or more messages to the topic. Returns NOT_FOUND if the topic does
- // not exist.
- rpc PublishBatch(PublishBatchRequest) returns (PublishBatchResponse) {
- }
-
- // Gets the configuration of a topic. Since the topic only has the name
- // attribute, this method is only useful to check the existence of a topic.
- // If other attributes are added in the future, they will be returned here.
- rpc GetTopic(GetTopicRequest) returns (Topic) {
- }
-
- // Lists matching topics.
- rpc ListTopics(ListTopicsRequest) returns (ListTopicsResponse) {
- }
-
- // Deletes the topic with the given name. All subscriptions to this topic
- // are also deleted. Returns NOT_FOUND if the topic does not exist.
- // After a topic is deleted, a new topic may be created with the same name.
- rpc DeleteTopic(DeleteTopicRequest) returns (proto2.Empty) {
- }
-}
-
-// A topic resource.
-message Topic {
- // Name of the topic.
- optional string name = 1;
-}
-
-// A message data and its labels.
-message PubsubMessage {
- // The message payload.
- optional bytes data = 1;
-
- // Optional list of labels for this message. Keys in this collection must
- // be unique.
- //(-- TODO(eschapira): Define how key namespace may be scoped to the topic.--)
- repeated tech.label.Label label = 2;
-
- // ID of this message assigned by the server at publication time. Guaranteed
- // to be unique within the topic. This value may be read by a subscriber
- // that receives a PubsubMessage via a Pull call or a push delivery. It must
- // not be populated by a publisher in a Publish call.
- optional string message_id = 3;
-}
-
-// Request for the GetTopic method.
-message GetTopicRequest {
- // The name of the topic to get.
- optional string topic = 1;
-}
-
-// Request for the Publish method.
-message PublishRequest {
- // The message in the request will be published on this topic.
- optional string topic = 1;
-
- // The message to publish.
- optional PubsubMessage message = 2;
-}
-
-// Request for the PublishBatch method.
-message PublishBatchRequest {
- // The messages in the request will be published on this topic.
- optional string topic = 1;
-
- // The messages to publish.
- repeated PubsubMessage messages = 2;
-}
-
-// Response for the PublishBatch method.
-message PublishBatchResponse {
- // The server-assigned ID of each published message, in the same order as
- // the messages in the request. IDs are guaranteed to be unique within
- // the topic.
- repeated string message_ids = 1;
-}
-
-// Request for the ListTopics method.
-message ListTopicsRequest {
- // A valid label query expression.
- //
- optional string query = 1;
-
- // Maximum number of topics to return.
- // (-- If not specified or <= 0, the implementation will select a reasonable
- // value. --)
- optional int32 max_results = 2;
-
- // The value obtained in the last <code>ListTopicsResponse</code>
- // for continuation.
- optional string page_token = 3;
-
-}
-
-// Response for the ListTopics method.
-message ListTopicsResponse {
- // The resulting topics.
- repeated Topic topic = 1;
-
- // If not empty, indicates that there are more topics that match the request,
- // and this value should be passed to the next <code>ListTopicsRequest</code>
- // to continue.
- optional string next_page_token = 2;
-}
-
-// Request for the Delete method.
-message DeleteTopicRequest {
- // Name of the topic to delete.
- optional string topic = 1;
-}
-
-// -----------------------------------------------------------------------------
-// The Subscriber service and its protos.
-// -----------------------------------------------------------------------------
-
-// The service that an application uses to manipulate subscriptions and to
-// consume messages from a subscription via the pull method.
-service SubscriberService {
-
- // Creates a subscription on a given topic for a given subscriber.
- // If the subscription already exists, returns ALREADY_EXISTS.
- // If the corresponding topic doesn't exist, returns NOT_FOUND.
- //
- // If the name is not provided in the request, the server will assign a random
- // name for this subscription on the same project as the topic.
- rpc CreateSubscription(Subscription) returns (Subscription) {
- }
-
- // Gets the configuration details of a subscription.
- rpc GetSubscription(GetSubscriptionRequest) returns (Subscription) {
- }
-
- // Lists matching subscriptions.
- rpc ListSubscriptions(ListSubscriptionsRequest)
- returns (ListSubscriptionsResponse) {
- }
-
- // Deletes an existing subscription. All pending messages in the subscription
- // are immediately dropped. Calls to Pull after deletion will return
- // NOT_FOUND.
- rpc DeleteSubscription(DeleteSubscriptionRequest) returns (proto2.Empty) {
- }
-
- // Removes all the pending messages in the subscription and releases the
- // storage associated with them. Results in a truncation event to be sent to
- // the subscriber. Messages added after this call returns are stored in the
- // subscription as before.
- rpc TruncateSubscription(TruncateSubscriptionRequest) returns (proto2.Empty) {
- }
-
- //
- // Push subscriber calls.
- //
-
- // Modifies the <code>PushConfig</code> for a specified subscription.
- // This method can be used to suspend the flow of messages to an endpoint
- // by clearing the <code>PushConfig</code> field in the request. Messages
- // will be accumulated for delivery even if no push configuration is
- // defined or while the configuration is modified.
- rpc ModifyPushConfig(ModifyPushConfigRequest) returns (proto2.Empty) {
- }
-
- //
- // Pull Subscriber calls
- //
-
- // Pulls a single message from the server.
- // If return_immediately is true, and no messages are available in the
- // subscription, this method returns FAILED_PRECONDITION. The system is free
- // to return an UNAVAILABLE error if no messages are available in a
- // reasonable amount of time (to reduce system load).
- rpc Pull(PullRequest) returns (PullResponse) {
- }
-
- // Pulls messages from the server. Returns an empty list if there are no
- // messages available in the backlog. The system is free to return UNAVAILABLE
- // if there are too many pull requests outstanding for the given subscription.
- rpc PullBatch(PullBatchRequest) returns (PullBatchResponse) {
- }
-
- // Modifies the Ack deadline for a message received from a pull request.
- rpc ModifyAckDeadline(ModifyAckDeadlineRequest) returns (proto2.Empty) {
- }
-
- // Acknowledges a particular received message: the Pub/Sub system can remove
- // the given message from the subscription. Acknowledging a message whose
- // Ack deadline has expired may succeed, but the message could have been
- // already redelivered. Acknowledging a message more than once will not
- // result in an error. This is only used for messages received via pull.
- rpc Acknowledge(AcknowledgeRequest) returns (proto2.Empty) {
- }
-
- // Refuses processing a particular received message. The system will
- // redeliver this message to some consumer of the subscription at some
- // future time. This is only used for messages received via pull.
- rpc Nack(NackRequest) returns (proto2.Empty) {
- }
-}
-
-// A subscription resource.
-message Subscription {
- // Name of the subscription.
- optional string name = 1;
-
- // The name of the topic from which this subscription is receiving messages.
- optional string topic = 2;
-
- // If <code>query</code> is non-empty, only messages on the subscriber's
- // topic whose labels match the query will be returned. Otherwise all
- // messages on the topic will be returned.
- //
- optional string query = 3;
-
- // The subscriber may specify requirements for truncating unacknowledged
- // subscription entries. The system will honor the
- // <code>CreateSubscription</code> request only if it can meet these
- // requirements. If this field is not specified, messages are never truncated
- // by the system.
- optional TruncationPolicy truncation_policy = 4;
-
- // Specifies which messages can be truncated by the system.
- message TruncationPolicy {
- oneof policy {
- // If <code>max_bytes</code> is specified, the system is allowed to drop
- // old messages to keep the combined size of stored messages under
- // <code>max_bytes</code>. This is a hint; the system may keep more than
- // this many bytes, but will make a best effort to keep the size from
- // growing much beyond this parameter.
- int64 max_bytes = 1;
-
- // If <code>max_age_seconds</code> is specified, the system is allowed to
- // drop messages that have been stored for at least this many seconds.
- // This is a hint; the system may keep these messages, but will make a
- // best effort to remove them when their maximum age is reached.
- int64 max_age_seconds = 2;
- }
- }
-
- // If push delivery is used with this subscription, this field is
- // used to configure it.
- optional PushConfig push_config = 5;
-
- // For either push or pull delivery, the value is the maximum time after a
- // subscriber receives a message before the subscriber should acknowledge or
- // Nack the message. If the Ack deadline for a message passes without an
- // Ack or a Nack, the Pub/Sub system will eventually redeliver the message.
- // If a subscriber acknowledges after the deadline, the Pub/Sub system may
- // accept the Ack, but it is possible that the message has been already
- // delivered again. Multiple Acks to the message are allowed and will
- // succeed.
- //
- // For push delivery, this value is used to set the request timeout for
- // the call to the push endpoint.
- //
- // For pull delivery, this value is used as the initial value for the Ack
- // deadline. It may be overridden for a specific pull request (message) with
- // <code>ModifyAckDeadline</code>.
- // While a message is outstanding (i.e. it has been delivered to a pull
- // subscriber and the subscriber has not yet Acked or Nacked), the Pub/Sub
- // system will not deliver that message to another pull subscriber
- // (on a best-effort basis).
- optional int32 ack_deadline_seconds = 6;
-
- // If this parameter is set to n, the system is allowed to (but not required
- // to) delete the subscription when at least n seconds have elapsed since the
- // client presence was detected. (Presence is detected through any
- // interaction using the subscription ID, including Pull(), Get(), or
- // acknowledging a message.)
- //
- // If this parameter is not set, the subscription will stay live until
- // explicitly deleted.
- //
- // Clients can detect such garbage collection when a Get call or a Pull call
- // (for pull subscribers only) returns NOT_FOUND.
- optional int64 garbage_collect_seconds = 7;
-}
-
-// Configuration for a push delivery endpoint.
-message PushConfig {
- // A URL locating the endpoint to which messages should be pushed.
- // For example, a Webhook endpoint might use "https://example.com/push".
- // (-- An Android application might use "gcm:<REGID>", where <REGID> is a
- // GCM registration id allocated for pushing messages to the application. --)
- optional string push_endpoint = 1;
-}
-
-// An event indicating a received message or truncation event.
-message PubsubEvent {
- // The subscription that received the event.
- optional string subscription = 1;
-
- oneof type {
- // A received message.
- PubsubMessage message = 2;
-
- // Indicates that this subscription has been truncated.
- bool truncated = 3;
-
- // Indicates that this subscription has been deleted. (Note that pull
- // subscribers will always receive NOT_FOUND in response in their pull
- // request on the subscription, rather than seeing this boolean.)
- bool deleted = 4;
- }
-}
-
-// Request for the GetSubscription method.
-message GetSubscriptionRequest {
- // The name of the subscription to get.
- optional string subscription = 1;
-}
-
-// Request for the ListSubscriptions method.
-message ListSubscriptionsRequest {
- // A valid label query expression.
- // (-- Which labels are required or supported is implementation-specific.
- // TODO(eschapira): This method must support to query by topic. We must
- // define the key URI for the "topic" label. --)
- optional string query = 1;
-
- // Maximum number of subscriptions to return.
- // (-- If not specified or <= 0, the implementation will select a reasonable
- // value. --)
- optional int32 max_results = 3;
-
- // The value obtained in the last <code>ListSubscriptionsResponse</code>
- // for continuation.
- optional string page_token = 4;
-}
-
-// Response for the ListSubscriptions method.
-message ListSubscriptionsResponse {
- // The subscriptions that match the request.
- repeated Subscription subscription = 1;
-
- // If not empty, indicates that there are more subscriptions that match the
- // request and this value should be passed to the next
- // <code>ListSubscriptionsRequest</code> to continue.
- optional string next_page_token = 2;
-}
-
-// Request for the TruncateSubscription method.
-message TruncateSubscriptionRequest {
- // The subscription that is being truncated.
- optional string subscription = 1;
-}
-
-// Request for the DeleteSubscription method.
-message DeleteSubscriptionRequest {
- // The subscription to delete.
- optional string subscription = 1;
-}
-
-// Request for the ModifyPushConfig method.
-message ModifyPushConfigRequest {
- // The name of the subscription.
- optional string subscription = 1;
-
- // An empty <code>push_config</code> indicates that the Pub/Sub system should
- // pause pushing messages from the given subscription.
- optional PushConfig push_config = 2;
-}
-
-// -----------------------------------------------------------------------------
-// The protos used by a pull subscriber.
-// -----------------------------------------------------------------------------
-
-// Request for the Pull method.
-message PullRequest {
- // The subscription from which a message should be pulled.
- optional string subscription = 1;
-
- // If this is specified as true the system will respond immediately even if
- // it is not able to return a message in the Pull response. Otherwise the
- // system is allowed to wait until at least one message is available rather
- // than returning FAILED_PRECONDITION. The client may cancel the request if
- // it does not wish to wait any longer for the response.
- optional bool return_immediately = 2;
-}
-
-// Either a <code>PubsubMessage</code> or a truncation event. One of these two
-// must be populated.
-message PullResponse {
- // This ID must be used to acknowledge the received event or message.
- optional string ack_id = 1;
-
- // A pubsub message or truncation event.
- optional PubsubEvent pubsub_event = 2;
-}
-
-// Request for the PullBatch method.
-message PullBatchRequest {
- // The subscription from which messages should be pulled.
- optional string subscription = 1;
-
- // If this is specified as true the system will respond immediately even if
- // it is not able to return a message in the Pull response. Otherwise the
- // system is allowed to wait until at least one message is available rather
- // than returning no messages. The client may cancel the request if it does
- // not wish to wait any longer for the response.
- optional bool return_immediately = 2;
-
- // The maximum number of PubsubEvents returned for this request. The Pub/Sub
- // system may return fewer than the number of events specified.
- optional int32 max_events = 3;
-}
-
-// Response for the PullBatch method.
-message PullBatchResponse {
-
- // Received Pub/Sub messages or status events. The Pub/Sub system will return
- // zero messages if there are no more messages available in the backlog. The
- // Pub/Sub system may return fewer than the max_events requested even if
- // there are more messages available in the backlog.
- repeated PullResponse pull_responses = 2;
-}
-
-// Request for the ModifyAckDeadline method.
-message ModifyAckDeadlineRequest {
- // The name of the subscription from which messages are being pulled.
- optional string subscription = 1;
-
- // The acknowledgment ID.
- optional string ack_id = 2;
-
- // The new Ack deadline. Must be >= 0.
- optional int32 ack_deadline_seconds = 3;
-}
-
-// Request for the Acknowledge method.
-message AcknowledgeRequest {
- // The subscription whose message is being acknowledged.
- optional string subscription = 1;
-
- // The acknowledgment ID for the message being acknowledged. This was
- // returned by the Pub/Sub system in the Pull response.
- repeated string ack_id = 2;
-}
-
-// Request for the Nack method.
-message NackRequest {
- // The subscription whose message is being Nacked.
- optional string subscription = 1;
-
- // The acknowledgment ID for the message being refused. This was returned by
- // the Pub/Sub system in the Pull response.
- repeated string ack_id = 2;
-}
-
-// -----------------------------------------------------------------------------
-// The service and protos used by a push subscriber.
-// -----------------------------------------------------------------------------
-
-// The service that a subscriber uses to handle messages sent via push
-// delivery.
-// This service is not currently exported for HTTP clients.
-// TODO(eschapira): Explain HTTP subscribers.
-service PushEndpointService {
- // Sends a <code>PubsubMessage</code> or a subscription status event to a
- // push endpoint.
- // The push endpoint responds with an empty message and a code from
- // util/task/codes.proto. The following codes have a particular meaning to the
- // Pub/Sub system:
- // OK - This is interpreted by Pub/Sub as Ack.
- // ABORTED - This is intepreted by Pub/Sub as a Nack, without implying
- // pushback for congestion control. The Pub/Sub system will
- // retry this message at a later time.
- // UNAVAILABLE - This is intepreted by Pub/Sub as a Nack, with the additional
- // semantics of push-back. The Pub/Sub system will use an AIMD
- // congestion control algorithm to backoff the rate of sending
- // messages from this subscription.
- // Any other code, or a failure to respond, will be interpreted in the same
- // way as ABORTED; i.e. the system will retry the message at a later time to
- // ensure reliable delivery.
- rpc HandlePubsubEvent(PubsubEvent) returns (proto2.Empty);
-}
diff --git a/src/node/examples/pubsub/pubsub_demo.js b/src/node/examples/pubsub/pubsub_demo.js
deleted file mode 100644
index 26301515f0..0000000000
--- a/src/node/examples/pubsub/pubsub_demo.js
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-'use strict';
-
-var async = require('async');
-var fs = require('fs');
-var GoogleAuth = require('google-auth-library');
-var parseArgs = require('minimist');
-var strftime = require('strftime');
-var _ = require('underscore');
-var grpc = require('../..');
-var PROTO_PATH = __dirname + '/pubsub.proto';
-var pubsub = grpc.load(PROTO_PATH).tech.pubsub;
-
-function PubsubRunner(pub, sub, args) {
- this.pub = pub;
- this.sub = sub;
- this.args = args;
-}
-
-PubsubRunner.prototype.getTestTopicName = function() {
- var base_name = '/topics/' + this.args.project_id + '/';
- if (this.args.topic_name) {
- return base_name + this.args.topic_name;
- }
- var now_text = strftime('%Y%m%d%H%M%S%L');
- return base_name + process.env.USER + '-' + now_text;
-};
-
-PubsubRunner.prototype.getTestSubName = function() {
- var base_name = '/subscriptions/' + this.args.project_id + '/';
- if (this.args.sub_name) {
- return base_name + this.args.sub_name;
- }
- var now_text = strftime('%Y%m%d%H%M%S%L');
- return base_name + process.env.USER + '-' + now_text;
-};
-
-PubsubRunner.prototype.listProjectTopics = function(callback) {
- var q = ('cloud.googleapis.com/project in (/projects/' +
- this.args.project_id + ')');
- this.pub.listTopics({query: q}, callback);
-};
-
-PubsubRunner.prototype.topicExists = function(name, callback) {
- this.listProjectTopics(function(err, response) {
- if (err) {
- callback(err);
- } else {
- callback(null, _.some(response.topic, function(t) {
- return t.name === name;
- }));
- }
- });
-};
-
-PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
- var self = this;
- this.topicExists(name, function(err, exists) {
- if (err) {
- callback(err);
- } else{
- if (exists) {
- callback(null);
- } else {
- self.pub.createTopic({name: name}, callback);
- }
- }
- });
-};
-
-PubsubRunner.prototype.removeTopic = function(callback) {
- var name = this.getTestTopicName();
- console.log('... removing Topic', name);
- this.pub.deleteTopic({topic: name}, function(err, value) {
- if (err) {
- console.log('Could not delete a topic: rpc failed with', err);
- callback(err);
- } else {
- console.log('removed Topic', name, 'OK');
- callback(null);
- }
- });
-};
-
-PubsubRunner.prototype.createTopic = function(callback) {
- var name = this.getTestTopicName();
- console.log('... creating Topic', name);
- this.pub.createTopic({name: name}, function(err, value) {
- if (err) {
- console.log('Could not create a topic: rpc failed with', err);
- callback(err);
- } else {
- console.log('created Topic', name, 'OK');
- callback(null);
- }
- });
-};
-
-PubsubRunner.prototype.listSomeTopics = function(callback) {
- console.log('Listing topics');
- console.log('-------------_');
- this.listProjectTopics(function(err, response) {
- if (err) {
- console.log('Could not list topic: rpc failed with', err);
- callback(err);
- } else {
- _.each(response.topic, function(t) {
- console.log(t.name);
- });
- callback(null);
- }
- });
-};
-
-PubsubRunner.prototype.checkExists = function(callback) {
- var name = this.getTestTopicName();
- console.log('... checking for topic', name);
- this.topicExists(name, function(err, exists) {
- if (err) {
- console.log('Could not check for a topics: rpc failed with', err);
- callback(err);
- } else {
- if (exists) {
- console.log(name, 'is a topic');
- } else {
- console.log(name, 'is not a topic');
- }
- callback(null);
- }
- });
-};
-
-PubsubRunner.prototype.randomPubSub = function(callback) {
- var self = this;
- var topic_name = this.getTestTopicName();
- var sub_name = this.getTestSubName();
- var subscription = {name: sub_name, topic: topic_name};
- async.waterfall([
- _.bind(this.createTopicIfNeeded, this, topic_name),
- _.bind(this.sub.createSubscription, this.sub, subscription),
- function(resp, cb) {
- var msg_count = _.random(10, 30);
- // Set up msg_count messages to publish
- var message_senders = _.times(msg_count, function(n) {
- return _.bind(self.pub.publish, self.pub, {
- topic: topic_name,
- message: {data: new Buffer('message ' + n)}
- });
- });
- async.parallel(message_senders, function(err, result) {
- cb(err, result, msg_count);
- });
- },
- function(result, msg_count, cb) {
- console.log('Sent', msg_count, 'messages to', topic_name + ',',
- 'checking for them now.');
- var batch_request = {
- subscription: sub_name,
- max_events: msg_count
- };
- self.sub.pullBatch(batch_request, cb);
- },
- function(batch, cb) {
- var ack_id = _.pluck(batch.pull_responses, 'ack_id');
- console.log('Got', ack_id.length, 'messages, acknowledging them...');
- var ack_request = {
- subscription: sub_name,
- ack_id: ack_id
- };
- self.sub.acknowledge(ack_request, cb);
- },
- function(result, cb) {
- console.log(
- 'Test messages were acknowledged OK, deleting the subscription');
- self.sub.deleteSubscription({subscription: sub_name}, cb);
- }
- ], function (err, result) {
- if (err) {
- console.log('Could not do random pub sub: rpc failed with', err);
- }
- callback(err, result);
- });
-};
-
-function main(callback) {
- var argv = parseArgs(process.argv, {
- string: [
- 'host',
- 'oauth_scope',
- 'port',
- 'action',
- 'project_id',
- 'topic_name',
- 'sub_name'
- ],
- default: {
- host: 'pubsub-staging.googleapis.com',
- oauth_scope: 'https://www.googleapis.com/auth/pubsub',
- port: 443,
- action: 'listSomeTopics',
- project_id: 'stoked-keyword-656'
- }
- });
- var valid_actions = [
- 'createTopic',
- 'removeTopic',
- 'listSomeTopics',
- 'checkExists',
- 'randomPubSub'
- ];
- if (_.some(valid_actions, function(action) {
- return action === argv.action;
- })) {
- callback(new Error('Action was not valid'));
- }
- var address = argv.host + ':' + argv.port;
- (new GoogleAuth()).getApplicationDefault(function(err, credential) {
- if (err) {
- callback(err);
- return;
- }
- if (credential.createScopedRequired()) {
- credential = credential.createScoped(argv.oauth_scope);
- }
- var updateMetadata = grpc.getGoogleAuthDelegate(credential);
- var ca_path = process.env.SSL_CERT_FILE;
- fs.readFile(ca_path, function(err, ca_data) {
- if (err) {
- callback(err);
- return;
- }
- var ssl_creds = grpc.Credentials.createSsl(ca_data);
- var options = {
- credentials: ssl_creds,
- 'grpc.ssl_target_name_override': argv.host
- };
- var pub = new pubsub.PublisherService(address, options, updateMetadata);
- var sub = new pubsub.SubscriberService(address, options, updateMetadata);
- var runner = new PubsubRunner(pub, sub, argv);
- runner[argv.action](callback);
- });
- });
-}
-
-if (require.main === module) {
- main(function(err) {
- if (err) {
- throw err;
- }
- });
-}
-
-module.exports = PubsubRunner;
diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml
index d9fe0a5835..b4d66c517c 100644
--- a/src/ruby/.rubocop_todo.yml
+++ b/src/ruby/.rubocop_todo.yml
@@ -1,5 +1,5 @@
# This configuration was generated by `rubocop --auto-gen-config`
-# on 2015-04-14 09:35:44 -0700 using RuboCop version 0.29.1.
+# on 2015-04-15 18:43:23 -0700 using RuboCop version 0.30.0.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
@@ -9,7 +9,7 @@
Metrics/AbcSize:
Max: 36
-# Offense count: 2
+# Offense count: 3
# Configuration parameters: CountComments.
Metrics/ClassLength:
Max: 183
diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index af7a1d5b15..6f1fe2614f 100755
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -136,12 +136,14 @@ class PingPongPlayer
include Grpc::Testing::PayloadType
attr_accessor :assertions # required by Minitest::Assertions
attr_accessor :queue
+ attr_accessor :canceller_op
# reqs is the enumerator over the requests
def initialize(msg_sizes)
@queue = Queue.new
@msg_sizes = msg_sizes
@assertions = 0 # required by Minitest::Assertions
+ @canceller_op = nil # used to cancel after the first response
end
def each_item
@@ -155,12 +157,15 @@ class PingPongPlayer
response_parameters: [p_cls.new(size: resp_size)])
yield req
resp = @queue.pop
- assert_equal(:COMPRESSABLE, resp.payload.type,
- 'payload type is wrong')
+ assert_equal(:COMPRESSABLE, resp.payload.type, 'payload type is wrong')
assert_equal(resp_size, resp.payload.body.length,
- 'payload body #{i} has the wrong length')
+ "payload body #{count} has the wrong length")
p "OK: ping_pong #{count}"
count += 1
+ unless @canceller_op.nil?
+ canceller_op.cancel
+ break
+ end
end
end
end
@@ -260,6 +265,27 @@ class NamedTests
p 'OK: ping_pong'
end
+ def cancel_after_begin
+ msg_sizes = [27_182, 8, 1828, 45_904]
+ reqs = msg_sizes.map do |x|
+ req = Payload.new(body: nulls(x))
+ StreamingInputCallRequest.new(payload: req)
+ end
+ op = @stub.streaming_input_call(reqs, return_op: true)
+ op.cancel
+ assert_raises(GRPC::Cancelled) { op.execute }
+ p 'OK: cancel_after_begin'
+ end
+
+ def cancel_after_first
+ msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]]
+ ppp = PingPongPlayer.new(msg_sizes)
+ op = @stub.full_duplex_call(ppp.each_item, return_op: true)
+ ppp.canceller_op = op # causes ppp to cancel after the 1st message
+ assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } }
+ p 'OK: cancel_after_first'
+ end
+
def all
all_methods = NamedTests.instance_methods(false).map(&:to_s)
all_methods.each do |m|
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 21b933f97f..6da7d3c830 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -42,6 +42,17 @@
#include "rb_completion_queue.h"
#include "rb_grpc.h"
+/* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */
+static VALUE grpc_rb_cCall;
+
+/* grpc_rb_eCallError is the ruby class of the exception thrown during call
+ operations; */
+VALUE grpc_rb_eCallError = Qnil;
+
+/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate
+ a timeout. */
+static VALUE grpc_rb_eOutOfTime = Qnil;
+
/* grpc_rb_sBatchResult is struct class used to hold the results of a batch
* call. */
static VALUE grpc_rb_sBatchResult;
@@ -219,7 +230,7 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
it's capacity should have been computed via a prior call to
grpc_rb_md_ary_fill_hash_cb
*/
-int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
+static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
grpc_metadata_array *md_ary = NULL;
int array_length;
int i;
@@ -259,7 +270,8 @@ int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
/* grpc_rb_md_ary_capacity_hash_cb is the hash iteration callback used
to pre-compute the capacity a grpc_metadata_array.
*/
-int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
+static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
+ VALUE md_ary_obj) {
grpc_metadata_array *md_ary = NULL;
/* Construct a metadata object from key and value and add it */
@@ -278,7 +290,7 @@ int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
/* grpc_rb_md_ary_convert converts a ruby metadata hash into
a grpc_metadata_array.
*/
-void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) {
+static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) {
VALUE md_ary_obj = Qnil;
if (md_ary_hash == Qnil) {
return; /* Do nothing if the expected has value is nil */
@@ -334,7 +346,8 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) {
/* grpc_rb_call_check_op_keys_hash_cb is a hash iteration func that checks
each key of an ops hash is valid.
*/
-int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, VALUE ops_ary) {
+static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
+ VALUE ops_ary) {
/* Update the capacity; the value is an array, add capacity for each value in
* the array */
if (TYPE(key) != T_FIXNUM) {
@@ -363,7 +376,7 @@ int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, VALUE ops_ary) {
/* grpc_rb_op_update_status_from_server adds the values in a ruby status
struct to the 'send_status_from_server' portion of an op.
*/
-void grpc_rb_op_update_status_from_server(grpc_op *op,
+static void grpc_rb_op_update_status_from_server(grpc_op *op,
grpc_metadata_array* md_ary,
VALUE status) {
VALUE code = rb_struct_aref(status, sym_code);
@@ -615,18 +628,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
return result;
}
-/* grpc_rb_cCall is the ruby class that proxies grpc_call. */
-VALUE grpc_rb_cCall = Qnil;
-
-/* grpc_rb_eCallError is the ruby class of the exception thrown during call
- operations; */
-VALUE grpc_rb_eCallError = Qnil;
-
-/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate
- a timeout. */
-VALUE grpc_rb_eOutOfTime = Qnil;
-
-void Init_grpc_error_codes() {
+static void Init_grpc_error_codes() {
/* Constants representing the error codes of grpc_call_error in grpc.h */
VALUE grpc_rb_mRpcErrors =
rb_define_module_under(grpc_rb_mGrpcCore, "RpcErrors");
@@ -678,7 +680,7 @@ void Init_grpc_error_codes() {
rb_obj_freeze(rb_error_code_details);
}
-void Init_grpc_op_codes() {
+static void Init_grpc_op_codes() {
/* Constants representing operation type codes in grpc.h */
VALUE grpc_rb_mCallOps =
rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h
index e20a34c74e..003ce0429e 100644
--- a/src/ruby/ext/grpc/rb_call.h
+++ b/src/ruby/ext/grpc/rb_call.h
@@ -49,17 +49,10 @@ const char* grpc_call_error_detail_of(grpc_call_error err);
/* Converts a metadata array to a hash. */
VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary);
-/* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */
-extern VALUE grpc_rb_cCall;
-
/* grpc_rb_eCallError is the ruby class of the exception thrown during call
operations. */
extern VALUE grpc_rb_eCallError;
-/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate
- a timeout. */
-extern VALUE grpc_rb_eOutOfTime;
-
/* Initializes the Call class. */
void Init_grpc_call();
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 47b85c83ed..214675af92 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -58,6 +58,8 @@ static ID id_target;
* GCed before the channel */
static ID id_cqueue;
+/* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */
+static VALUE grpc_rb_cChannel = Qnil;
/* Used during the conversion of a hash to channel args during channel setup */
static VALUE grpc_rb_cChannelArgs;
@@ -246,9 +248,6 @@ static VALUE grpc_rb_channel_destroy(VALUE self) {
return Qnil;
}
-/* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */
-VALUE grpc_rb_cChannel = Qnil;
-
void Init_grpc_channel() {
grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
grpc_rb_cChannel =
diff --git a/src/ruby/ext/grpc/rb_channel.h b/src/ruby/ext/grpc/rb_channel.h
index 5c57b31fb2..6e3c087689 100644
--- a/src/ruby/ext/grpc/rb_channel.h
+++ b/src/ruby/ext/grpc/rb_channel.h
@@ -37,9 +37,6 @@
#include <ruby.h>
#include <grpc/grpc.h>
-/* grpc_rb_cChannel is the Channel class whose instances proxy grpc_channel. */
-extern VALUE grpc_rb_cChannel;
-
/* Initializes the Channel class. */
void Init_grpc_channel();
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index db2cc47761..e2c9b85661 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -39,6 +39,10 @@
#include <grpc/support/time.h>
#include "rb_grpc.h"
+/* grpc_rb_cCompletionQueue is the ruby class that proxies
+ * grpc_completion_queue. */
+static VALUE grpc_rb_cCompletionQueue = Qnil;
+
/* Used to allow grpc_completion_queue_next call to release the GIL */
typedef struct next_call_stack {
grpc_completion_queue *cq;
@@ -177,10 +181,6 @@ grpc_event* grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
return next_call.event;
}
-/* grpc_rb_cCompletionQueue is the ruby class that proxies
- * grpc_completion_queue. */
-VALUE grpc_rb_cCompletionQueue = Qnil;
-
void Init_grpc_completion_queue() {
grpc_rb_cCompletionQueue =
rb_define_class_under(grpc_rb_mGrpcCore, "CompletionQueue", rb_cObject);
diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h
index 1bfb80e499..4d0f49ac47 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.h
+++ b/src/ruby/ext/grpc/rb_completion_queue.h
@@ -48,10 +48,6 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
grpc_event* grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag,
VALUE timeout);
-/* grpc_rb_cCompletionQueue is the CompletionQueue class whose instances proxy
- grpc_completion_queue. */
-extern VALUE grpc_rb_cCompletionQueue;
-
/* Initializes the CompletionQueue class. */
void Init_grpc_completion_queue();
diff --git a/src/ruby/ext/grpc/rb_credentials.c b/src/ruby/ext/grpc/rb_credentials.c
index 8170f0d26c..1ec88914e4 100644
--- a/src/ruby/ext/grpc/rb_credentials.c
+++ b/src/ruby/ext/grpc/rb_credentials.c
@@ -40,6 +40,9 @@
#include "rb_grpc.h"
+/* grpc_rb_cCredentials is the ruby class that proxies grpc_credentials. */
+static VALUE grpc_rb_cCredentials = Qnil;
+
/* grpc_rb_credentials wraps a grpc_credentials. It provides a
* peer ruby object, 'mark' to minimize copying when a credential is
* created from ruby. */
@@ -252,9 +255,6 @@ static VALUE grpc_rb_credentials_init(int argc, VALUE *argv, VALUE self) {
return self;
}
-/* grpc_rb_cCredentials is the ruby class that proxies grpc_credentials. */
-VALUE grpc_rb_cCredentials = Qnil;
-
void Init_grpc_credentials() {
grpc_rb_cCredentials =
rb_define_class_under(grpc_rb_mGrpcCore, "Credentials", rb_cObject);
diff --git a/src/ruby/ext/grpc/rb_credentials.h b/src/ruby/ext/grpc/rb_credentials.h
index dc0a3d01e8..e7c43c9c78 100644
--- a/src/ruby/ext/grpc/rb_credentials.h
+++ b/src/ruby/ext/grpc/rb_credentials.h
@@ -37,10 +37,6 @@
#include <ruby.h>
#include <grpc/grpc_security.h>
-/* grpc_rb_cCredentials is the ruby class whose instances proxy
- grpc_credentials. */
-extern VALUE grpc_rb_cCredentials;
-
/* Initializes the ruby Credentials class. */
void Init_grpc_credentials();
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index b431636c75..70a28ddc17 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -46,7 +46,7 @@
#include "rb_credentials.h"
#include "rb_server_credentials.h"
-VALUE grpc_rb_cTimeVal = Qnil;
+static VALUE grpc_rb_cTimeVal = Qnil;
static rb_data_type_t grpc_rb_timespec_data_type = {
"gpr_timespec",
@@ -154,7 +154,7 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval) {
return t;
}
-void Init_grpc_status_codes() {
+static void Init_grpc_status_codes() {
/* Constants representing the status codes or grpc_status_code in status.h */
VALUE grpc_rb_mStatusCodes =
rb_define_module_under(grpc_rb_mGrpcCore, "StatusCodes");
@@ -203,7 +203,7 @@ static ID id_inspect;
static ID id_to_s;
/* Converts a wrapped time constant to a standard time. */
-VALUE grpc_rb_time_val_to_time(VALUE self) {
+static VALUE grpc_rb_time_val_to_time(VALUE self) {
gpr_timespec *time_const = NULL;
TypedData_Get_Struct(self, gpr_timespec, &grpc_rb_timespec_data_type,
time_const);
@@ -212,17 +212,17 @@ VALUE grpc_rb_time_val_to_time(VALUE self) {
}
/* Invokes inspect on the ctime version of the time val. */
-VALUE grpc_rb_time_val_inspect(VALUE self) {
+static VALUE grpc_rb_time_val_inspect(VALUE self) {
return rb_funcall(grpc_rb_time_val_to_time(self), id_inspect, 0);
}
/* Invokes to_s on the ctime version of the time val. */
-VALUE grpc_rb_time_val_to_s(VALUE self) {
+static VALUE grpc_rb_time_val_to_s(VALUE self) {
return rb_funcall(grpc_rb_time_val_to_time(self), id_to_s, 0);
}
/* Adds a module with constants that map to gpr's static timeval structs. */
-void Init_grpc_time_consts() {
+static void Init_grpc_time_consts() {
VALUE grpc_rb_mTimeConsts =
rb_define_module_under(grpc_rb_mGrpcCore, "TimeConsts");
grpc_rb_cTimeVal =
@@ -249,7 +249,7 @@ void Init_grpc_time_consts() {
id_tv_nsec = rb_intern("tv_nsec");
}
-void grpc_rb_shutdown(void *vm) { grpc_shutdown(); }
+static void grpc_rb_shutdown(void *vm) { grpc_shutdown(); }
/* Initialize the GRPC module structs */
@@ -262,6 +262,11 @@ VALUE grpc_rb_sStatus = Qnil;
VALUE grpc_rb_mGRPC = Qnil;
VALUE grpc_rb_mGrpcCore = Qnil;
+/* cached Symbols for members in Status struct */
+VALUE sym_code = Qundef;
+VALUE sym_details = Qundef;
+VALUE sym_metadata = Qundef;
+
void Init_grpc() {
grpc_init();
ruby_vm_at_exit(grpc_rb_shutdown);
diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h
index 57db350f5e..a502273de1 100644
--- a/src/ruby/ext/grpc/rb_grpc.h
+++ b/src/ruby/ext/grpc/rb_grpc.h
@@ -41,9 +41,6 @@
/* grpc_rb_mGrpcCore is the module containing the ruby wrapper GRPC classes. */
extern VALUE grpc_rb_mGrpcCore;
-/* Class used to wrap timeval structs. */
-extern VALUE grpc_rb_cTimeVal;
-
/* grpc_rb_sNewServerRpc is the struct that holds new server rpc details. */
extern VALUE grpc_rb_sNewServerRpc;
@@ -51,13 +48,13 @@ extern VALUE grpc_rb_sNewServerRpc;
extern VALUE grpc_rb_sStatus;
/* sym_code is the symbol for the code attribute of grpc_rb_sStatus. */
-VALUE sym_code;
+extern VALUE sym_code;
/* sym_details is the symbol for the details attribute of grpc_rb_sStatus. */
-VALUE sym_details;
+extern VALUE sym_details;
/* sym_metadata is the symbol for the metadata attribute of grpc_rb_sStatus. */
-VALUE sym_metadata;
+extern VALUE sym_metadata;
/* GC_NOT_MARKED is used in calls to Data_Wrap_Struct to indicate that the
wrapped struct does not need to participate in ruby gc. */
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 5d7c8f7a5d..bc0878af05 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -44,7 +44,7 @@
#include "rb_grpc.h"
/* grpc_rb_cServer is the ruby class that proxies grpc_server. */
-VALUE grpc_rb_cServer = Qnil;
+static VALUE grpc_rb_cServer = Qnil;
/* id_at is the constructor method of the ruby standard Time class. */
static ID id_at;
diff --git a/src/ruby/ext/grpc/rb_server.h b/src/ruby/ext/grpc/rb_server.h
index 22e88a7d46..5e4b711d35 100644
--- a/src/ruby/ext/grpc/rb_server.h
+++ b/src/ruby/ext/grpc/rb_server.h
@@ -37,10 +37,6 @@
#include <ruby.h>
#include <grpc/grpc.h>
-/* grpc_rb_cServer is the Server class whose instances proxy
- grpc_byte_buffer. */
-extern VALUE grpc_rb_cServer;
-
/* Initializes the Server class. */
void Init_grpc_server();
diff --git a/src/ruby/ext/grpc/rb_server_credentials.c b/src/ruby/ext/grpc/rb_server_credentials.c
index ab16c05650..a86389445f 100644
--- a/src/ruby/ext/grpc/rb_server_credentials.c
+++ b/src/ruby/ext/grpc/rb_server_credentials.c
@@ -40,6 +40,10 @@
#include "rb_grpc.h"
+/* grpc_rb_cServerCredentials is the ruby class that proxies
+ grpc_server_credentials. */
+static VALUE grpc_rb_cServerCredentials = Qnil;
+
/* grpc_rb_server_credentials wraps a grpc_server_credentials. It provides a
peer ruby object, 'mark' to minimize copying when a server credential is
created from ruby. */
@@ -191,10 +195,6 @@ static VALUE grpc_rb_server_credentials_init(VALUE self, VALUE pem_root_certs,
return self;
}
-/* grpc_rb_cServerCredentials is the ruby class that proxies
- grpc_server_credentials. */
-VALUE grpc_rb_cServerCredentials = Qnil;
-
void Init_grpc_server_credentials() {
grpc_rb_cServerCredentials =
rb_define_class_under(grpc_rb_mGrpcCore, "ServerCredentials", rb_cObject);
diff --git a/src/ruby/ext/grpc/rb_server_credentials.h b/src/ruby/ext/grpc/rb_server_credentials.h
index f79a869358..35b395ad5c 100644
--- a/src/ruby/ext/grpc/rb_server_credentials.h
+++ b/src/ruby/ext/grpc/rb_server_credentials.h
@@ -37,10 +37,6 @@
#include <ruby.h>
#include <grpc/grpc_security.h>
-/* grpc_rb_cServerCredentials is the ruby class whose instances proxy
- grpc_server_credentials. */
-extern VALUE grpc_rb_cServerCredentials;
-
/* Initializes the ruby ServerCredentials class. */
void Init_grpc_server_credentials();
diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec
index a50d0351da..12d4ab17f2 100755
--- a/src/ruby/grpc.gemspec
+++ b/src/ruby/grpc.gemspec
@@ -22,16 +22,16 @@ Gem::Specification.new do |s|
s.platform = Gem::Platform::RUBY
s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1'
- s.add_dependency 'googleauth', '~> 0.4'
+ s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests
s.add_dependency 'logging', '~> 1.8'
s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests
s.add_dependency 'xray', '~> 1.1'
- s.add_development_dependency 'bundler', '~> 1.7'
- s.add_development_dependency 'rake', '~> 10.0'
- s.add_development_dependency 'rake-compiler', '~> 0'
- s.add_development_dependency 'rubocop', '~> 0.28.0'
- s.add_development_dependency 'rspec', '~> 3.0'
+ s.add_development_dependency 'bundler', '~> 1.9'
+ s.add_development_dependency 'rake', '~> 10.4'
+ s.add_development_dependency 'rake-compiler', '~> 0.9'
+ s.add_development_dependency 'rubocop', '~> 0.30'
+ s.add_development_dependency 'rspec', '~> 3.2'
s.extensions = %w(ext/grpc/extconf.rb)
end
diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb
index b23793730f..35e9c02a94 100644
--- a/src/ruby/lib/grpc/errors.rb
+++ b/src/ruby/lib/grpc/errors.rb
@@ -54,4 +54,8 @@ module GRPC
Status.new(code, details)
end
end
+
+ # Cancelled is an exception class that indicates that an rpc was cancelled.
+ class Cancelled < StandardError
+ end
end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 489349c2c9..8d63de4145 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -30,6 +30,22 @@
require 'forwardable'
require 'grpc/generic/bidi_call'
+class Struct
+ # BatchResult is the struct returned by calls to call#start_batch.
+ class BatchResult
+ # check_status returns the status, raising an error if the status
+ # is non-nil and not OK.
+ def check_status
+ return nil if status.nil?
+ fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
+ if status.code != GRPC::Core::StatusCodes::OK
+ fail GRPC::BadStatus.new(status.code, status.details)
+ end
+ status
+ end
+ end
+end
+
# GRPC contains the General RPC module.
module GRPC
# The ActiveCall class provides simple methods for sending marshallable
@@ -38,7 +54,9 @@ module GRPC
include Core::StatusCodes
include Core::TimeConsts
include Core::CallOps
+ extend Forwardable
attr_reader(:deadline)
+ def_delegators :@call, :cancel, :metadata
# client_invoke begins a client invocation.
#
@@ -101,50 +119,6 @@ module GRPC
@metadata_tag = metadata_tag
end
- # Obtains the status of the call.
- #
- # this value is nil until the call completes
- # @return this call's status
- def status
- @call.status
- end
-
- # Obtains the metadata of the call.
- #
- # At the start of the call this will be nil. During the call this gets
- # some values as soon as the other end of the connection acknowledges the
- # request.
- #
- # @return this calls's metadata
- def metadata
- @call.metadata
- end
-
- # Cancels the call.
- #
- # Cancels the call. The call does not return any result, but once this it
- # has been called, the call should eventually terminate. Due to potential
- # races between the execution of the cancel and the in-flight request, the
- # result of the call after calling #cancel is indeterminate:
- #
- # - the call may terminate with a BadStatus exception, with code=CANCELLED
- # - the call may terminate with OK Status, and return a response
- # - the call may terminate with a different BadStatus exception if that
- # was happening
- def cancel
- @call.cancel
- end
-
- # indicates if the call is shutdown
- def shutdown
- @shutdown ||= false
- end
-
- # indicates if the call is cancelled.
- def cancelled
- @cancelled ||= false
- end
-
# multi_req_view provides a restricted view of this ActiveCall for use
# in a server client-streaming handler.
def multi_req_view
@@ -176,9 +150,9 @@ module GRPC
SEND_CLOSE_FROM_CLIENT => nil
}
ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
- @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished
- @call.status
+ batch_result.check_status
end
# finished waits until a client call is completed.
@@ -192,17 +166,12 @@ module GRPC
elsif !batch_result.metadata.nil?
@call.metadata.merge!(batch_result.metadata)
end
- if batch_result.status.code != Core::StatusCodes::OK
- fail BadStatus.new(batch_result.status.code,
- batch_result.status.details)
- end
- batch_result
+ batch_result.check_status
end
# remote_send sends a request to the remote endpoint.
#
- # It blocks until the remote endpoint acknowledges by sending a
- # WRITE_ACCEPTED. req can be marshalled already.
+ # It blocks until the remote endpoint accepts the message.
#
# @param req [Object, String] the object to send or it's marshal form.
# @param marshalled [false, true] indicates if the object is already
@@ -332,6 +301,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# client_streamer sends a stream of requests to a GRPC server, and
@@ -355,6 +327,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# server_streamer sends one request to the GRPC server, which yields a
@@ -381,6 +356,9 @@ module GRPC
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -416,6 +394,9 @@ module GRPC
start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_client(requests, &blk)
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -436,9 +417,10 @@ module GRPC
private
+ # Starts the call if not already started
def start_call(**kw)
- tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
- @finished_tag, @read_metadata_tag = tags
+ return if @started
+ @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
@started = true
end
@@ -466,6 +448,6 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
- :metadata, :status)
+ :metadata, :status, :start_call)
end
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 1c1b3b0db7..b813ab5b54 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -78,13 +78,11 @@ module GRPC
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client(requests, &blk)
- enq_th = start_write_loop(requests)
- loop_th = start_read_loop
+ @enq_th = start_write_loop(requests)
+ @loop_th = start_read_loop
replies = each_queued_msg
return replies if blk.nil?
replies.each { |r| blk.call(r) }
- enq_th.join
- loop_th.join
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -100,10 +98,8 @@ module GRPC
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
- enq_th = start_write_loop(replys, is_client: false)
- loop_th = start_read_loop
- loop_th.join
- enq_th.join
+ @enq_th = start_write_loop(replys, is_client: false)
+ @loop_th = start_read_loop
end
private
@@ -122,10 +118,13 @@ module GRPC
logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
+ logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
+ @loop_th.join
+ @enq_th.join
end
# during bidi-streaming, read the requests to send from a separate thread
@@ -136,20 +135,23 @@ module GRPC
begin
count = 0
requests.each do |req|
+ logger.debug("bidi-write_loop: #{count}")
count += 1
payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
if is_client
- logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
- @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- SEND_CLOSE_FROM_CLIENT => nil,
- RECV_STATUS_ON_CLIENT => nil)
+ logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
+ batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_STATUS_ON_CLIENT => nil)
+ batch_result.check_status
end
rescue StandardError => e
- logger.warn('bidi: write_loop failed')
+ logger.warn('bidi-write_loop: failed')
logger.warn(e)
+ raise e
end
end
end
@@ -163,7 +165,7 @@ module GRPC
# queue the initial read before beginning the loop
loop do
- logger.debug("waiting for read #{count}")
+ logger.debug("bidi-read_loop: #{count}")
count += 1
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
@@ -171,7 +173,7 @@ module GRPC
# handle the next message
if batch_result.message.nil?
@readq.push(END_OF_READS)
- logger.debug('done reading!')
+ logger.debug('bidi-read-loop: done reading!')
break
end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 245999ea03..1323bacfa6 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -388,6 +388,23 @@ describe GRPC::RpcServer do
t.join
end
+ it 'should handle cancellation correctly', server: true do
+ service = SlowService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = SlowStub.new(@host, **@client_opts)
+ op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
+ Thread.new do # cancel the call
+ sleep 0.1
+ op.cancel
+ end
+ expect { op.execute }.to raise_error GRPC::Cancelled
+ @srv.stop
+ t.join
+ end
+
it 'should receive updated metadata', server: true do
service = EchoService.new
@srv.handle(service)