diff options
author | Craig Tiller <ctiller@google.com> | 2016-04-01 12:24:18 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-04-01 12:24:18 -0700 |
commit | 399b3c42de555c21202b1f7c897d7ac78c1557d1 (patch) | |
tree | 74f108460da7a1a080a8130ea4f73fde50181f50 /include | |
parent | 2fe814d0ced9f962c1828970d40ae2074f1e6fc5 (diff) |
Fix client_crash_test, implement idempotency, fail_fast for C++
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/impl/codegen/async_stream.h | 36 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/async_unary_call.h | 9 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/call.h | 7 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/client_context.h | 13 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/client_unary_call.h | 3 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/method_handler_impl.h | 29 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/server_context.h | 2 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/sync_stream.h | 24 |
8 files changed, 86 insertions, 37 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index 78fb7274e2..a607a47106 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -108,7 +108,8 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { const W& request, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); + init_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); init_ops_.ClientSendClose(); @@ -173,7 +174,8 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { finish_ops_.RecvMessage(response); init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); + init_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); call_.PerformOps(&init_ops_); } @@ -240,7 +242,8 @@ class ClientAsyncReaderWriter GRPC_FINAL void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); + init_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); call_.PerformOps(&init_ops_); } @@ -305,7 +308,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } @@ -319,7 +323,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, void Finish(const W& msg, const Status& status, void* tag) { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. @@ -336,7 +341,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, GPR_CODEGEN_ASSERT(!status.ok()); finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); @@ -366,7 +372,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } @@ -374,7 +381,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void Write(const W& msg, void* tag) GRPC_OVERRIDE { write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert @@ -385,7 +393,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void Finish(const Status& status, void* tag) { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); @@ -415,7 +424,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } @@ -429,7 +439,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void Write(const W& msg, void* tag) GRPC_OVERRIDE { write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert @@ -440,7 +451,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void Finish(const Status& status, void* tag) { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 52b34770a8..e1067d111f 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -67,7 +67,8 @@ class ClientAsyncResponseReader GRPC_FINAL call_(channel->CreateCall(method, context, cq)), collection_(new CallOpSetCollection) { collection_->init_buf_.SetCollection(collection_); - collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_); + collection_->init_buf_.SendInitialMetadata( + context->send_initial_metadata_, context->initial_metadata_flags()); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(collection_->init_buf_.SendMessage(request).ok()); collection_->init_buf_.ClientSendClose(); @@ -122,7 +123,8 @@ class ServerAsyncResponseWriter GRPC_FINAL GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_buf_.set_output_tag(tag); - meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); + meta_buf_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_buf_); } @@ -130,7 +132,8 @@ class ServerAsyncResponseWriter GRPC_FINAL void Finish(const W& msg, const Status& status, void* tag) { finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index f76d7c23ed..aea1a6acec 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -181,8 +181,10 @@ class CallOpSendInitialMetadata { CallOpSendInitialMetadata() : send_(false) {} void SendInitialMetadata( - const std::multimap<grpc::string, grpc::string>& metadata) { + const std::multimap<grpc::string, grpc::string>& metadata, + uint32_t flags) { send_ = true; + flags_ = flags; initial_metadata_count_ = metadata.size(); initial_metadata_ = FillMetadataArray(metadata); } @@ -192,7 +194,7 @@ class CallOpSendInitialMetadata { if (!send_) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->flags = 0; + op->flags = flags_; op->reserved = NULL; op->data.send_initial_metadata.count = initial_metadata_count_; op->data.send_initial_metadata.metadata = initial_metadata_; @@ -204,6 +206,7 @@ class CallOpSendInitialMetadata { } bool send_; + uint32_t flags_; size_t initial_metadata_count_; grpc_metadata* initial_metadata_; }; diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h index 5201808600..aed12767a7 100644 --- a/include/grpc++/impl/codegen/client_context.h +++ b/include/grpc++/impl/codegen/client_context.h @@ -221,6 +221,12 @@ class ClientContext { deadline_ = deadline_tp.raw_time(); } + /// EXPERIMENTAL: Set this request to be idempotent + void set_idempotent(bool idempotent) { idempotent_ = idempotent; } + + /// EXPERIMENTAL: Trigger fail-fast or not on this request + void set_fail_fast(bool fail_fast) { fail_fast_ = fail_fast; } + #ifndef GRPC_CXX0X_NO_CHRONO /// Return the deadline for the client call. std::chrono::system_clock::time_point deadline() { @@ -328,9 +334,16 @@ class ClientContext { grpc_call* call() { return call_; } void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel); + uint32_t initial_metadata_flags() const { + return (idempotent_ ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST : 0) | + (fail_fast_ ? 0 : GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY); + } + grpc::string authority() { return authority_; } bool initial_metadata_received_; + bool fail_fast_; + bool idempotent_; std::shared_ptr<Channel> channel_; grpc::mutex mu_; grpc_call* call_; diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index 0ee5b198d2..70d65549c8 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -62,7 +62,8 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, if (!status.ok()) { return status; } - ops.SendInitialMetadata(context->send_initial_metadata_); + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); ops.RecvInitialMetadata(context); ops.RecvMessage(result); ops.ClientSendClose(); diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 436b4d819b..21ac6c4fb5 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -44,10 +44,10 @@ namespace grpc { template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler : public MethodHandler { public: - RpcMethodHandler( - std::function<Status(ServiceType*, ServerContext*, const RequestType*, - ResponseType*)> func, - ServiceType* service) + RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*, + const RequestType*, ResponseType*)> + func, + ServiceType* service) : func_(func), service_(service) {} void RunHandler(const HandlerParameter& param) GRPC_FINAL { @@ -63,7 +63,8 @@ class RpcMethodHandler : public MethodHandler { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> ops; - ops.SendInitialMetadata(param.server_context->initial_metadata_); + ops.SendInitialMetadata(param.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); if (status.ok()) { status = ops.SendMessage(rsp); } @@ -87,7 +88,8 @@ class ClientStreamingHandler : public MethodHandler { public: ClientStreamingHandler( std::function<Status(ServiceType*, ServerContext*, - ServerReader<RequestType>*, ResponseType*)> func, + ServerReader<RequestType>*, ResponseType*)> + func, ServiceType* service) : func_(func), service_(service) {} @@ -100,7 +102,8 @@ class ClientStreamingHandler : public MethodHandler { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> ops; - ops.SendInitialMetadata(param.server_context->initial_metadata_); + ops.SendInitialMetadata(param.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); if (status.ok()) { status = ops.SendMessage(rsp); } @@ -122,7 +125,8 @@ class ServerStreamingHandler : public MethodHandler { public: ServerStreamingHandler( std::function<Status(ServiceType*, ServerContext*, const RequestType*, - ServerWriter<ResponseType>*)> func, + ServerWriter<ResponseType>*)> + func, ServiceType* service) : func_(func), service_(service) {} @@ -138,7 +142,8 @@ class ServerStreamingHandler : public MethodHandler { CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_); + ops.SendInitialMetadata(param.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); @@ -170,7 +175,8 @@ class BidiStreamingHandler : public MethodHandler { CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_); + ops.SendInitialMetadata(param.server_context->initial_metadata_, + param.server_context->initial_metadata_flags()); } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); @@ -191,7 +197,8 @@ class UnknownMethodHandler : public MethodHandler { static void FillOps(ServerContext* context, T* ops) { Status status(StatusCode::UNIMPLEMENTED, ""); if (!context->sent_initial_metadata_) { - ops->SendInitialMetadata(context->initial_metadata_); + ops->SendInitialMetadata(context->initial_metadata_, + context->initial_metadata_flags()); context->sent_initial_metadata_ = true; } ops->ServerSendStatus(context->trailing_metadata_, status); diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index 44d340aa45..7fa0235ca9 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -195,6 +195,8 @@ class ServerContext { void set_call(grpc_call* call); + uint32_t initial_metadata_flags() const { return 0; } + CompletionOp* completion_op_; bool has_notify_when_done_tag_; void* async_notify_when_done_tag_; diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index d0ad05169f..9100ce09a2 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -125,7 +125,8 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> ops; - ops.SendInitialMetadata(context->send_initial_metadata_); + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); ops.ClientSendClose(); @@ -190,7 +191,8 @@ class ClientWriter : public ClientWriterInterface<W> { finish_ops_.RecvMessage(response); CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_); + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); call_.PerformOps(&ops); cq_.Pluck(&ops); } @@ -268,7 +270,8 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { ClientContext* context) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_); + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); call_.PerformOps(&ops); cq_.Pluck(&ops); } @@ -334,7 +337,8 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); + ops.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; call_->PerformOps(&ops); call_->cq()->Pluck(&ops); @@ -361,7 +365,8 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); + ops.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; call_->PerformOps(&ops); call_->cq()->Pluck(&ops); @@ -374,7 +379,8 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> { return false; } if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_); + ops.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; } call_->PerformOps(&ops); @@ -397,7 +403,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); + ops.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; call_->PerformOps(&ops); call_->cq()->Pluck(&ops); @@ -417,7 +424,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, return false; } if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_); + ops.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); ctx_->sent_initial_metadata_ = true; } call_->PerformOps(&ops); |