/* * * Copyright 2014, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ #include "src/cpp/stream/stream_context.h" #include #include #include "src/cpp/rpc_method.h" #include "src/cpp/proto/proto_utils.h" #include "src/cpp/util/time.h" #include #include #include namespace grpc { // Client only ctor StreamContext::StreamContext(const RpcMethod& method, ClientContext* context, const google::protobuf::Message* request, google::protobuf::Message* result) : is_client_(true), method_(&method), context_(context), request_(request), result_(result), invoke_ev_(nullptr), read_ev_(nullptr), write_ev_(nullptr), reading_(false), writing_(false), got_read_(false), got_write_(false), peer_halfclosed_(false), self_halfclosed_(false), stream_finished_(false), waiting_(false) { GPR_ASSERT(method_->method_type() != RpcMethod::RpcType::NORMAL_RPC); } StreamContext::~StreamContext() { cq_poller_.join(); } void StreamContext::PollingLoop() { grpc_event* ev = nullptr; gpr_timespec absolute_deadline; AbsoluteDeadlineTimepoint2Timespec(context_->absolute_deadline(), &absolute_deadline); std::condition_variable* cv_to_notify = nullptr; std::unique_lock lock(mu_, std::defer_lock); while (1) { cv_to_notify = nullptr; lock.lock(); if (stream_finished_ && !reading_ && !writing_) { return; } lock.unlock(); ev = grpc_completion_queue_next(context_->cq(), absolute_deadline); lock.lock(); if (!ev) { stream_finished_ = true; final_status_ = Status(StatusCode::DEADLINE_EXCEEDED); std::condition_variable* cvs[3] = {reading_ ? &read_cv_ : nullptr, writing_ ? &write_cv_ : nullptr, waiting_ ? &finish_cv_ : nullptr}; got_read_ = reading_; got_write_ = writing_; read_ev_ = nullptr; write_ev_ = nullptr; lock.unlock(); for (int i = 0; i < 3; i++) { if (cvs[i]) cvs[i]->notify_one(); } break; } switch (ev->type) { case GRPC_READ: GPR_ASSERT(reading_); got_read_ = true; read_ev_ = ev; cv_to_notify = &read_cv_; reading_ = false; break; case GRPC_FINISH_ACCEPTED: case GRPC_WRITE_ACCEPTED: got_write_ = true; write_ev_ = ev; cv_to_notify = &write_cv_; writing_ = false; break; case GRPC_FINISHED: { grpc::string error_details( ev->data.finished.details ? ev->data.finished.details : ""); final_status_ = Status(static_cast(ev->data.finished.code), error_details); grpc_event_finish(ev); stream_finished_ = true; if (waiting_) { cv_to_notify = &finish_cv_; } break; } case GRPC_INVOKE_ACCEPTED: invoke_ev_ = ev; cv_to_notify = &invoke_cv_; break; case GRPC_CLIENT_METADATA_READ: grpc_event_finish(ev); break; default: grpc_event_finish(ev); // not handling other types now gpr_log(GPR_ERROR, "unknown event type"); abort(); } lock.unlock(); if (cv_to_notify) { cv_to_notify->notify_one(); } } } void StreamContext::Start(bool buffered) { // TODO(yangg) handle metadata send path int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0; grpc_call_error error = grpc_call_start_invoke( context_->call(), context_->cq(), this, this, this, flag); GPR_ASSERT(GRPC_CALL_OK == error); // kicks off the poller thread cq_poller_ = std::thread(&StreamContext::PollingLoop, this); std::unique_lock lock(mu_); while (!invoke_ev_) { invoke_cv_.wait(lock); } lock.unlock(); GPR_ASSERT(invoke_ev_->data.invoke_accepted == GRPC_OP_OK); grpc_event_finish(invoke_ev_); } namespace { // Wait for got_event with event_cv protected by mu, return event. grpc_event* WaitForEvent(bool* got_event, std::condition_variable* event_cv, std::mutex* mu, grpc_event** event) { std::unique_lock lock(*mu); while (*got_event == false) { event_cv->wait(lock); } *got_event = false; return *event; } } // namespace bool StreamContext::Read(google::protobuf::Message* msg) { std::unique_lock lock(mu_); if (stream_finished_) { peer_halfclosed_ = true; return false; } reading_ = true; lock.unlock(); grpc_call_error err = grpc_call_start_read(context_->call(), this); GPR_ASSERT(err == GRPC_CALL_OK); grpc_event* ev = WaitForEvent(&got_read_, &read_cv_, &mu_, &read_ev_); if (!ev) { return false; } GPR_ASSERT(ev->type == GRPC_READ); bool ret = true; if (ev->data.read) { if (!DeserializeProto(ev->data.read, msg)) { ret = false; // parse error // TODO(yangg) cancel the stream. } } else { ret = false; peer_halfclosed_ = true; } grpc_event_finish(ev); return ret; } bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) { bool ret = true; grpc_event* ev = nullptr; std::unique_lock lock(mu_); if (stream_finished_) { self_halfclosed_ = true; return false; } writing_ = true; lock.unlock(); if (msg) { grpc_byte_buffer* out_buf = nullptr; if (!SerializeProto(*msg, &out_buf)) { FinishStream(Status(StatusCode::INVALID_ARGUMENT, "Failed to serialize request proto"), true); return false; } int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0; grpc_call_error err = grpc_call_start_write(context_->call(), out_buf, this, flag); grpc_byte_buffer_destroy(out_buf); GPR_ASSERT(err == GRPC_CALL_OK); ev = WaitForEvent(&got_write_, &write_cv_, &mu_, &write_ev_); if (!ev) { return false; } GPR_ASSERT(ev->type == GRPC_WRITE_ACCEPTED); ret = ev->data.write_accepted == GRPC_OP_OK; grpc_event_finish(ev); } if (is_last) { grpc_call_error err = grpc_call_writes_done(context_->call(), this); GPR_ASSERT(err == GRPC_CALL_OK); ev = WaitForEvent(&got_write_, &write_cv_, &mu_, &write_ev_); if (!ev) { return false; } GPR_ASSERT(ev->type == GRPC_FINISH_ACCEPTED); grpc_event_finish(ev); self_halfclosed_ = true; } return ret; } const Status& StreamContext::Wait() { std::unique_lock lock(mu_); // TODO(yangg) if not halfclosed cancel the stream GPR_ASSERT(self_halfclosed_); GPR_ASSERT(peer_halfclosed_); GPR_ASSERT(!waiting_); waiting_ = true; while (!stream_finished_) { finish_cv_.wait(lock); } return final_status_; } void StreamContext::FinishStream(const Status& status, bool send) { return; } } // namespace grpc