diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/completion_queue.h | 9 | ||||
-rw-r--r-- | include/grpc++/impl/call.h | 4 | ||||
-rw-r--r-- | include/grpc++/server_context.h | 12 | ||||
-rw-r--r-- | include/grpc++/stream.h | 12 |
4 files changed, 22 insertions, 15 deletions
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index f1b4962d1b..0bfbd5db7c 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -55,6 +55,7 @@ class ServerReaderWriter; class CompletionQueue; class Server; +class ServerContext; class CompletionQueueTag { public: @@ -62,7 +63,9 @@ class CompletionQueueTag { // Called prior to returning from Next(), return value // is the status of the operation (return status is the default thing // to do) - virtual void FinalizeResult(void **tag, bool *status) = 0; + // If this function returns false, the tag is dropped and not returned + // from the completion queue + virtual bool FinalizeResult(void **tag, bool *status) = 0; }; // grpc_completion_queue wrapper class @@ -99,6 +102,7 @@ class CompletionQueue { template <class R, class W> friend class ::grpc::ServerReaderWriter; friend class ::grpc::Server; + friend class ::grpc::ServerContext; friend Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, @@ -109,6 +113,9 @@ class CompletionQueue { // Cannot be mixed with calls to Next(). bool Pluck(CompletionQueueTag *tag); + // Does a single polling pluck on tag + void TryPluck(CompletionQueueTag *tag, bool forever); + grpc_completion_queue *cq_; // owned }; diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 7ba5d16bf3..341710f7a2 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -65,7 +65,7 @@ class CallOpBuffer : public CompletionQueueTag { void AddSendInitialMetadata( std::multimap<grpc::string, grpc::string> *metadata); void AddSendInitialMetadata(ClientContext *ctx); - void AddRecvInitialMetadata(ClientContext* ctx); + void AddRecvInitialMetadata(ClientContext *ctx); void AddSendMessage(const google::protobuf::Message &message); void AddRecvMessage(google::protobuf::Message *message); void AddClientSendClose(); @@ -80,7 +80,7 @@ class CallOpBuffer : public CompletionQueueTag { void FillOps(grpc_op *ops, size_t *nops); // Called by completion queue just prior to returning from Next() or Pluck() - void FinalizeResult(void **tag, bool *status) override; + bool FinalizeResult(void **tag, bool *status) override; bool got_message = false; diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 06744f8f4f..22ebeadbec 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -36,6 +36,7 @@ #include <chrono> #include <map> +#include <mutex> #include "config.h" @@ -60,7 +61,9 @@ class ServerWriter; template <class R, class W> class ServerReaderWriter; +class Call; class CallOpBuffer; +class CompletionQueue; class Server; // Interface of server side rpc context. @@ -76,6 +79,8 @@ class ServerContext final { void AddInitialMetadata(const grpc::string& key, const grpc::string& value); void AddTrailingMetadata(const grpc::string& key, const grpc::string& value); + bool IsCancelled(); + std::multimap<grpc::string, grpc::string> client_metadata() { return client_metadata_; } @@ -97,11 +102,18 @@ class ServerContext final { template <class R, class W> friend class ::grpc::ServerReaderWriter; + class CompletionOp; + + void BeginCompletionOp(Call* call); + ServerContext(gpr_timespec deadline, grpc_metadata* metadata, size_t metadata_count); + CompletionOp* completion_op_ = nullptr; + std::chrono::system_clock::time_point deadline_; grpc_call* call_ = nullptr; + CompletionQueue* cq_ = nullptr; bool sent_initial_metadata_ = false; std::multimap<grpc::string, grpc::string> client_metadata_; std::multimap<grpc::string, grpc::string> initial_metadata_; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 491dfc8136..bb2a84949a 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -576,8 +576,6 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { if (status.IsOk()) { finish_buf_.AddSendMessage(msg); } - bool cancelled = false; - finish_buf_.AddServerRecvClose(&cancelled); finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } @@ -589,8 +587,6 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - bool cancelled = false; - finish_buf_.AddServerRecvClose(&cancelled); finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } @@ -636,8 +632,6 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface, if (status.IsOk()) { finish_buf_.AddSendMessage(msg); } - bool cancelled = false; - finish_buf_.AddServerRecvClose(&cancelled); finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } @@ -649,8 +643,6 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface, finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - bool cancelled = false; - finish_buf_.AddServerRecvClose(&cancelled); finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } @@ -697,8 +689,6 @@ class ServerAsyncWriter : public ServerAsyncStreamingInterface, finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - bool cancelled = false; - finish_buf_.AddServerRecvClose(&cancelled); finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } @@ -753,8 +743,6 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface, finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - bool cancelled = false; - finish_buf_.AddServerRecvClose(&cancelled); finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } |