aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-10-25 19:24:30 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-10-25 19:24:30 -0700
commita094b7b3127ffcb607e11486a64fc905e92a2565 (patch)
treeb06a812825433b90c4818468677503f57b9776eb /src/cpp/server
parent281de1bb3003e51d4b59445827b25a23b33ba509 (diff)
Added server async tests
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/server_cc.cc20
1 files changed, 14 insertions, 6 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 9f4ec3e4ab..2f5493dbf8 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -41,8 +41,10 @@
#include <grpcpp/support/time.h>
#include "src/core/ext/transport/inproc/inproc_transport.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/call.h"
+#include "src/core/lib/surface/completion_queue.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/server/health/default_health_check_service.h"
#include "src/cpp/thread_manager/thread_manager.h"
@@ -753,6 +755,7 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
/* Set up interception state partially for the receive ops. call_wrapper_ is
* not filled at this point, but it will be filled before the interceptors are
* run. */
+ gpr_log(GPR_ERROR, "Created base async request");
interceptor_methods_.SetCall(&call_wrapper_);
interceptor_methods_.SetReverse();
call_cq_->RegisterAvalanching(); // This op will trigger more ops
@@ -764,9 +767,9 @@ ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
bool* status) {
+ gpr_log(GPR_ERROR, "in finalize result");
if (done_intercepting_) {
- delete static_cast<Alarm*>(dummy_alarm_);
- dummy_alarm_ = nullptr;
+ gpr_log(GPR_ERROR, "done running interceptors");
*tag = tag_;
if (delete_on_finalize_) {
delete this;
@@ -785,6 +788,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
stream_->BindCall(&call_wrapper_);
if (*status && call_ && call_wrapper_.server_rpc_info()) {
+ gpr_log(GPR_ERROR, "here");
done_intercepting_ = true;
// Set interception point for RECV INITIAL METADATA
interceptor_methods_.AddInterceptionHookPoint(
@@ -799,6 +803,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
// There were interceptors to be run, so
// ContinueFinalizeResultAfterInterception will be run when interceptors
// are done.
+ gpr_log(GPR_ERROR, "don't return this tag");
return false;
}
}
@@ -814,12 +819,15 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
void ServerInterface::BaseAsyncRequest::
ContinueFinalizeResultAfterInterception() {
+ gpr_log(GPR_ERROR, "continue finalize result");
context_->BeginCompletionOp(&call_wrapper_);
// Queue a tag which will be returned immediately
- dummy_alarm_ = new Alarm();
- static_cast<Alarm*>(dummy_alarm_)
- ->Set(notification_cq_,
- g_core_codegen_interface->gpr_time_0(GPR_CLOCK_MONOTONIC), this);
+ grpc_core::ExecCtx exec_ctx;
+ grpc_cq_begin_op(notification_cq_->cq(), this);
+ grpc_cq_end_op(
+ notification_cq_->cq(), this, GRPC_ERROR_NONE,
+ [](void* arg, grpc_cq_completion* completion) { delete completion; },
+ nullptr, new grpc_cq_completion());
}
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(