aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/stream/stream_context.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/stream/stream_context.cc')
-rw-r--r--src/cpp/stream/stream_context.cc252
1 files changed, 85 insertions, 167 deletions
diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc
index 07e771f7e1..706e90c481 100644
--- a/src/cpp/stream/stream_context.cc
+++ b/src/cpp/stream/stream_context.cc
@@ -33,7 +33,6 @@
#include "src/cpp/stream/stream_context.h"
-#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/cpp/rpc_method.h"
#include "src/cpp/proto/proto_utils.h"
@@ -50,227 +49,146 @@ StreamContext::StreamContext(const RpcMethod& method, ClientContext* context,
google::protobuf::Message* result)
: is_client_(true),
method_(&method),
- context_(context),
- request_(request),
+ call_(context->call()),
+ cq_(context->cq()),
+ request_(const_cast<google::protobuf::Message*>(request)),
result_(result),
- invoke_ev_(nullptr),
- read_ev_(nullptr),
- write_ev_(nullptr),
- reading_(false),
- writing_(false),
- got_read_(false),
- got_write_(false),
peer_halfclosed_(false),
- self_halfclosed_(false),
- stream_finished_(false),
- waiting_(false) {
+ self_halfclosed_(false) {
GPR_ASSERT(method_->method_type() != RpcMethod::RpcType::NORMAL_RPC);
}
-StreamContext::~StreamContext() { cq_poller_.join(); }
-
-void StreamContext::PollingLoop() {
- grpc_event* ev = nullptr;
- gpr_timespec absolute_deadline;
- AbsoluteDeadlineTimepoint2Timespec(context_->absolute_deadline(),
- &absolute_deadline);
- std::condition_variable* cv_to_notify = nullptr;
- std::unique_lock<std::mutex> lock(mu_, std::defer_lock);
- while (1) {
- cv_to_notify = nullptr;
- lock.lock();
- if (stream_finished_ && !reading_ && !writing_) {
- return;
- }
- lock.unlock();
- ev = grpc_completion_queue_next(context_->cq(), absolute_deadline);
- lock.lock();
- if (!ev) {
- stream_finished_ = true;
- final_status_ = Status(StatusCode::DEADLINE_EXCEEDED);
- std::condition_variable* cvs[3] = {reading_ ? &read_cv_ : nullptr,
- writing_ ? &write_cv_ : nullptr,
- waiting_ ? &finish_cv_ : nullptr};
- got_read_ = reading_;
- got_write_ = writing_;
- read_ev_ = nullptr;
- write_ev_ = nullptr;
- lock.unlock();
- for (int i = 0; i < 3; i++) {
- if (cvs[i]) cvs[i]->notify_one();
- }
- break;
- }
- switch (ev->type) {
- case GRPC_READ:
- GPR_ASSERT(reading_);
- got_read_ = true;
- read_ev_ = ev;
- cv_to_notify = &read_cv_;
- reading_ = false;
- break;
- case GRPC_FINISH_ACCEPTED:
- case GRPC_WRITE_ACCEPTED:
- got_write_ = true;
- write_ev_ = ev;
- cv_to_notify = &write_cv_;
- writing_ = false;
- break;
- case GRPC_FINISHED: {
- grpc::string error_details(
- ev->data.finished.details ? ev->data.finished.details : "");
- final_status_ = Status(static_cast<StatusCode>(ev->data.finished.code),
- error_details);
- grpc_event_finish(ev);
- stream_finished_ = true;
- if (waiting_) {
- cv_to_notify = &finish_cv_;
- }
- break;
- }
- case GRPC_INVOKE_ACCEPTED:
- invoke_ev_ = ev;
- cv_to_notify = &invoke_cv_;
- break;
- case GRPC_CLIENT_METADATA_READ:
- grpc_event_finish(ev);
- break;
- default:
- grpc_event_finish(ev);
- // not handling other types now
- gpr_log(GPR_ERROR, "unknown event type");
- abort();
- }
- lock.unlock();
- if (cv_to_notify) {
- cv_to_notify->notify_one();
- }
- }
+// Server only ctor
+StreamContext::StreamContext(const RpcMethod& method, grpc_call* call,
+ grpc_completion_queue* cq,
+ google::protobuf::Message* request, google::protobuf::Message* result)
+ : is_client_(false),
+ method_(&method),
+ call_(call),
+ cq_(cq),
+ request_(request),
+ result_(result),
+ peer_halfclosed_(false),
+ self_halfclosed_(false) {
+ GPR_ASSERT(method_->method_type() != RpcMethod::RpcType::NORMAL_RPC);
}
-void StreamContext::Start(bool buffered) {
- // TODO(yangg) handle metadata send path
- int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0;
- grpc_call_error error = grpc_call_start_invoke(
- context_->call(), context_->cq(), this, this, this, flag);
- GPR_ASSERT(GRPC_CALL_OK == error);
- // kicks off the poller thread
- cq_poller_ = std::thread(&StreamContext::PollingLoop, this);
- std::unique_lock<std::mutex> lock(mu_);
- while (!invoke_ev_) {
- invoke_cv_.wait(lock);
- }
- lock.unlock();
- GPR_ASSERT(invoke_ev_->data.invoke_accepted == GRPC_OP_OK);
- grpc_event_finish(invoke_ev_);
-}
+StreamContext::~StreamContext() {}
-namespace {
-// Wait for got_event with event_cv protected by mu, return event.
-grpc_event* WaitForEvent(bool* got_event, std::condition_variable* event_cv,
- std::mutex* mu, grpc_event** event) {
- std::unique_lock<std::mutex> lock(*mu);
- while (*got_event == false) {
- event_cv->wait(lock);
+void StreamContext::Start(bool buffered) {
+ if (is_client_) {
+ // TODO(yangg) handle metadata send path
+ int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0;
+ grpc_call_error error = grpc_call_start_invoke(call(), cq(), invoke_tag(),
+ client_metadata_read_tag(),
+ finished_tag(), flag);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ grpc_event* invoke_ev =
+ grpc_completion_queue_pluck(cq(), invoke_tag(), gpr_inf_future);
+ grpc_event_finish(invoke_ev);
+ } else {
+ // TODO(yangg) metadata needs to be added before accept
+ // TODO(yangg) correctly set flag to accept
+ grpc_call_error error = grpc_call_accept(call(), cq(), finished_tag(), 0);
+ GPR_ASSERT(GRPC_CALL_OK == error);
}
- *got_event = false;
- return *event;
}
-} // namespace
bool StreamContext::Read(google::protobuf::Message* msg) {
- std::unique_lock<std::mutex> lock(mu_);
- if (stream_finished_) {
- peer_halfclosed_ = true;
- return false;
- }
- reading_ = true;
- lock.unlock();
-
- grpc_call_error err = grpc_call_start_read(context_->call(), this);
+ // TODO(yangg) check peer_halfclosed_ here for possible early return.
+ grpc_call_error err = grpc_call_start_read(call(), read_tag());
GPR_ASSERT(err == GRPC_CALL_OK);
-
- grpc_event* ev = WaitForEvent(&got_read_, &read_cv_, &mu_, &read_ev_);
- if (!ev) {
- return false;
- }
- GPR_ASSERT(ev->type == GRPC_READ);
+ grpc_event* read_ev =
+ grpc_completion_queue_pluck(cq(), read_tag(), gpr_inf_future);
+ GPR_ASSERT(read_ev->type == GRPC_READ);
bool ret = true;
- if (ev->data.read) {
- if (!DeserializeProto(ev->data.read, msg)) {
- ret = false; // parse error
- // TODO(yangg) cancel the stream.
+ if (read_ev->data.read) {
+ if (!DeserializeProto(read_ev->data.read, msg)) {
+ ret = false;
+ FinishStream(
+ Status(StatusCode::DATA_LOSS, "Failed to parse incoming proto"),
+ true);
}
} else {
ret = false;
peer_halfclosed_ = true;
}
- grpc_event_finish(ev);
+ grpc_event_finish(read_ev);
return ret;
}
bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) {
+ // TODO(yangg) check self_halfclosed_ for possible early return.
bool ret = true;
grpc_event* ev = nullptr;
- std::unique_lock<std::mutex> lock(mu_);
- if (stream_finished_) {
- self_halfclosed_ = true;
- return false;
- }
- writing_ = true;
- lock.unlock();
-
if (msg) {
grpc_byte_buffer* out_buf = nullptr;
if (!SerializeProto(*msg, &out_buf)) {
FinishStream(Status(StatusCode::INVALID_ARGUMENT,
- "Failed to serialize request proto"),
+ "Failed to serialize outgoing proto"),
true);
return false;
}
int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0;
grpc_call_error err =
- grpc_call_start_write(context_->call(), out_buf, this, flag);
+ grpc_call_start_write(call(), out_buf, write_tag(), flag);
grpc_byte_buffer_destroy(out_buf);
GPR_ASSERT(err == GRPC_CALL_OK);
- ev = WaitForEvent(&got_write_, &write_cv_, &mu_, &write_ev_);
- if (!ev) {
- return false;
- }
+ ev = grpc_completion_queue_pluck(cq(), write_tag(), gpr_inf_future);
GPR_ASSERT(ev->type == GRPC_WRITE_ACCEPTED);
ret = ev->data.write_accepted == GRPC_OP_OK;
grpc_event_finish(ev);
}
- if (is_last) {
- grpc_call_error err = grpc_call_writes_done(context_->call(), this);
+ if (ret && is_last) {
+ grpc_call_error err = grpc_call_writes_done(call(), halfclose_tag());
GPR_ASSERT(err == GRPC_CALL_OK);
- ev = WaitForEvent(&got_write_, &write_cv_, &mu_, &write_ev_);
- if (!ev) {
- return false;
- }
+ ev = grpc_completion_queue_pluck(cq(), halfclose_tag(), gpr_inf_future);
GPR_ASSERT(ev->type == GRPC_FINISH_ACCEPTED);
grpc_event_finish(ev);
+
self_halfclosed_ = true;
+ } else if (!ret) { // Stream broken
+ self_halfclosed_ = true;
+ peer_halfclosed_ = true;
}
+
return ret;
}
const Status& StreamContext::Wait() {
- std::unique_lock<std::mutex> lock(mu_);
- // TODO(yangg) if not halfclosed cancel the stream
- GPR_ASSERT(self_halfclosed_);
- GPR_ASSERT(peer_halfclosed_);
- GPR_ASSERT(!waiting_);
- waiting_ = true;
- while (!stream_finished_) {
- finish_cv_.wait(lock);
+ // TODO(yangg) properly support metadata
+ grpc_event* metadata_ev = grpc_completion_queue_pluck(
+ cq(), client_metadata_read_tag(), gpr_inf_future);
+ grpc_event_finish(metadata_ev);
+ // TODO(yangg) protect states by a mutex, including other places.
+ if (!self_halfclosed_ || !peer_halfclosed_) {
+ FinishStream(Status::Cancelled, true);
+ } else {
+ grpc_event* finish_ev =
+ grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future);
+ GPR_ASSERT(finish_ev->type == GRPC_FINISHED);
+ std::string error_details(finish_ev->data.finished.details
+ ? finish_ev->data.finished.details
+ : "");
+ final_status_ = Status(
+ static_cast<StatusCode>(finish_ev->data.finished.code), error_details);
+ grpc_event_finish(finish_ev);
}
return final_status_;
}
-void StreamContext::FinishStream(const Status& status, bool send) { return; }
+void StreamContext::FinishStream(const Status& status, bool send) {
+ if (send) {
+ grpc_call_cancel(call());
+ }
+ grpc_event* finish_ev =
+ grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future);
+ GPR_ASSERT(finish_ev->type == GRPC_FINISHED);
+ grpc_event_finish(finish_ev);
+ final_status_ = status;
+}
} // namespace grpc