aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/completion_queue.h9
-rw-r--r--include/grpc++/impl/call.h4
-rw-r--r--include/grpc++/server_context.h12
-rw-r--r--include/grpc++/stream.h12
4 files changed, 22 insertions, 15 deletions
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index f1b4962d1b..0bfbd5db7c 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -55,6 +55,7 @@ class ServerReaderWriter;
class CompletionQueue;
class Server;
+class ServerContext;
class CompletionQueueTag {
public:
@@ -62,7 +63,9 @@ class CompletionQueueTag {
// Called prior to returning from Next(), return value
// is the status of the operation (return status is the default thing
// to do)
- virtual void FinalizeResult(void **tag, bool *status) = 0;
+ // If this function returns false, the tag is dropped and not returned
+ // from the completion queue
+ virtual bool FinalizeResult(void **tag, bool *status) = 0;
};
// grpc_completion_queue wrapper class
@@ -99,6 +102,7 @@ class CompletionQueue {
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
friend class ::grpc::Server;
+ friend class ::grpc::ServerContext;
friend Status BlockingUnaryCall(ChannelInterface *channel,
const RpcMethod &method,
ClientContext *context,
@@ -109,6 +113,9 @@ class CompletionQueue {
// Cannot be mixed with calls to Next().
bool Pluck(CompletionQueueTag *tag);
+ // Does a single polling pluck on tag
+ void TryPluck(CompletionQueueTag *tag, bool forever);
+
grpc_completion_queue *cq_; // owned
};
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index 7ba5d16bf3..341710f7a2 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -65,7 +65,7 @@ class CallOpBuffer : public CompletionQueueTag {
void AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
void AddSendInitialMetadata(ClientContext *ctx);
- void AddRecvInitialMetadata(ClientContext* ctx);
+ void AddRecvInitialMetadata(ClientContext *ctx);
void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message);
void AddClientSendClose();
@@ -80,7 +80,7 @@ class CallOpBuffer : public CompletionQueueTag {
void FillOps(grpc_op *ops, size_t *nops);
// Called by completion queue just prior to returning from Next() or Pluck()
- void FinalizeResult(void **tag, bool *status) override;
+ bool FinalizeResult(void **tag, bool *status) override;
bool got_message = false;
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 06744f8f4f..22ebeadbec 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -36,6 +36,7 @@
#include <chrono>
#include <map>
+#include <mutex>
#include "config.h"
@@ -60,7 +61,9 @@ class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
+class Call;
class CallOpBuffer;
+class CompletionQueue;
class Server;
// Interface of server side rpc context.
@@ -76,6 +79,8 @@ class ServerContext final {
void AddInitialMetadata(const grpc::string& key, const grpc::string& value);
void AddTrailingMetadata(const grpc::string& key, const grpc::string& value);
+ bool IsCancelled();
+
std::multimap<grpc::string, grpc::string> client_metadata() {
return client_metadata_;
}
@@ -97,11 +102,18 @@ class ServerContext final {
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
+ class CompletionOp;
+
+ void BeginCompletionOp(Call* call);
+
ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
size_t metadata_count);
+ CompletionOp* completion_op_ = nullptr;
+
std::chrono::system_clock::time_point deadline_;
grpc_call* call_ = nullptr;
+ CompletionQueue* cq_ = nullptr;
bool sent_initial_metadata_ = false;
std::multimap<grpc::string, grpc::string> client_metadata_;
std::multimap<grpc::string, grpc::string> initial_metadata_;
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 491dfc8136..bb2a84949a 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -576,8 +576,6 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
- bool cancelled = false;
- finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@@ -589,8 +587,6 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- bool cancelled = false;
- finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@@ -636,8 +632,6 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface,
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
- bool cancelled = false;
- finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@@ -649,8 +643,6 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface,
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- bool cancelled = false;
- finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@@ -697,8 +689,6 @@ class ServerAsyncWriter : public ServerAsyncStreamingInterface,
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- bool cancelled = false;
- finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@@ -753,8 +743,6 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- bool cancelled = false;
- finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}