diff options
Diffstat (limited to 'include/grpc++')
-rw-r--r-- | include/grpc++/impl/README.md | 4 | ||||
-rw-r--r-- | include/grpc++/impl/call.h | 3 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_service_method.h | 16 | ||||
-rw-r--r-- | include/grpc++/server.h | 28 | ||||
-rw-r--r-- | include/grpc++/server_builder.h | 4 | ||||
-rw-r--r-- | include/grpc++/stream.h | 2 |
6 files changed, 42 insertions, 15 deletions
diff --git a/include/grpc++/impl/README.md b/include/grpc++/impl/README.md new file mode 100644 index 0000000000..612150caa0 --- /dev/null +++ b/include/grpc++/impl/README.md @@ -0,0 +1,4 @@ +**The APIs in this directory are not stable!** + +This directory contains header files that need to be installed but are not part +of the public API. Users should not use these headers directly. diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index bc1db4c12c..35338a413e 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -541,8 +541,7 @@ class CallOpSet : public CallOpSetInterface, template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>, class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>, class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>> -class SneakyCallOpSet GRPC_FINAL - : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> { +class SneakyCallOpSet : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> { public: bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base; diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 925801e1ce..c02ebec19e 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -211,13 +211,19 @@ class BidiStreamingHandler : public MethodHandler { // Handle unknown method by returning UNIMPLEMENTED error. class UnknownMethodHandler : public MethodHandler { public: - void RunHandler(const HandlerParameter& param) GRPC_FINAL { + template <class T> + static void FillOps(ServerContext* context, T* ops) { Status status(StatusCode::UNIMPLEMENTED, ""); - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; - if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (!context->sent_initial_metadata_) { + ops->SendInitialMetadata(context->initial_metadata_); + context->sent_initial_metadata_ = true; } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + ops->ServerSendStatus(context->trailing_metadata_, status); + } + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + FillOps(param.server_context, &ops); param.call->PerformOps(&ops); param.call->cq()->Pluck(&ops); } diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 99f288d179..3cff07fb80 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -63,7 +63,14 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { ~Server(); // Shutdown the server, block until all rpc processing finishes. - void Shutdown(); + // Forcefully terminate pending calls after deadline expires. + template <class T> + void Shutdown(const T& deadline) { + ShutdownInternal(TimePoint<T>(deadline).raw_time()); + } + + // Shutdown the server, waiting for all rpc processing to finish. + void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); } // Block waiting for all work to complete (the server must either // be shutting down or some other thread must call Shutdown for this @@ -91,7 +98,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { // Add a listening port. Can be called multiple times. int AddListeningPort(const grpc::string& addr, ServerCredentials* creds); // Start the server. - bool Start(); + bool Start(ServerCompletionQueue** cqs, size_t num_cqs); void HandleQueueClosed(); void RunRpc(); @@ -99,11 +106,14 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; + void ShutdownInternal(gpr_timespec deadline); + class BaseAsyncRequest : public CompletionQueueTag { public: BaseAsyncRequest(Server* server, ServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, void* tag); + CompletionQueue* call_cq, void* tag, + bool delete_on_finalize); virtual ~BaseAsyncRequest(); bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; @@ -114,6 +124,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { ServerAsyncStreamingInterface* const stream_; CompletionQueue* const call_cq_; void* const tag_; + const bool delete_on_finalize_; grpc_call* call_; grpc_metadata_array initial_metadata_array_; }; @@ -175,12 +186,13 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { Message* const request_; }; - class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest { + class GenericAsyncRequest : public BaseAsyncRequest { public: GenericAsyncRequest(Server* server, GenericServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag); + ServerCompletionQueue* notification_cq, void* tag, + bool delete_on_finalize); bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; @@ -188,6 +200,10 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { grpc_call_details call_details_; }; + class UnimplementedAsyncRequestContext; + class UnimplementedAsyncRequest; + class UnimplementedAsyncResponse; + template <class Message> void RequestAsyncCall(void* registered_method, ServerContext* context, ServerAsyncStreamingInterface* stream, @@ -212,7 +228,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { ServerCompletionQueue* notification_cq, void* tag) { new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, - tag); + tag, true); } const int max_message_size_; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 906daf1370..8666252d51 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -101,8 +101,8 @@ class ServerBuilder { void SetThreadPool(ThreadPoolInterface* thread_pool); // Add a completion queue for handling asynchronous services - // Caller is required to keep this completion queue live until calling - // BuildAndStart() + // Caller is required to keep this completion queue live until + // the server is destroyed. std::unique_ptr<ServerCompletionQueue> AddCompletionQueue(); // Return a running server which is ready for processing rpcs. diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 45dafcd282..4bffaffb40 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -761,6 +761,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, } private: + friend class ::grpc::Server; + void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } Call call_; |