diff options
author | Yuki Yugui Sonoda <yugui@yugui.jp> | 2015-04-17 08:03:20 +0900 |
---|---|---|
committer | Yuki Yugui Sonoda <yugui@yugui.jp> | 2015-04-17 08:03:20 +0900 |
commit | dca359cc5aedc45779828ad4aa7df196ca48d94a (patch) | |
tree | 21a18bd31791064717eb840f3993fc57b865c8f4 /src | |
parent | 29ee1dbc93d1b97578d39c796bfc19b99e57545f (diff) | |
parent | d35b7107f8c54196ba8ddd55a0760e5f559e2014 (diff) |
Merge branch 'master' into fix/typed-struct
Conflicts:
src/ruby/ext/grpc/rb_grpc.c
Diffstat (limited to 'src')
31 files changed, 349 insertions, 1381 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index bd8bf65349..1324198847 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -110,7 +110,7 @@ bool HasBidiStreaming(const grpc::protobuf::FileDescriptor *file) { return false; } -grpc::string FilenameIdentifier(const grpc::string& filename) { +grpc::string FilenameIdentifier(const grpc::string &filename) { grpc::string result; for (unsigned i = 0; i < filename.size(); i++) { char c = filename[i]; @@ -154,6 +154,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, const Parameters ¶ms) { grpc::string temp = "#include <grpc++/impl/internal_stub.h>\n" + "#include <grpc++/impl/rpc_method.h>\n" "#include <grpc++/impl/service_type.h>\n" "#include <grpc++/status.h>\n" "\n" @@ -172,7 +173,9 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, temp.append("template <class OutMessage> class ClientWriter;\n"); temp.append("template <class InMessage> class ServerReader;\n"); temp.append("template <class OutMessage> class ClientAsyncWriter;\n"); - temp.append("template <class OutMessage, class InMessage> class ServerAsyncReader;\n"); + temp.append( + "template <class OutMessage, class InMessage> class " + "ServerAsyncReader;\n"); } if (HasServerOnlyStreaming(file)) { temp.append("template <class InMessage> class ClientReader;\n"); @@ -246,11 +249,11 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer, *vars, "std::unique_ptr< ::grpc::ClientReader< $Response$>> $Method$(" "::grpc::ClientContext* context, const $Request$& request);\n"); - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> Async$Method$(" - "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag);\n"); + printer->Print(*vars, + "std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> " + "Async$Method$(" + "::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq, void* tag);\n"); } else if (BidiStreaming(method)) { printer->Print( *vars, @@ -264,10 +267,16 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer, } } -void PrintHeaderServerMethodSync( - grpc::protobuf::io::Printer *printer, - const grpc::protobuf::MethodDescriptor *method, - std::map<grpc::string, grpc::string> *vars) { +void PrintHeaderClientMethodData(grpc::protobuf::io::Printer *printer, + const grpc::protobuf::MethodDescriptor *method, + std::map<grpc::string, grpc::string> *vars) { + (*vars)["Method"] = method->name(); + printer->Print(*vars, "const ::grpc::RpcMethod rpcmethod_$Method$_;\n"); +} + +void PrintHeaderServerMethodSync(grpc::protobuf::io::Printer *printer, + const grpc::protobuf::MethodDescriptor *method, + std::map<grpc::string, grpc::string> *vars) { (*vars)["Method"] = method->name(); (*vars)["Request"] = grpc_cpp_generator::ClassName(method->input_type(), true); @@ -351,10 +360,18 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer, "class Stub GRPC_FINAL : public ::grpc::InternalStub {\n" " public:\n"); printer->Indent(); + printer->Print( + "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n"); for (int i = 0; i < service->method_count(); ++i) { PrintHeaderClientMethod(printer, service->method(i), vars); } printer->Outdent(); + printer->Print(" private:\n"); + printer->Indent(); + for (int i = 0; i < service->method_count(); ++i) { + PrintHeaderClientMethodData(printer, service->method(i), vars); + } + printer->Outdent(); printer->Print("};\n"); printer->Print( "static std::unique_ptr<Stub> NewStub(const std::shared_ptr< " @@ -479,7 +496,6 @@ grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file, printer.Print(vars, "#include <grpc++/async_unary_call.h>\n"); printer.Print(vars, "#include <grpc++/channel_interface.h>\n"); printer.Print(vars, "#include <grpc++/impl/client_unary_call.h>\n"); - printer.Print(vars, "#include <grpc++/impl/rpc_method.h>\n"); printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n"); printer.Print(vars, "#include <grpc++/impl/service_type.h>\n"); printer.Print(vars, "#include <grpc++/stream.h>\n"); @@ -513,8 +529,8 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer, "::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) {\n"); printer->Print(*vars, - " return ::grpc::BlockingUnaryCall(channel()," - "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$]), " + " return ::grpc::BlockingUnaryCall(channel(), " + "rpcmethod_$Method$_, " "context, request, response);\n" "}\n\n"); printer->Print( @@ -528,7 +544,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer, "::grpc::ClientAsyncResponseReader< $Response$>>(new " "::grpc::ClientAsyncResponseReader< $Response$>(" "channel(), cq, " - "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$]), " + "rpcmethod_$Method$_, " "context, request, tag));\n" "}\n\n"); } else if (ClientOnlyStreaming(method)) { @@ -540,8 +556,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer, " return std::unique_ptr< ::grpc::ClientWriter< " "$Request$>>(new ::grpc::ClientWriter< $Request$>(" "channel()," - "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " - "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), " + "rpcmethod_$Method$_, " "context, response));\n" "}\n\n"); printer->Print(*vars, @@ -553,8 +568,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer, " return std::unique_ptr< ::grpc::ClientAsyncWriter< " "$Request$>>(new ::grpc::ClientAsyncWriter< $Request$>(" "channel(), cq, " - "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " - "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), " + "rpcmethod_$Method$_, " "context, response, tag));\n" "}\n\n"); } else if (ServerOnlyStreaming(method)) { @@ -567,8 +581,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer, " return std::unique_ptr< ::grpc::ClientReader< " "$Response$>>(new ::grpc::ClientReader< $Response$>(" "channel()," - "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " - "::grpc::RpcMethod::RpcType::SERVER_STREAMING), " + "rpcmethod_$Method$_, " "context, request));\n" "}\n\n"); printer->Print(*vars, @@ -580,8 +593,7 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer, " return std::unique_ptr< ::grpc::ClientAsyncReader< " "$Response$>>(new ::grpc::ClientAsyncReader< $Response$>(" "channel(), cq, " - "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " - "::grpc::RpcMethod::RpcType::SERVER_STREAMING), " + "rpcmethod_$Method$_, " "context, request, tag));\n" "}\n\n"); } else if (BidiStreaming(method)) { @@ -594,22 +606,21 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer, "$Request$, $Response$>>(new ::grpc::ClientReaderWriter< " "$Request$, $Response$>(" "channel()," - "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " - "::grpc::RpcMethod::RpcType::BIDI_STREAMING), " + "rpcmethod_$Method$_, " "context));\n" "}\n\n"); - printer->Print(*vars, - "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " - "$Request$, $Response$>> " - "$ns$$Service$::Stub::Async$Method$(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print( + *vars, + "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " + "$Request$, $Response$>> " + "$ns$$Service$::Stub::Async$Method$(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, " return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " "$Request$, $Response$>>(new " "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>(" "channel(), cq, " - "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " - "::grpc::RpcMethod::RpcType::BIDI_STREAMING), " + "rpcmethod_$Method$_, " "context, tag));\n" "}\n\n"); } @@ -681,9 +692,9 @@ void PrintSourceServerAsyncMethod( "$Request$* request, " "::grpc::ServerAsyncResponseWriter< $Response$>* response, " "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print( - *vars, - " AsynchronousService::RequestAsyncUnary($Idx$, context, request, response, cq, tag);\n"); + printer->Print(*vars, + " AsynchronousService::RequestAsyncUnary($Idx$, context, " + "request, response, cq, tag);\n"); printer->Print("}\n\n"); } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, @@ -691,9 +702,9 @@ void PrintSourceServerAsyncMethod( "::grpc::ServerContext* context, " "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print( - *vars, - " AsynchronousService::RequestClientStreaming($Idx$, context, reader, cq, tag);\n"); + printer->Print(*vars, + " AsynchronousService::RequestClientStreaming($Idx$, " + "context, reader, cq, tag);\n"); printer->Print("}\n\n"); } else if (ServerOnlyStreaming(method)) { printer->Print(*vars, @@ -702,9 +713,9 @@ void PrintSourceServerAsyncMethod( "$Request$* request, " "::grpc::ServerAsyncWriter< $Response$>* writer, " "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print( - *vars, - " AsynchronousService::RequestServerStreaming($Idx$, context, request, writer, cq, tag);\n"); + printer->Print(*vars, + " AsynchronousService::RequestServerStreaming($Idx$, " + "context, request, writer, cq, tag);\n"); printer->Print("}\n\n"); } else if (BidiStreaming(method)) { printer->Print( @@ -713,9 +724,9 @@ void PrintSourceServerAsyncMethod( "::grpc::ServerContext* context, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " "::grpc::CompletionQueue* cq, void *tag) {\n"); - printer->Print( - *vars, - " AsynchronousService::RequestBidiStreaming($Idx$, context, stream, cq, tag);\n"); + printer->Print(*vars, + " AsynchronousService::RequestBidiStreaming($Idx$, " + "context, stream, cq, tag);\n"); printer->Print("}\n\n"); } } @@ -725,7 +736,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, std::map<grpc::string, grpc::string> *vars) { (*vars)["Service"] = service->name(); - printer->Print(*vars, "static const char* $prefix$$Service$_method_names[] = {\n"); + printer->Print(*vars, + "static const char* $prefix$$Service$_method_names[] = {\n"); for (int i = 0; i < service->method_count(); ++i) { (*vars)["Method"] = service->method(i)->name(); printer->Print(*vars, " \"/$Package$$Service$/$Method$\",\n"); @@ -736,21 +748,51 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, *vars, "std::unique_ptr< $ns$$Service$::Stub> $ns$$Service$::NewStub(" "const std::shared_ptr< ::grpc::ChannelInterface>& channel) {\n" - " std::unique_ptr< $ns$$Service$::Stub> stub(new $ns$$Service$::Stub());\n" - " stub->set_channel(channel);\n" + " std::unique_ptr< $ns$$Service$::Stub> stub(new " + "$ns$$Service$::Stub(channel));\n" " return stub;\n" "}\n\n"); + printer->Print(*vars, + "$ns$$Service$::Stub::Stub(const std::shared_ptr< " + "::grpc::ChannelInterface>& channel)\n"); + printer->Indent(); + printer->Print(": ::grpc::InternalStub(channel)"); + for (int i = 0; i < service->method_count(); ++i) { + const grpc::protobuf::MethodDescriptor *method = service->method(i); + (*vars)["Method"] = method->name(); + (*vars)["Idx"] = as_string(i); + if (NoStreaming(method)) { + (*vars)["StreamingType"] = "NORMAL_RPC"; + } else if (ClientOnlyStreaming(method)) { + (*vars)["StreamingType"] = "CLIENT_STREAMING"; + } else if (ServerOnlyStreaming(method)) { + (*vars)["StreamingType"] = "SERVER_STREAMING"; + } else { + (*vars)["StreamingType"] = "BIDI_STREAMING"; + } + printer->Print( + *vars, + ", rpcmethod_$Method$_(" + "$prefix$$Service$_method_names[$Idx$], " + "::grpc::RpcMethod::$StreamingType$, " + "channel->RegisterMethod($prefix$$Service$_method_names[$Idx$])" + ")\n"); + } + printer->Print("{}\n\n"); + printer->Outdent(); + for (int i = 0; i < service->method_count(); ++i) { (*vars)["Idx"] = as_string(i); PrintSourceClientMethod(printer, service->method(i), vars); } (*vars)["MethodCount"] = as_string(service->method_count()); - printer->Print( - *vars, - "$ns$$Service$::AsyncService::AsyncService(::grpc::CompletionQueue* cq) : " - "::grpc::AsynchronousService(cq, $prefix$$Service$_method_names, $MethodCount$) " - "{}\n\n"); + printer->Print(*vars, + "$ns$$Service$::AsyncService::AsyncService(::grpc::" + "CompletionQueue* cq) : " + "::grpc::AsynchronousService(cq, " + "$prefix$$Service$_method_names, $MethodCount$) " + "{}\n\n"); printer->Print(*vars, "$ns$$Service$::Service::~Service() {\n" @@ -783,7 +825,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, "service_->AddMethod(new ::grpc::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" " ::grpc::RpcMethod::NORMAL_RPC,\n" - " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, $Request$, " + " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, " + "$Request$, " "$Response$>(\n" " std::function< ::grpc::Status($ns$$Service$::Service*, " "::grpc::ServerContext*, const $Request$*, $Response$*)>(" diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index e764a3b9af..d3962a00c4 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -43,6 +43,12 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +typedef struct registered_call { + grpc_mdelem *path; + grpc_mdelem *authority; + struct registered_call *next; +} registered_call; + struct grpc_channel { int is_client; gpr_refcount refs; @@ -51,10 +57,14 @@ struct grpc_channel { grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; + + gpr_mu registered_call_mu; + registered_call *registered_calls; }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) -#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) (((grpc_channel *)(channel_stack)) - 1) +#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \ + (((grpc_channel *)(channel_stack)) - 1) #define CHANNEL_FROM_TOP_ELEM(top_elem) \ CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) @@ -66,7 +76,8 @@ grpc_channel *grpc_channel_create_from_filters( grpc_channel *channel = gpr_malloc(size); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; - /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if is_client */ + /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if + * is_client */ gpr_ref_init(&channel->refs, 1 + is_client); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); @@ -75,18 +86,17 @@ grpc_channel *grpc_channel_create_from_filters( channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority"); grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context, CHANNEL_STACK_FROM_CHANNEL(channel)); + gpr_mu_init(&channel->registered_call_mu); + channel->registered_calls = NULL; return channel; } static void do_nothing(void *ignored, grpc_op_error error) {} -grpc_call *grpc_channel_create_call(grpc_channel *channel, - grpc_completion_queue *cq, - const char *method, const char *host, - gpr_timespec absolute_deadline) { +static grpc_call *grpc_channel_create_call_internal( + grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, + grpc_mdelem *authority_mdelem, gpr_timespec deadline) { grpc_call *call; - grpc_mdelem *path_mdelem; - grpc_mdelem *authority_mdelem; grpc_call_op op; if (!channel->is_client) { @@ -97,11 +107,6 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, call = grpc_call_create(channel, cq, NULL); /* Add :path and :authority headers. */ - /* TODO(klempner): Consider optimizing this by stashing mdelems for common - values of method and host. */ - path_mdelem = grpc_mdelem_from_metadata_strings( - channel->metadata_context, grpc_mdstr_ref(channel->path_string), - grpc_mdstr_from_string(channel->metadata_context, method)); op.type = GRPC_SEND_METADATA; op.dir = GRPC_CALL_DOWN; op.flags = 0; @@ -110,18 +115,14 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, op.user_data = NULL; grpc_call_execute_op(call, &op); - grpc_mdstr_ref(channel->authority_string); - authority_mdelem = grpc_mdelem_from_metadata_strings( - channel->metadata_context, channel->authority_string, - grpc_mdstr_from_string(channel->metadata_context, host)); op.data.metadata = authority_mdelem; grpc_call_execute_op(call, &op); - if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) { + if (0 != gpr_time_cmp(deadline, gpr_inf_future)) { op.type = GRPC_SEND_DEADLINE; op.dir = GRPC_CALL_DOWN; op.flags = 0; - op.data.deadline = absolute_deadline; + op.data.deadline = deadline; op.done_cb = do_nothing; op.user_data = NULL; grpc_call_execute_op(call, &op); @@ -130,6 +131,21 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, return call; } +grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_completion_queue *cq, + const char *method, const char *host, + gpr_timespec deadline) { + return grpc_channel_create_call_internal( + channel, cq, + grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->path_string), + grpc_mdstr_from_string(channel->metadata_context, method)), + grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->authority_string), + grpc_mdstr_from_string(channel->metadata_context, host)), + deadline); +} + grpc_call *grpc_channel_create_call_old(grpc_channel *channel, const char *method, const char *host, gpr_timespec absolute_deadline) { @@ -137,6 +153,31 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel, absolute_deadline); } +void *grpc_channel_register_call(grpc_channel *channel, const char *method, + const char *host) { + registered_call *rc = gpr_malloc(sizeof(registered_call)); + rc->path = grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->path_string), + grpc_mdstr_from_string(channel->metadata_context, method)); + rc->authority = grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->authority_string), + grpc_mdstr_from_string(channel->metadata_context, host)); + gpr_mu_lock(&channel->registered_call_mu); + rc->next = channel->registered_calls; + channel->registered_calls = rc; + gpr_mu_unlock(&channel->registered_call_mu); + return rc; +} + +grpc_call *grpc_channel_create_registered_call( + grpc_channel *channel, grpc_completion_queue *completion_queue, + void *registered_call_handle, gpr_timespec deadline) { + registered_call *rc = registered_call_handle; + return grpc_channel_create_call_internal( + channel, completion_queue, grpc_mdelem_ref(rc->path), + grpc_mdelem_ref(rc->authority), deadline); +} + void grpc_channel_internal_ref(grpc_channel *channel) { gpr_ref(&channel->refs); } @@ -148,7 +189,15 @@ static void destroy_channel(void *p, int ok) { grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->authority_string); + while (channel->registered_calls) { + registered_call *rc = channel->registered_calls; + channel->registered_calls = rc->next; + grpc_mdelem_unref(rc->path); + grpc_mdelem_unref(rc->authority); + gpr_free(rc); + } grpc_mdctx_unref(channel->metadata_context); + gpr_mu_destroy(&channel->registered_call_mu); gpr_free(channel); } diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 4de51a666f..5a119a47cc 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -61,8 +61,8 @@ void grpc_init(void) { grpc_register_tracer("http", &grpc_http_trace); grpc_register_tracer("batch", &grpc_trace_batch); grpc_security_pre_init(); - grpc_tracer_init("GRPC_TRACE"); grpc_iomgr_init(); + grpc_tracer_init("GRPC_TRACE"); census_init(); grpc_timers_log_global_init(); } diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 72123abbc8..478f223322 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -61,12 +61,17 @@ Channel::~Channel() { grpc_channel_destroy(c_channel_); } Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, CompletionQueue* cq) { - auto c_call = grpc_channel_create_call(c_channel_, cq->cq(), method.name(), - context->authority().empty() - ? target_.c_str() - : context->authority().c_str(), - context->RawDeadline()); - GRPC_TIMER_MARK(CALL_CREATED,c_call); + auto c_call = + method.channel_tag() + ? grpc_channel_create_registered_call(c_channel_, cq->cq(), + method.channel_tag(), + context->RawDeadline()) + : grpc_channel_create_call(c_channel_, cq->cq(), method.name(), + context->authority().empty() + ? target_.c_str() + : context->authority().c_str(), + context->RawDeadline()); + GRPC_TIMER_MARK(CALL_CREATED, c_call); context->set_call(c_call); return Call(c_call, this, cq); } @@ -82,4 +87,8 @@ void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) { GRPC_TIMER_MARK(PERFORM_OPS_END, call->call()); } +void* Channel::RegisterMethod(const char* method) { + return grpc_channel_register_call(c_channel_, method, target_.c_str()); +} + } // namespace grpc diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h index 3980eba237..aaf4dbe10d 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/client/channel.h @@ -54,6 +54,7 @@ class Channel GRPC_FINAL : public ChannelInterface { Channel(const grpc::string& target, grpc_channel* c_channel); ~Channel() GRPC_OVERRIDE; + virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE; virtual Call CreateCall(const RpcMethod& method, ClientContext* context, CompletionQueue* cq) GRPC_OVERRIDE; virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE; diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 3bf7bdf45f..0c90578ae5 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -39,13 +39,13 @@ namespace grpc { // begin a call to a named method std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call( - ClientContext* context, const grpc::string& method, - CompletionQueue* cq, void* tag) { + ClientContext* context, const grpc::string& method, CompletionQueue* cq, + void* tag) { return std::unique_ptr<GenericClientAsyncReaderWriter>( new GenericClientAsyncReaderWriter( - channel_.get(), cq, RpcMethod(method.c_str()), context, tag)); + channel_.get(), cq, + RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING, nullptr), + context, tag)); } - -} // namespace grpc - +} // namespace grpc diff --git a/src/node/examples/pubsub/empty.proto b/src/node/examples/pubsub/empty.proto deleted file mode 100644 index 5d6eb10841..0000000000 --- a/src/node/examples/pubsub/empty.proto +++ /dev/null @@ -1,44 +0,0 @@ -// This file will be moved to a new location. - -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -syntax = "proto2"; - -package proto2; - -// An empty message that you can re-use to avoid defining duplicated empty -// messages in your project. A typical example is to use it as argument or the -// return value of a service API. For instance: -// -// service Foo { -// rpc Bar (proto2.Empty) returns (proto2.Empty) { }; -// }; -// -message Empty {} diff --git a/src/node/examples/pubsub/label.proto b/src/node/examples/pubsub/label.proto deleted file mode 100644 index 0af15a25a6..0000000000 --- a/src/node/examples/pubsub/label.proto +++ /dev/null @@ -1,79 +0,0 @@ -// This file will be moved to a new location. - -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -// Labels provide a way to associate user-defined metadata with various -// objects. Labels may be used to organize objects into non-hierarchical -// groups; think metadata tags attached to mp3s. - -syntax = "proto2"; - -package tech.label; - -// A key-value pair applied to a given object. -message Label { - // The key of a label is a syntactically valid URL (as per RFC 1738) with - // the "scheme" and initial slashes omitted and with the additional - // restrictions noted below. Each key should be globally unique. The - // "host" portion is called the "namespace" and is not necessarily - // resolvable to a network endpoint. Instead, the namespace indicates what - // system or entity defines the semantics of the label. Namespaces do not - // restrict the set of objects to which a label may be associated. - // - // Keys are defined by the following grammar: - // - // key = hostname "/" kpath - // kpath = ksegment *[ "/" ksegment ] - // ksegment = alphadigit | *[ alphadigit | "-" | "_" | "." ] - // - // where "hostname" and "alphadigit" are defined as in RFC 1738. - // - // Example key: - // spanner.google.com/universe - required string key = 1; - - // The value of the label. - oneof value { - // A string value. - string str_value = 2; - // An integer value. - int64 num_value = 3; - } -} - -// A collection of labels, such as the set of all labels attached to an -// object. Each label in the set must have a different key. -// -// Users should prefer to embed "repeated Label" directly when possible. -// This message should only be used in cases where that isn't possible (e.g. -// with oneof). -message Labels { - repeated Label label = 1; -} diff --git a/src/node/examples/pubsub/pubsub.proto b/src/node/examples/pubsub/pubsub.proto deleted file mode 100644 index 41a354773f..0000000000 --- a/src/node/examples/pubsub/pubsub.proto +++ /dev/null @@ -1,734 +0,0 @@ -// This file will be moved to a new location. - -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -// Specification of the Pubsub API. - -syntax = "proto2"; - -import "empty.proto"; -import "label.proto"; - -package tech.pubsub; - -// ----------------------------------------------------------------------------- -// Overview of the Pubsub API -// ----------------------------------------------------------------------------- - -// This file describes an API for a Pubsub system. This system provides a -// reliable many-to-many communication mechanism between independently written -// publishers and subscribers where the publisher publishes messages to "topics" -// and each subscriber creates a "subscription" and consumes messages from it. -// -// (a) The pubsub system maintains bindings between topics and subscriptions. -// (b) A publisher publishes messages into a topic. -// (c) The pubsub system delivers messages from topics into relevant -// subscriptions. -// (d) A subscriber receives pending messages from its subscription and -// acknowledges or nacks each one to the pubsub system. -// (e) The pubsub system removes acknowledged messages from that subscription. - -// ----------------------------------------------------------------------------- -// Data Model -// ----------------------------------------------------------------------------- - -// The data model consists of the following: -// -// * Topic: A topic is a resource to which messages are published by publishers. -// Topics are named, and the name of the topic is unique within the pubsub -// system. -// -// * Subscription: A subscription records the subscriber's interest in a topic. -// It can optionally include a query to select a subset of interesting -// messages. The pubsub system maintains a logical cursor tracking the -// matching messages which still need to be delivered and acked so that -// they can retried as needed. The set of messages that have not been -// acknowledged is called the subscription backlog. -// -// * Message: A message is a unit of data that flows in the system. It contains -// opaque data from the publisher along with its labels. -// -// * Message Labels (optional): A set of opaque key, value pairs assigned -// by the publisher which the subscriber can use for filtering out messages -// in the topic. For example, a label with key "foo.com/device_type" and -// value "mobile" may be added for messages that are only relevant for a -// mobile subscriber; a subscriber on a phone may decide to create a -// subscription only for messages that have this label. - -// ----------------------------------------------------------------------------- -// Publisher Flow -// ----------------------------------------------------------------------------- - -// A publisher publishes messages to the topic using the Publish request: -// -// PubsubMessage message; -// message.set_data("...."); -// Label label; -// label.set_key("foo.com/key1"); -// label.set_str_value("value1"); -// message.add_label(label); -// PublishRequest request; -// request.set_topic("topicName"); -// request.set_message(message); -// PublisherService.Publish(request); - -// ----------------------------------------------------------------------------- -// Subscriber Flow -// ----------------------------------------------------------------------------- - -// The subscriber part of the API is richer than the publisher part and has a -// number of concepts w.r.t. subscription creation and monitoring: -// -// (1) A subscriber creates a subscription using the CreateSubscription call. -// It may specify an optional "query" to indicate that it wants to receive -// only messages with a certain set of labels using the label query syntax. -// It may also specify an optional truncation policy to indicate when old -// messages from the subcription can be removed. -// -// (2) A subscriber receives messages in one of two ways: via push or pull. -// -// (a) To receive messages via push, the PushConfig field must be specified in -// the Subscription parameter when creating a subscription. The PushConfig -// specifies an endpoint at which the subscriber must expose the -// PushEndpointService. Messages are received via the HandlePubsubEvent -// method. The push subscriber responds to the HandlePubsubEvent method -// with a result code that indicates one of three things: Ack (the message -// has been successfully processed and the Pubsub system may delete it), -// Nack (the message has been rejected, the Pubsub system should resend it -// at a later time), or Push-Back (this is a Nack with the additional -// semantics that the subscriber is overloaded and the pubsub system should -// back off on the rate at which it is invoking HandlePubsubEvent). The -// endpoint may be a load balancer for better scalability. -// -// (b) To receive messages via pull a subscriber calls the Pull method on the -// SubscriberService to get messages from the subscription. For each -// individual message, the subscriber may use the ack_id received in the -// PullResponse to Ack the message, Nack the message, or modify the ack -// deadline with ModifyAckDeadline. See the -// Subscription.ack_deadline_seconds field documentation for details on the -// ack deadline behavior. -// -// Note: Messages may be consumed in parallel by multiple subscribers making -// Pull calls to the same subscription; this will result in the set of -// messages from the subscription being shared and each subscriber -// receiving a subset of the messages. -// -// (4) The subscriber can explicitly truncate the current subscription. -// -// (5) "Truncated" events are delivered when a subscription is -// truncated, whether due to the subscription's truncation policy -// or an explicit request from the subscriber. -// -// Subscription creation: -// -// Subscription subscription; -// subscription.set_topic("topicName"); -// subscription.set_name("subscriptionName"); -// subscription.push_config().set_push_endpoint("machinename:8888"); -// SubscriberService.CreateSubscription(subscription); -// -// Consuming messages via push: -// -// TODO(eschapira): Add HTTP push example. -// -// The port 'machinename:8888' must be bound to a stubby server that implements -// the PushEndpointService with the following method: -// -// int HandlePubsubEvent(PubsubEvent event) { -// if (event.subscription().equals("subscriptionName")) { -// if (event.has_message()) { -// Process(event.message().data()); -// } else if (event.truncated()) { -// ProcessTruncatedEvent(); -// } -// } -// return OK; // This return code implies an acknowledgment -// } -// -// Consuming messages via pull: -// -// The subscription must be created without setting the push_config field. -// -// PullRequest pull_request; -// pull_request.set_subscription("subscriptionName"); -// pull_request.set_return_immediately(false); -// while (true) { -// PullResponse pull_response; -// if (SubscriberService.Pull(pull_request, pull_response) == OK) { -// PubsubEvent event = pull_response.pubsub_event(); -// if (event.has_message()) { -// Process(event.message().data()); -// } else if (event.truncated()) { -// ProcessTruncatedEvent(); -// } -// AcknowledgeRequest ack_request; -// ackRequest.set_subscription("subscriptionName"); -// ackRequest.set_ack_id(pull_response.ack_id()); -// SubscriberService.Acknowledge(ack_request); -// } -// } - -// ----------------------------------------------------------------------------- -// Reliability Semantics -// ----------------------------------------------------------------------------- - -// When a subscriber successfully creates a subscription using -// Subscriber.CreateSubscription, it establishes a "subscription point" with -// respect to that subscription - the subscriber is guaranteed to receive any -// message published after this subscription point that matches the -// subscription's query. Note that messages published before the Subscription -// point may or may not be delivered. -// -// If the system truncates the subscription according to the specified -// truncation policy, the system delivers a subscription status event with the -// "truncated" field set to true. We refer to such events as "truncation -// events". A truncation event: -// -// * Informs the subscriber that part of the subscription messages have been -// discarded. The subscriber may want to recover from the message loss, e.g., -// by resyncing its state with its backend. -// * Establishes a new subscription point, i.e., the subscriber is guaranteed to -// receive all changes published after the trunction event is received (or -// until another truncation event is received). -// -// Note that messages are not delivered in any particular order by the pubsub -// system. Furthermore, the system guarantees at-least-once delivery -// of each message or truncation events until acked. - -// ----------------------------------------------------------------------------- -// Deletion -// ----------------------------------------------------------------------------- - -// Both topics and subscriptions may be deleted. Deletion of a topic implies -// deletion of all attached subscriptions. -// -// When a subscription is deleted directly by calling DeleteSubscription, all -// messages are immediately dropped. If it is a pull subscriber, future pull -// requests will return NOT_FOUND. -// -// When a topic is deleted all corresponding subscriptions are immediately -// deleted, and subscribers experience the same behavior as directly deleting -// the subscription. - -// ----------------------------------------------------------------------------- -// The Publisher service and its protos. -// ----------------------------------------------------------------------------- - -// The service that an application uses to manipulate topics, and to send -// messages to a topic. -service PublisherService { - - // Creates the given topic with the given name. - rpc CreateTopic(Topic) returns (Topic) { - } - - // Adds a message to the topic. Returns NOT_FOUND if the topic does not - // exist. - // (-- For different error code values returned via Stubby, see - // util/task/codes.proto. --) - rpc Publish(PublishRequest) returns (proto2.Empty) { - } - - // Adds one or more messages to the topic. Returns NOT_FOUND if the topic does - // not exist. - rpc PublishBatch(PublishBatchRequest) returns (PublishBatchResponse) { - } - - // Gets the configuration of a topic. Since the topic only has the name - // attribute, this method is only useful to check the existence of a topic. - // If other attributes are added in the future, they will be returned here. - rpc GetTopic(GetTopicRequest) returns (Topic) { - } - - // Lists matching topics. - rpc ListTopics(ListTopicsRequest) returns (ListTopicsResponse) { - } - - // Deletes the topic with the given name. All subscriptions to this topic - // are also deleted. Returns NOT_FOUND if the topic does not exist. - // After a topic is deleted, a new topic may be created with the same name. - rpc DeleteTopic(DeleteTopicRequest) returns (proto2.Empty) { - } -} - -// A topic resource. -message Topic { - // Name of the topic. - optional string name = 1; -} - -// A message data and its labels. -message PubsubMessage { - // The message payload. - optional bytes data = 1; - - // Optional list of labels for this message. Keys in this collection must - // be unique. - //(-- TODO(eschapira): Define how key namespace may be scoped to the topic.--) - repeated tech.label.Label label = 2; - - // ID of this message assigned by the server at publication time. Guaranteed - // to be unique within the topic. This value may be read by a subscriber - // that receives a PubsubMessage via a Pull call or a push delivery. It must - // not be populated by a publisher in a Publish call. - optional string message_id = 3; -} - -// Request for the GetTopic method. -message GetTopicRequest { - // The name of the topic to get. - optional string topic = 1; -} - -// Request for the Publish method. -message PublishRequest { - // The message in the request will be published on this topic. - optional string topic = 1; - - // The message to publish. - optional PubsubMessage message = 2; -} - -// Request for the PublishBatch method. -message PublishBatchRequest { - // The messages in the request will be published on this topic. - optional string topic = 1; - - // The messages to publish. - repeated PubsubMessage messages = 2; -} - -// Response for the PublishBatch method. -message PublishBatchResponse { - // The server-assigned ID of each published message, in the same order as - // the messages in the request. IDs are guaranteed to be unique within - // the topic. - repeated string message_ids = 1; -} - -// Request for the ListTopics method. -message ListTopicsRequest { - // A valid label query expression. - // - optional string query = 1; - - // Maximum number of topics to return. - // (-- If not specified or <= 0, the implementation will select a reasonable - // value. --) - optional int32 max_results = 2; - - // The value obtained in the last <code>ListTopicsResponse</code> - // for continuation. - optional string page_token = 3; - -} - -// Response for the ListTopics method. -message ListTopicsResponse { - // The resulting topics. - repeated Topic topic = 1; - - // If not empty, indicates that there are more topics that match the request, - // and this value should be passed to the next <code>ListTopicsRequest</code> - // to continue. - optional string next_page_token = 2; -} - -// Request for the Delete method. -message DeleteTopicRequest { - // Name of the topic to delete. - optional string topic = 1; -} - -// ----------------------------------------------------------------------------- -// The Subscriber service and its protos. -// ----------------------------------------------------------------------------- - -// The service that an application uses to manipulate subscriptions and to -// consume messages from a subscription via the pull method. -service SubscriberService { - - // Creates a subscription on a given topic for a given subscriber. - // If the subscription already exists, returns ALREADY_EXISTS. - // If the corresponding topic doesn't exist, returns NOT_FOUND. - // - // If the name is not provided in the request, the server will assign a random - // name for this subscription on the same project as the topic. - rpc CreateSubscription(Subscription) returns (Subscription) { - } - - // Gets the configuration details of a subscription. - rpc GetSubscription(GetSubscriptionRequest) returns (Subscription) { - } - - // Lists matching subscriptions. - rpc ListSubscriptions(ListSubscriptionsRequest) - returns (ListSubscriptionsResponse) { - } - - // Deletes an existing subscription. All pending messages in the subscription - // are immediately dropped. Calls to Pull after deletion will return - // NOT_FOUND. - rpc DeleteSubscription(DeleteSubscriptionRequest) returns (proto2.Empty) { - } - - // Removes all the pending messages in the subscription and releases the - // storage associated with them. Results in a truncation event to be sent to - // the subscriber. Messages added after this call returns are stored in the - // subscription as before. - rpc TruncateSubscription(TruncateSubscriptionRequest) returns (proto2.Empty) { - } - - // - // Push subscriber calls. - // - - // Modifies the <code>PushConfig</code> for a specified subscription. - // This method can be used to suspend the flow of messages to an endpoint - // by clearing the <code>PushConfig</code> field in the request. Messages - // will be accumulated for delivery even if no push configuration is - // defined or while the configuration is modified. - rpc ModifyPushConfig(ModifyPushConfigRequest) returns (proto2.Empty) { - } - - // - // Pull Subscriber calls - // - - // Pulls a single message from the server. - // If return_immediately is true, and no messages are available in the - // subscription, this method returns FAILED_PRECONDITION. The system is free - // to return an UNAVAILABLE error if no messages are available in a - // reasonable amount of time (to reduce system load). - rpc Pull(PullRequest) returns (PullResponse) { - } - - // Pulls messages from the server. Returns an empty list if there are no - // messages available in the backlog. The system is free to return UNAVAILABLE - // if there are too many pull requests outstanding for the given subscription. - rpc PullBatch(PullBatchRequest) returns (PullBatchResponse) { - } - - // Modifies the Ack deadline for a message received from a pull request. - rpc ModifyAckDeadline(ModifyAckDeadlineRequest) returns (proto2.Empty) { - } - - // Acknowledges a particular received message: the Pub/Sub system can remove - // the given message from the subscription. Acknowledging a message whose - // Ack deadline has expired may succeed, but the message could have been - // already redelivered. Acknowledging a message more than once will not - // result in an error. This is only used for messages received via pull. - rpc Acknowledge(AcknowledgeRequest) returns (proto2.Empty) { - } - - // Refuses processing a particular received message. The system will - // redeliver this message to some consumer of the subscription at some - // future time. This is only used for messages received via pull. - rpc Nack(NackRequest) returns (proto2.Empty) { - } -} - -// A subscription resource. -message Subscription { - // Name of the subscription. - optional string name = 1; - - // The name of the topic from which this subscription is receiving messages. - optional string topic = 2; - - // If <code>query</code> is non-empty, only messages on the subscriber's - // topic whose labels match the query will be returned. Otherwise all - // messages on the topic will be returned. - // - optional string query = 3; - - // The subscriber may specify requirements for truncating unacknowledged - // subscription entries. The system will honor the - // <code>CreateSubscription</code> request only if it can meet these - // requirements. If this field is not specified, messages are never truncated - // by the system. - optional TruncationPolicy truncation_policy = 4; - - // Specifies which messages can be truncated by the system. - message TruncationPolicy { - oneof policy { - // If <code>max_bytes</code> is specified, the system is allowed to drop - // old messages to keep the combined size of stored messages under - // <code>max_bytes</code>. This is a hint; the system may keep more than - // this many bytes, but will make a best effort to keep the size from - // growing much beyond this parameter. - int64 max_bytes = 1; - - // If <code>max_age_seconds</code> is specified, the system is allowed to - // drop messages that have been stored for at least this many seconds. - // This is a hint; the system may keep these messages, but will make a - // best effort to remove them when their maximum age is reached. - int64 max_age_seconds = 2; - } - } - - // If push delivery is used with this subscription, this field is - // used to configure it. - optional PushConfig push_config = 5; - - // For either push or pull delivery, the value is the maximum time after a - // subscriber receives a message before the subscriber should acknowledge or - // Nack the message. If the Ack deadline for a message passes without an - // Ack or a Nack, the Pub/Sub system will eventually redeliver the message. - // If a subscriber acknowledges after the deadline, the Pub/Sub system may - // accept the Ack, but it is possible that the message has been already - // delivered again. Multiple Acks to the message are allowed and will - // succeed. - // - // For push delivery, this value is used to set the request timeout for - // the call to the push endpoint. - // - // For pull delivery, this value is used as the initial value for the Ack - // deadline. It may be overridden for a specific pull request (message) with - // <code>ModifyAckDeadline</code>. - // While a message is outstanding (i.e. it has been delivered to a pull - // subscriber and the subscriber has not yet Acked or Nacked), the Pub/Sub - // system will not deliver that message to another pull subscriber - // (on a best-effort basis). - optional int32 ack_deadline_seconds = 6; - - // If this parameter is set to n, the system is allowed to (but not required - // to) delete the subscription when at least n seconds have elapsed since the - // client presence was detected. (Presence is detected through any - // interaction using the subscription ID, including Pull(), Get(), or - // acknowledging a message.) - // - // If this parameter is not set, the subscription will stay live until - // explicitly deleted. - // - // Clients can detect such garbage collection when a Get call or a Pull call - // (for pull subscribers only) returns NOT_FOUND. - optional int64 garbage_collect_seconds = 7; -} - -// Configuration for a push delivery endpoint. -message PushConfig { - // A URL locating the endpoint to which messages should be pushed. - // For example, a Webhook endpoint might use "https://example.com/push". - // (-- An Android application might use "gcm:<REGID>", where <REGID> is a - // GCM registration id allocated for pushing messages to the application. --) - optional string push_endpoint = 1; -} - -// An event indicating a received message or truncation event. -message PubsubEvent { - // The subscription that received the event. - optional string subscription = 1; - - oneof type { - // A received message. - PubsubMessage message = 2; - - // Indicates that this subscription has been truncated. - bool truncated = 3; - - // Indicates that this subscription has been deleted. (Note that pull - // subscribers will always receive NOT_FOUND in response in their pull - // request on the subscription, rather than seeing this boolean.) - bool deleted = 4; - } -} - -// Request for the GetSubscription method. -message GetSubscriptionRequest { - // The name of the subscription to get. - optional string subscription = 1; -} - -// Request for the ListSubscriptions method. -message ListSubscriptionsRequest { - // A valid label query expression. - // (-- Which labels are required or supported is implementation-specific. - // TODO(eschapira): This method must support to query by topic. We must - // define the key URI for the "topic" label. --) - optional string query = 1; - - // Maximum number of subscriptions to return. - // (-- If not specified or <= 0, the implementation will select a reasonable - // value. --) - optional int32 max_results = 3; - - // The value obtained in the last <code>ListSubscriptionsResponse</code> - // for continuation. - optional string page_token = 4; -} - -// Response for the ListSubscriptions method. -message ListSubscriptionsResponse { - // The subscriptions that match the request. - repeated Subscription subscription = 1; - - // If not empty, indicates that there are more subscriptions that match the - // request and this value should be passed to the next - // <code>ListSubscriptionsRequest</code> to continue. - optional string next_page_token = 2; -} - -// Request for the TruncateSubscription method. -message TruncateSubscriptionRequest { - // The subscription that is being truncated. - optional string subscription = 1; -} - -// Request for the DeleteSubscription method. -message DeleteSubscriptionRequest { - // The subscription to delete. - optional string subscription = 1; -} - -// Request for the ModifyPushConfig method. -message ModifyPushConfigRequest { - // The name of the subscription. - optional string subscription = 1; - - // An empty <code>push_config</code> indicates that the Pub/Sub system should - // pause pushing messages from the given subscription. - optional PushConfig push_config = 2; -} - -// ----------------------------------------------------------------------------- -// The protos used by a pull subscriber. -// ----------------------------------------------------------------------------- - -// Request for the Pull method. -message PullRequest { - // The subscription from which a message should be pulled. - optional string subscription = 1; - - // If this is specified as true the system will respond immediately even if - // it is not able to return a message in the Pull response. Otherwise the - // system is allowed to wait until at least one message is available rather - // than returning FAILED_PRECONDITION. The client may cancel the request if - // it does not wish to wait any longer for the response. - optional bool return_immediately = 2; -} - -// Either a <code>PubsubMessage</code> or a truncation event. One of these two -// must be populated. -message PullResponse { - // This ID must be used to acknowledge the received event or message. - optional string ack_id = 1; - - // A pubsub message or truncation event. - optional PubsubEvent pubsub_event = 2; -} - -// Request for the PullBatch method. -message PullBatchRequest { - // The subscription from which messages should be pulled. - optional string subscription = 1; - - // If this is specified as true the system will respond immediately even if - // it is not able to return a message in the Pull response. Otherwise the - // system is allowed to wait until at least one message is available rather - // than returning no messages. The client may cancel the request if it does - // not wish to wait any longer for the response. - optional bool return_immediately = 2; - - // The maximum number of PubsubEvents returned for this request. The Pub/Sub - // system may return fewer than the number of events specified. - optional int32 max_events = 3; -} - -// Response for the PullBatch method. -message PullBatchResponse { - - // Received Pub/Sub messages or status events. The Pub/Sub system will return - // zero messages if there are no more messages available in the backlog. The - // Pub/Sub system may return fewer than the max_events requested even if - // there are more messages available in the backlog. - repeated PullResponse pull_responses = 2; -} - -// Request for the ModifyAckDeadline method. -message ModifyAckDeadlineRequest { - // The name of the subscription from which messages are being pulled. - optional string subscription = 1; - - // The acknowledgment ID. - optional string ack_id = 2; - - // The new Ack deadline. Must be >= 0. - optional int32 ack_deadline_seconds = 3; -} - -// Request for the Acknowledge method. -message AcknowledgeRequest { - // The subscription whose message is being acknowledged. - optional string subscription = 1; - - // The acknowledgment ID for the message being acknowledged. This was - // returned by the Pub/Sub system in the Pull response. - repeated string ack_id = 2; -} - -// Request for the Nack method. -message NackRequest { - // The subscription whose message is being Nacked. - optional string subscription = 1; - - // The acknowledgment ID for the message being refused. This was returned by - // the Pub/Sub system in the Pull response. - repeated string ack_id = 2; -} - -// ----------------------------------------------------------------------------- -// The service and protos used by a push subscriber. -// ----------------------------------------------------------------------------- - -// The service that a subscriber uses to handle messages sent via push -// delivery. -// This service is not currently exported for HTTP clients. -// TODO(eschapira): Explain HTTP subscribers. -service PushEndpointService { - // Sends a <code>PubsubMessage</code> or a subscription status event to a - // push endpoint. - // The push endpoint responds with an empty message and a code from - // util/task/codes.proto. The following codes have a particular meaning to the - // Pub/Sub system: - // OK - This is interpreted by Pub/Sub as Ack. - // ABORTED - This is intepreted by Pub/Sub as a Nack, without implying - // pushback for congestion control. The Pub/Sub system will - // retry this message at a later time. - // UNAVAILABLE - This is intepreted by Pub/Sub as a Nack, with the additional - // semantics of push-back. The Pub/Sub system will use an AIMD - // congestion control algorithm to backoff the rate of sending - // messages from this subscription. - // Any other code, or a failure to respond, will be interpreted in the same - // way as ABORTED; i.e. the system will retry the message at a later time to - // ensure reliable delivery. - rpc HandlePubsubEvent(PubsubEvent) returns (proto2.Empty); -} diff --git a/src/node/examples/pubsub/pubsub_demo.js b/src/node/examples/pubsub/pubsub_demo.js deleted file mode 100644 index 26301515f0..0000000000 --- a/src/node/examples/pubsub/pubsub_demo.js +++ /dev/null @@ -1,285 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -'use strict'; - -var async = require('async'); -var fs = require('fs'); -var GoogleAuth = require('google-auth-library'); -var parseArgs = require('minimist'); -var strftime = require('strftime'); -var _ = require('underscore'); -var grpc = require('../..'); -var PROTO_PATH = __dirname + '/pubsub.proto'; -var pubsub = grpc.load(PROTO_PATH).tech.pubsub; - -function PubsubRunner(pub, sub, args) { - this.pub = pub; - this.sub = sub; - this.args = args; -} - -PubsubRunner.prototype.getTestTopicName = function() { - var base_name = '/topics/' + this.args.project_id + '/'; - if (this.args.topic_name) { - return base_name + this.args.topic_name; - } - var now_text = strftime('%Y%m%d%H%M%S%L'); - return base_name + process.env.USER + '-' + now_text; -}; - -PubsubRunner.prototype.getTestSubName = function() { - var base_name = '/subscriptions/' + this.args.project_id + '/'; - if (this.args.sub_name) { - return base_name + this.args.sub_name; - } - var now_text = strftime('%Y%m%d%H%M%S%L'); - return base_name + process.env.USER + '-' + now_text; -}; - -PubsubRunner.prototype.listProjectTopics = function(callback) { - var q = ('cloud.googleapis.com/project in (/projects/' + - this.args.project_id + ')'); - this.pub.listTopics({query: q}, callback); -}; - -PubsubRunner.prototype.topicExists = function(name, callback) { - this.listProjectTopics(function(err, response) { - if (err) { - callback(err); - } else { - callback(null, _.some(response.topic, function(t) { - return t.name === name; - })); - } - }); -}; - -PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) { - var self = this; - this.topicExists(name, function(err, exists) { - if (err) { - callback(err); - } else{ - if (exists) { - callback(null); - } else { - self.pub.createTopic({name: name}, callback); - } - } - }); -}; - -PubsubRunner.prototype.removeTopic = function(callback) { - var name = this.getTestTopicName(); - console.log('... removing Topic', name); - this.pub.deleteTopic({topic: name}, function(err, value) { - if (err) { - console.log('Could not delete a topic: rpc failed with', err); - callback(err); - } else { - console.log('removed Topic', name, 'OK'); - callback(null); - } - }); -}; - -PubsubRunner.prototype.createTopic = function(callback) { - var name = this.getTestTopicName(); - console.log('... creating Topic', name); - this.pub.createTopic({name: name}, function(err, value) { - if (err) { - console.log('Could not create a topic: rpc failed with', err); - callback(err); - } else { - console.log('created Topic', name, 'OK'); - callback(null); - } - }); -}; - -PubsubRunner.prototype.listSomeTopics = function(callback) { - console.log('Listing topics'); - console.log('-------------_'); - this.listProjectTopics(function(err, response) { - if (err) { - console.log('Could not list topic: rpc failed with', err); - callback(err); - } else { - _.each(response.topic, function(t) { - console.log(t.name); - }); - callback(null); - } - }); -}; - -PubsubRunner.prototype.checkExists = function(callback) { - var name = this.getTestTopicName(); - console.log('... checking for topic', name); - this.topicExists(name, function(err, exists) { - if (err) { - console.log('Could not check for a topics: rpc failed with', err); - callback(err); - } else { - if (exists) { - console.log(name, 'is a topic'); - } else { - console.log(name, 'is not a topic'); - } - callback(null); - } - }); -}; - -PubsubRunner.prototype.randomPubSub = function(callback) { - var self = this; - var topic_name = this.getTestTopicName(); - var sub_name = this.getTestSubName(); - var subscription = {name: sub_name, topic: topic_name}; - async.waterfall([ - _.bind(this.createTopicIfNeeded, this, topic_name), - _.bind(this.sub.createSubscription, this.sub, subscription), - function(resp, cb) { - var msg_count = _.random(10, 30); - // Set up msg_count messages to publish - var message_senders = _.times(msg_count, function(n) { - return _.bind(self.pub.publish, self.pub, { - topic: topic_name, - message: {data: new Buffer('message ' + n)} - }); - }); - async.parallel(message_senders, function(err, result) { - cb(err, result, msg_count); - }); - }, - function(result, msg_count, cb) { - console.log('Sent', msg_count, 'messages to', topic_name + ',', - 'checking for them now.'); - var batch_request = { - subscription: sub_name, - max_events: msg_count - }; - self.sub.pullBatch(batch_request, cb); - }, - function(batch, cb) { - var ack_id = _.pluck(batch.pull_responses, 'ack_id'); - console.log('Got', ack_id.length, 'messages, acknowledging them...'); - var ack_request = { - subscription: sub_name, - ack_id: ack_id - }; - self.sub.acknowledge(ack_request, cb); - }, - function(result, cb) { - console.log( - 'Test messages were acknowledged OK, deleting the subscription'); - self.sub.deleteSubscription({subscription: sub_name}, cb); - } - ], function (err, result) { - if (err) { - console.log('Could not do random pub sub: rpc failed with', err); - } - callback(err, result); - }); -}; - -function main(callback) { - var argv = parseArgs(process.argv, { - string: [ - 'host', - 'oauth_scope', - 'port', - 'action', - 'project_id', - 'topic_name', - 'sub_name' - ], - default: { - host: 'pubsub-staging.googleapis.com', - oauth_scope: 'https://www.googleapis.com/auth/pubsub', - port: 443, - action: 'listSomeTopics', - project_id: 'stoked-keyword-656' - } - }); - var valid_actions = [ - 'createTopic', - 'removeTopic', - 'listSomeTopics', - 'checkExists', - 'randomPubSub' - ]; - if (_.some(valid_actions, function(action) { - return action === argv.action; - })) { - callback(new Error('Action was not valid')); - } - var address = argv.host + ':' + argv.port; - (new GoogleAuth()).getApplicationDefault(function(err, credential) { - if (err) { - callback(err); - return; - } - if (credential.createScopedRequired()) { - credential = credential.createScoped(argv.oauth_scope); - } - var updateMetadata = grpc.getGoogleAuthDelegate(credential); - var ca_path = process.env.SSL_CERT_FILE; - fs.readFile(ca_path, function(err, ca_data) { - if (err) { - callback(err); - return; - } - var ssl_creds = grpc.Credentials.createSsl(ca_data); - var options = { - credentials: ssl_creds, - 'grpc.ssl_target_name_override': argv.host - }; - var pub = new pubsub.PublisherService(address, options, updateMetadata); - var sub = new pubsub.SubscriberService(address, options, updateMetadata); - var runner = new PubsubRunner(pub, sub, argv); - runner[argv.action](callback); - }); - }); -} - -if (require.main === module) { - main(function(err) { - if (err) { - throw err; - } - }); -} - -module.exports = PubsubRunner; diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml index d9fe0a5835..b4d66c517c 100644 --- a/src/ruby/.rubocop_todo.yml +++ b/src/ruby/.rubocop_todo.yml @@ -1,5 +1,5 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2015-04-14 09:35:44 -0700 using RuboCop version 0.29.1. +# on 2015-04-15 18:43:23 -0700 using RuboCop version 0.30.0. # The point is for the user to remove these configuration records # one by one as the offenses are removed from the code base. # Note that changes in the inspected code, or installation of new @@ -9,7 +9,7 @@ Metrics/AbcSize: Max: 36 -# Offense count: 2 +# Offense count: 3 # Configuration parameters: CountComments. Metrics/ClassLength: Max: 183 diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index af7a1d5b15..6f1fe2614f 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -136,12 +136,14 @@ class PingPongPlayer include Grpc::Testing::PayloadType attr_accessor :assertions # required by Minitest::Assertions attr_accessor :queue + attr_accessor :canceller_op # reqs is the enumerator over the requests def initialize(msg_sizes) @queue = Queue.new @msg_sizes = msg_sizes @assertions = 0 # required by Minitest::Assertions + @canceller_op = nil # used to cancel after the first response end def each_item @@ -155,12 +157,15 @@ class PingPongPlayer response_parameters: [p_cls.new(size: resp_size)]) yield req resp = @queue.pop - assert_equal(:COMPRESSABLE, resp.payload.type, - 'payload type is wrong') + assert_equal(:COMPRESSABLE, resp.payload.type, 'payload type is wrong') assert_equal(resp_size, resp.payload.body.length, - 'payload body #{i} has the wrong length') + "payload body #{count} has the wrong length") p "OK: ping_pong #{count}" count += 1 + unless @canceller_op.nil? + canceller_op.cancel + break + end end end end @@ -260,6 +265,27 @@ class NamedTests p 'OK: ping_pong' end + def cancel_after_begin + msg_sizes = [27_182, 8, 1828, 45_904] + reqs = msg_sizes.map do |x| + req = Payload.new(body: nulls(x)) + StreamingInputCallRequest.new(payload: req) + end + op = @stub.streaming_input_call(reqs, return_op: true) + op.cancel + assert_raises(GRPC::Cancelled) { op.execute } + p 'OK: cancel_after_begin' + end + + def cancel_after_first + msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]] + ppp = PingPongPlayer.new(msg_sizes) + op = @stub.full_duplex_call(ppp.each_item, return_op: true) + ppp.canceller_op = op # causes ppp to cancel after the 1st message + assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } } + p 'OK: cancel_after_first' + end + def all all_methods = NamedTests.instance_methods(false).map(&:to_s) all_methods.each do |m| diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 21b933f97f..6da7d3c830 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -42,6 +42,17 @@ #include "rb_completion_queue.h" #include "rb_grpc.h" +/* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */ +static VALUE grpc_rb_cCall; + +/* grpc_rb_eCallError is the ruby class of the exception thrown during call + operations; */ +VALUE grpc_rb_eCallError = Qnil; + +/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate + a timeout. */ +static VALUE grpc_rb_eOutOfTime = Qnil; + /* grpc_rb_sBatchResult is struct class used to hold the results of a batch * call. */ static VALUE grpc_rb_sBatchResult; @@ -219,7 +230,7 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) { it's capacity should have been computed via a prior call to grpc_rb_md_ary_fill_hash_cb */ -int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { +static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { grpc_metadata_array *md_ary = NULL; int array_length; int i; @@ -259,7 +270,8 @@ int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { /* grpc_rb_md_ary_capacity_hash_cb is the hash iteration callback used to pre-compute the capacity a grpc_metadata_array. */ -int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { +static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, + VALUE md_ary_obj) { grpc_metadata_array *md_ary = NULL; /* Construct a metadata object from key and value and add it */ @@ -278,7 +290,7 @@ int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { /* grpc_rb_md_ary_convert converts a ruby metadata hash into a grpc_metadata_array. */ -void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) { +static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) { VALUE md_ary_obj = Qnil; if (md_ary_hash == Qnil) { return; /* Do nothing if the expected has value is nil */ @@ -334,7 +346,8 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) { /* grpc_rb_call_check_op_keys_hash_cb is a hash iteration func that checks each key of an ops hash is valid. */ -int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, VALUE ops_ary) { +static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, + VALUE ops_ary) { /* Update the capacity; the value is an array, add capacity for each value in * the array */ if (TYPE(key) != T_FIXNUM) { @@ -363,7 +376,7 @@ int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, VALUE ops_ary) { /* grpc_rb_op_update_status_from_server adds the values in a ruby status struct to the 'send_status_from_server' portion of an op. */ -void grpc_rb_op_update_status_from_server(grpc_op *op, +static void grpc_rb_op_update_status_from_server(grpc_op *op, grpc_metadata_array* md_ary, VALUE status) { VALUE code = rb_struct_aref(status, sym_code); @@ -615,18 +628,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, return result; } -/* grpc_rb_cCall is the ruby class that proxies grpc_call. */ -VALUE grpc_rb_cCall = Qnil; - -/* grpc_rb_eCallError is the ruby class of the exception thrown during call - operations; */ -VALUE grpc_rb_eCallError = Qnil; - -/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate - a timeout. */ -VALUE grpc_rb_eOutOfTime = Qnil; - -void Init_grpc_error_codes() { +static void Init_grpc_error_codes() { /* Constants representing the error codes of grpc_call_error in grpc.h */ VALUE grpc_rb_mRpcErrors = rb_define_module_under(grpc_rb_mGrpcCore, "RpcErrors"); @@ -678,7 +680,7 @@ void Init_grpc_error_codes() { rb_obj_freeze(rb_error_code_details); } -void Init_grpc_op_codes() { +static void Init_grpc_op_codes() { /* Constants representing operation type codes in grpc.h */ VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps"); diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h index e20a34c74e..003ce0429e 100644 --- a/src/ruby/ext/grpc/rb_call.h +++ b/src/ruby/ext/grpc/rb_call.h @@ -49,17 +49,10 @@ const char* grpc_call_error_detail_of(grpc_call_error err); /* Converts a metadata array to a hash. */ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary); -/* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */ -extern VALUE grpc_rb_cCall; - /* grpc_rb_eCallError is the ruby class of the exception thrown during call operations. */ extern VALUE grpc_rb_eCallError; -/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate - a timeout. */ -extern VALUE grpc_rb_eOutOfTime; - /* Initializes the Call class. */ void Init_grpc_call(); diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 47b85c83ed..214675af92 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -58,6 +58,8 @@ static ID id_target; * GCed before the channel */ static ID id_cqueue; +/* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */ +static VALUE grpc_rb_cChannel = Qnil; /* Used during the conversion of a hash to channel args during channel setup */ static VALUE grpc_rb_cChannelArgs; @@ -246,9 +248,6 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { return Qnil; } -/* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */ -VALUE grpc_rb_cChannel = Qnil; - void Init_grpc_channel() { grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject); grpc_rb_cChannel = diff --git a/src/ruby/ext/grpc/rb_channel.h b/src/ruby/ext/grpc/rb_channel.h index 5c57b31fb2..6e3c087689 100644 --- a/src/ruby/ext/grpc/rb_channel.h +++ b/src/ruby/ext/grpc/rb_channel.h @@ -37,9 +37,6 @@ #include <ruby.h> #include <grpc/grpc.h> -/* grpc_rb_cChannel is the Channel class whose instances proxy grpc_channel. */ -extern VALUE grpc_rb_cChannel; - /* Initializes the Channel class. */ void Init_grpc_channel(); diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index db2cc47761..e2c9b85661 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -39,6 +39,10 @@ #include <grpc/support/time.h> #include "rb_grpc.h" +/* grpc_rb_cCompletionQueue is the ruby class that proxies + * grpc_completion_queue. */ +static VALUE grpc_rb_cCompletionQueue = Qnil; + /* Used to allow grpc_completion_queue_next call to release the GIL */ typedef struct next_call_stack { grpc_completion_queue *cq; @@ -177,10 +181,6 @@ grpc_event* grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, return next_call.event; } -/* grpc_rb_cCompletionQueue is the ruby class that proxies - * grpc_completion_queue. */ -VALUE grpc_rb_cCompletionQueue = Qnil; - void Init_grpc_completion_queue() { grpc_rb_cCompletionQueue = rb_define_class_under(grpc_rb_mGrpcCore, "CompletionQueue", rb_cObject); diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h index 1bfb80e499..4d0f49ac47 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.h +++ b/src/ruby/ext/grpc/rb_completion_queue.h @@ -48,10 +48,6 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v); grpc_event* grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag, VALUE timeout); -/* grpc_rb_cCompletionQueue is the CompletionQueue class whose instances proxy - grpc_completion_queue. */ -extern VALUE grpc_rb_cCompletionQueue; - /* Initializes the CompletionQueue class. */ void Init_grpc_completion_queue(); diff --git a/src/ruby/ext/grpc/rb_credentials.c b/src/ruby/ext/grpc/rb_credentials.c index 8170f0d26c..1ec88914e4 100644 --- a/src/ruby/ext/grpc/rb_credentials.c +++ b/src/ruby/ext/grpc/rb_credentials.c @@ -40,6 +40,9 @@ #include "rb_grpc.h" +/* grpc_rb_cCredentials is the ruby class that proxies grpc_credentials. */ +static VALUE grpc_rb_cCredentials = Qnil; + /* grpc_rb_credentials wraps a grpc_credentials. It provides a * peer ruby object, 'mark' to minimize copying when a credential is * created from ruby. */ @@ -252,9 +255,6 @@ static VALUE grpc_rb_credentials_init(int argc, VALUE *argv, VALUE self) { return self; } -/* grpc_rb_cCredentials is the ruby class that proxies grpc_credentials. */ -VALUE grpc_rb_cCredentials = Qnil; - void Init_grpc_credentials() { grpc_rb_cCredentials = rb_define_class_under(grpc_rb_mGrpcCore, "Credentials", rb_cObject); diff --git a/src/ruby/ext/grpc/rb_credentials.h b/src/ruby/ext/grpc/rb_credentials.h index dc0a3d01e8..e7c43c9c78 100644 --- a/src/ruby/ext/grpc/rb_credentials.h +++ b/src/ruby/ext/grpc/rb_credentials.h @@ -37,10 +37,6 @@ #include <ruby.h> #include <grpc/grpc_security.h> -/* grpc_rb_cCredentials is the ruby class whose instances proxy - grpc_credentials. */ -extern VALUE grpc_rb_cCredentials; - /* Initializes the ruby Credentials class. */ void Init_grpc_credentials(); diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index b431636c75..70a28ddc17 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -46,7 +46,7 @@ #include "rb_credentials.h" #include "rb_server_credentials.h" -VALUE grpc_rb_cTimeVal = Qnil; +static VALUE grpc_rb_cTimeVal = Qnil; static rb_data_type_t grpc_rb_timespec_data_type = { "gpr_timespec", @@ -154,7 +154,7 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval) { return t; } -void Init_grpc_status_codes() { +static void Init_grpc_status_codes() { /* Constants representing the status codes or grpc_status_code in status.h */ VALUE grpc_rb_mStatusCodes = rb_define_module_under(grpc_rb_mGrpcCore, "StatusCodes"); @@ -203,7 +203,7 @@ static ID id_inspect; static ID id_to_s; /* Converts a wrapped time constant to a standard time. */ -VALUE grpc_rb_time_val_to_time(VALUE self) { +static VALUE grpc_rb_time_val_to_time(VALUE self) { gpr_timespec *time_const = NULL; TypedData_Get_Struct(self, gpr_timespec, &grpc_rb_timespec_data_type, time_const); @@ -212,17 +212,17 @@ VALUE grpc_rb_time_val_to_time(VALUE self) { } /* Invokes inspect on the ctime version of the time val. */ -VALUE grpc_rb_time_val_inspect(VALUE self) { +static VALUE grpc_rb_time_val_inspect(VALUE self) { return rb_funcall(grpc_rb_time_val_to_time(self), id_inspect, 0); } /* Invokes to_s on the ctime version of the time val. */ -VALUE grpc_rb_time_val_to_s(VALUE self) { +static VALUE grpc_rb_time_val_to_s(VALUE self) { return rb_funcall(grpc_rb_time_val_to_time(self), id_to_s, 0); } /* Adds a module with constants that map to gpr's static timeval structs. */ -void Init_grpc_time_consts() { +static void Init_grpc_time_consts() { VALUE grpc_rb_mTimeConsts = rb_define_module_under(grpc_rb_mGrpcCore, "TimeConsts"); grpc_rb_cTimeVal = @@ -249,7 +249,7 @@ void Init_grpc_time_consts() { id_tv_nsec = rb_intern("tv_nsec"); } -void grpc_rb_shutdown(void *vm) { grpc_shutdown(); } +static void grpc_rb_shutdown(void *vm) { grpc_shutdown(); } /* Initialize the GRPC module structs */ @@ -262,6 +262,11 @@ VALUE grpc_rb_sStatus = Qnil; VALUE grpc_rb_mGRPC = Qnil; VALUE grpc_rb_mGrpcCore = Qnil; +/* cached Symbols for members in Status struct */ +VALUE sym_code = Qundef; +VALUE sym_details = Qundef; +VALUE sym_metadata = Qundef; + void Init_grpc() { grpc_init(); ruby_vm_at_exit(grpc_rb_shutdown); diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h index 57db350f5e..a502273de1 100644 --- a/src/ruby/ext/grpc/rb_grpc.h +++ b/src/ruby/ext/grpc/rb_grpc.h @@ -41,9 +41,6 @@ /* grpc_rb_mGrpcCore is the module containing the ruby wrapper GRPC classes. */ extern VALUE grpc_rb_mGrpcCore; -/* Class used to wrap timeval structs. */ -extern VALUE grpc_rb_cTimeVal; - /* grpc_rb_sNewServerRpc is the struct that holds new server rpc details. */ extern VALUE grpc_rb_sNewServerRpc; @@ -51,13 +48,13 @@ extern VALUE grpc_rb_sNewServerRpc; extern VALUE grpc_rb_sStatus; /* sym_code is the symbol for the code attribute of grpc_rb_sStatus. */ -VALUE sym_code; +extern VALUE sym_code; /* sym_details is the symbol for the details attribute of grpc_rb_sStatus. */ -VALUE sym_details; +extern VALUE sym_details; /* sym_metadata is the symbol for the metadata attribute of grpc_rb_sStatus. */ -VALUE sym_metadata; +extern VALUE sym_metadata; /* GC_NOT_MARKED is used in calls to Data_Wrap_Struct to indicate that the wrapped struct does not need to participate in ruby gc. */ diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 5d7c8f7a5d..bc0878af05 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -44,7 +44,7 @@ #include "rb_grpc.h" /* grpc_rb_cServer is the ruby class that proxies grpc_server. */ -VALUE grpc_rb_cServer = Qnil; +static VALUE grpc_rb_cServer = Qnil; /* id_at is the constructor method of the ruby standard Time class. */ static ID id_at; diff --git a/src/ruby/ext/grpc/rb_server.h b/src/ruby/ext/grpc/rb_server.h index 22e88a7d46..5e4b711d35 100644 --- a/src/ruby/ext/grpc/rb_server.h +++ b/src/ruby/ext/grpc/rb_server.h @@ -37,10 +37,6 @@ #include <ruby.h> #include <grpc/grpc.h> -/* grpc_rb_cServer is the Server class whose instances proxy - grpc_byte_buffer. */ -extern VALUE grpc_rb_cServer; - /* Initializes the Server class. */ void Init_grpc_server(); diff --git a/src/ruby/ext/grpc/rb_server_credentials.c b/src/ruby/ext/grpc/rb_server_credentials.c index ab16c05650..a86389445f 100644 --- a/src/ruby/ext/grpc/rb_server_credentials.c +++ b/src/ruby/ext/grpc/rb_server_credentials.c @@ -40,6 +40,10 @@ #include "rb_grpc.h" +/* grpc_rb_cServerCredentials is the ruby class that proxies + grpc_server_credentials. */ +static VALUE grpc_rb_cServerCredentials = Qnil; + /* grpc_rb_server_credentials wraps a grpc_server_credentials. It provides a peer ruby object, 'mark' to minimize copying when a server credential is created from ruby. */ @@ -191,10 +195,6 @@ static VALUE grpc_rb_server_credentials_init(VALUE self, VALUE pem_root_certs, return self; } -/* grpc_rb_cServerCredentials is the ruby class that proxies - grpc_server_credentials. */ -VALUE grpc_rb_cServerCredentials = Qnil; - void Init_grpc_server_credentials() { grpc_rb_cServerCredentials = rb_define_class_under(grpc_rb_mGrpcCore, "ServerCredentials", rb_cObject); diff --git a/src/ruby/ext/grpc/rb_server_credentials.h b/src/ruby/ext/grpc/rb_server_credentials.h index f79a869358..35b395ad5c 100644 --- a/src/ruby/ext/grpc/rb_server_credentials.h +++ b/src/ruby/ext/grpc/rb_server_credentials.h @@ -37,10 +37,6 @@ #include <ruby.h> #include <grpc/grpc_security.h> -/* grpc_rb_cServerCredentials is the ruby class whose instances proxy - grpc_server_credentials. */ -extern VALUE grpc_rb_cServerCredentials; - /* Initializes the ruby ServerCredentials class. */ void Init_grpc_server_credentials(); diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec index a50d0351da..12d4ab17f2 100755 --- a/src/ruby/grpc.gemspec +++ b/src/ruby/grpc.gemspec @@ -22,16 +22,16 @@ Gem::Specification.new do |s| s.platform = Gem::Platform::RUBY s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' - s.add_dependency 'googleauth', '~> 0.4' + s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests s.add_dependency 'logging', '~> 1.8' s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests s.add_dependency 'xray', '~> 1.1' - s.add_development_dependency 'bundler', '~> 1.7' - s.add_development_dependency 'rake', '~> 10.0' - s.add_development_dependency 'rake-compiler', '~> 0' - s.add_development_dependency 'rubocop', '~> 0.28.0' - s.add_development_dependency 'rspec', '~> 3.0' + s.add_development_dependency 'bundler', '~> 1.9' + s.add_development_dependency 'rake', '~> 10.4' + s.add_development_dependency 'rake-compiler', '~> 0.9' + s.add_development_dependency 'rubocop', '~> 0.30' + s.add_development_dependency 'rspec', '~> 3.2' s.extensions = %w(ext/grpc/extconf.rb) end diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb index b23793730f..35e9c02a94 100644 --- a/src/ruby/lib/grpc/errors.rb +++ b/src/ruby/lib/grpc/errors.rb @@ -54,4 +54,8 @@ module GRPC Status.new(code, details) end end + + # Cancelled is an exception class that indicates that an rpc was cancelled. + class Cancelled < StandardError + end end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 489349c2c9..8d63de4145 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -30,6 +30,22 @@ require 'forwardable' require 'grpc/generic/bidi_call' +class Struct + # BatchResult is the struct returned by calls to call#start_batch. + class BatchResult + # check_status returns the status, raising an error if the status + # is non-nil and not OK. + def check_status + return nil if status.nil? + fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED + if status.code != GRPC::Core::StatusCodes::OK + fail GRPC::BadStatus.new(status.code, status.details) + end + status + end + end +end + # GRPC contains the General RPC module. module GRPC # The ActiveCall class provides simple methods for sending marshallable @@ -38,7 +54,9 @@ module GRPC include Core::StatusCodes include Core::TimeConsts include Core::CallOps + extend Forwardable attr_reader(:deadline) + def_delegators :@call, :cancel, :metadata # client_invoke begins a client invocation. # @@ -101,50 +119,6 @@ module GRPC @metadata_tag = metadata_tag end - # Obtains the status of the call. - # - # this value is nil until the call completes - # @return this call's status - def status - @call.status - end - - # Obtains the metadata of the call. - # - # At the start of the call this will be nil. During the call this gets - # some values as soon as the other end of the connection acknowledges the - # request. - # - # @return this calls's metadata - def metadata - @call.metadata - end - - # Cancels the call. - # - # Cancels the call. The call does not return any result, but once this it - # has been called, the call should eventually terminate. Due to potential - # races between the execution of the cancel and the in-flight request, the - # result of the call after calling #cancel is indeterminate: - # - # - the call may terminate with a BadStatus exception, with code=CANCELLED - # - the call may terminate with OK Status, and return a response - # - the call may terminate with a different BadStatus exception if that - # was happening - def cancel - @call.cancel - end - - # indicates if the call is shutdown - def shutdown - @shutdown ||= false - end - - # indicates if the call is cancelled. - def cancelled - @cancelled ||= false - end - # multi_req_view provides a restricted view of this ActiveCall for use # in a server client-streaming handler. def multi_req_view @@ -176,9 +150,9 @@ module GRPC SEND_CLOSE_FROM_CLIENT => nil } ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished - @call.run_batch(@cq, self, INFINITE_FUTURE, ops) + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) return unless assert_finished - @call.status + batch_result.check_status end # finished waits until a client call is completed. @@ -192,17 +166,12 @@ module GRPC elsif !batch_result.metadata.nil? @call.metadata.merge!(batch_result.metadata) end - if batch_result.status.code != Core::StatusCodes::OK - fail BadStatus.new(batch_result.status.code, - batch_result.status.details) - end - batch_result + batch_result.check_status end # remote_send sends a request to the remote endpoint. # - # It blocks until the remote endpoint acknowledges by sending a - # WRITE_ACCEPTED. req can be marshalled already. + # It blocks until the remote endpoint accepts the message. # # @param req [Object, String] the object to send or it's marshal form. # @param marshalled [false, true] indicates if the object is already @@ -332,6 +301,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # client_streamer sends a stream of requests to a GRPC server, and @@ -355,6 +327,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -381,6 +356,9 @@ module GRPC replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -416,6 +394,9 @@ module GRPC start_call(**kw) unless @started bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd.run_on_client(requests, &blk) + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -436,9 +417,10 @@ module GRPC private + # Starts the call if not already started def start_call(**kw) - tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) - @finished_tag, @read_metadata_tag = tags + return if @started + @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) @started = true end @@ -466,6 +448,6 @@ module GRPC # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. Operation = view_class(:cancel, :cancelled, :deadline, :execute, - :metadata, :status) + :metadata, :status, :start_call) end end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 1c1b3b0db7..b813ab5b54 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -78,13 +78,11 @@ module GRPC # @param requests the Enumerable of requests to send # @return an Enumerator of requests to yield def run_on_client(requests, &blk) - enq_th = start_write_loop(requests) - loop_th = start_read_loop + @enq_th = start_write_loop(requests) + @loop_th = start_read_loop replies = each_queued_msg return replies if blk.nil? replies.each { |r| blk.call(r) } - enq_th.join - loop_th.join end # Begins orchestration of the Bidi stream for a server generating replies. @@ -100,10 +98,8 @@ module GRPC # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - enq_th = start_write_loop(replys, is_client: false) - loop_th = start_read_loop - loop_th.join - enq_th.join + @enq_th = start_write_loop(replys, is_client: false) + @loop_th = start_read_loop end private @@ -122,10 +118,13 @@ module GRPC logger.debug("each_queued_msg: msg##{count}") count += 1 req = @readq.pop + logger.debug("each_queued_msg: req = #{req}") throw req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end + @loop_th.join + @enq_th.join end # during bidi-streaming, read the requests to send from a separate thread @@ -136,20 +135,23 @@ module GRPC begin count = 0 requests.each do |req| + logger.debug("bidi-write_loop: #{count}") count += 1 payload = @marshal.call(req) @call.run_batch(@cq, write_tag, INFINITE_FUTURE, SEND_MESSAGE => payload) end if is_client - logger.debug("bidi-client: sent #{count} reqs, waiting to finish") - @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil, - RECV_STATUS_ON_CLIENT => nil) + logger.debug("bidi-write-loop: sent #{count}, waiting to finish") + batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil, + RECV_STATUS_ON_CLIENT => nil) + batch_result.check_status end rescue StandardError => e - logger.warn('bidi: write_loop failed') + logger.warn('bidi-write_loop: failed') logger.warn(e) + raise e end end end @@ -163,7 +165,7 @@ module GRPC # queue the initial read before beginning the loop loop do - logger.debug("waiting for read #{count}") + logger.debug("bidi-read_loop: #{count}") count += 1 # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, @@ -171,7 +173,7 @@ module GRPC # handle the next message if batch_result.message.nil? @readq.push(END_OF_READS) - logger.debug('done reading!') + logger.debug('bidi-read-loop: done reading!') break end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 245999ea03..1323bacfa6 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -388,6 +388,23 @@ describe GRPC::RpcServer do t.join end + it 'should handle cancellation correctly', server: true do + service = SlowService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = SlowStub.new(@host, **@client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + Thread.new do # cancel the call + sleep 0.1 + op.cancel + end + expect { op.execute }.to raise_error GRPC::Cancelled + @srv.stop + t.join + end + it 'should receive updated metadata', server: true do service = EchoService.new @srv.handle(service) |