diff options
author | Yash Tibrewal <yashkt@google.com> | 2018-10-25 19:24:30 -0700 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2018-10-25 19:24:30 -0700 |
commit | a094b7b3127ffcb607e11486a64fc905e92a2565 (patch) | |
tree | b06a812825433b90c4818468677503f57b9776eb /src/cpp/server/server_cc.cc | |
parent | 281de1bb3003e51d4b59445827b25a23b33ba509 (diff) |
Added server async tests
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r-- | src/cpp/server/server_cc.cc | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 9f4ec3e4ab..2f5493dbf8 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -41,8 +41,10 @@ #include <grpcpp/support/time.h> #include "src/core/ext/transport/inproc/inproc_transport.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/surface/call.h" +#include "src/core/lib/surface/completion_queue.h" #include "src/cpp/client/create_channel_internal.h" #include "src/cpp/server/health/default_health_check_service.h" #include "src/cpp/thread_manager/thread_manager.h" @@ -753,6 +755,7 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest( /* Set up interception state partially for the receive ops. call_wrapper_ is * not filled at this point, but it will be filled before the interceptors are * run. */ + gpr_log(GPR_ERROR, "Created base async request"); interceptor_methods_.SetCall(&call_wrapper_); interceptor_methods_.SetReverse(); call_cq_->RegisterAvalanching(); // This op will trigger more ops @@ -764,9 +767,9 @@ ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { + gpr_log(GPR_ERROR, "in finalize result"); if (done_intercepting_) { - delete static_cast<Alarm*>(dummy_alarm_); - dummy_alarm_ = nullptr; + gpr_log(GPR_ERROR, "done running interceptors"); *tag = tag_; if (delete_on_finalize_) { delete this; @@ -785,6 +788,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, stream_->BindCall(&call_wrapper_); if (*status && call_ && call_wrapper_.server_rpc_info()) { + gpr_log(GPR_ERROR, "here"); done_intercepting_ = true; // Set interception point for RECV INITIAL METADATA interceptor_methods_.AddInterceptionHookPoint( @@ -799,6 +803,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, // There were interceptors to be run, so // ContinueFinalizeResultAfterInterception will be run when interceptors // are done. + gpr_log(GPR_ERROR, "don't return this tag"); return false; } } @@ -814,12 +819,15 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, void ServerInterface::BaseAsyncRequest:: ContinueFinalizeResultAfterInterception() { + gpr_log(GPR_ERROR, "continue finalize result"); context_->BeginCompletionOp(&call_wrapper_); // Queue a tag which will be returned immediately - dummy_alarm_ = new Alarm(); - static_cast<Alarm*>(dummy_alarm_) - ->Set(notification_cq_, - g_core_codegen_interface->gpr_time_0(GPR_CLOCK_MONOTONIC), this); + grpc_core::ExecCtx exec_ctx; + grpc_cq_begin_op(notification_cq_->cq(), this); + grpc_cq_end_op( + notification_cq_->cq(), this, GRPC_ERROR_NONE, + [](void* arg, grpc_cq_completion* completion) { delete completion; }, + nullptr, new grpc_cq_completion()); } ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( |