diff options
author | Nicolas Noble <nnoble@google.com> | 2014-11-26 16:33:03 -0800 |
---|---|---|
committer | Nicolas Noble <nnoble@google.com> | 2014-11-26 16:33:03 -0800 |
commit | b7ebd3b8c6fe39f99c40b10c1b563e4adb607b6c (patch) | |
tree | c1decf819492d455ec81cd471942c5516138f825 /src/cpp | |
parent | 0e905e63db21bcdd85d3d1af051fcdc5bb5caa38 (diff) |
Initial import.
Diffstat (limited to 'src/cpp')
26 files changed, 2338 insertions, 0 deletions
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc new file mode 100644 index 0000000000..7a7529104f --- /dev/null +++ b/src/cpp/client/channel.cc @@ -0,0 +1,212 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/cpp/client/channel.h" + +#include <chrono> +#include <string> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice.h> +#include <grpc/support/time.h> + +#include "src/cpp/rpc_method.h" +#include "src/cpp/proto/proto_utils.h" +#include "src/cpp/stream/stream_context.h" +#include "src/cpp/util/time.h" +#include <grpc++/config.h> +#include <google/protobuf/message.h> +#include <grpc++/client_context.h> +#include <grpc++/status.h> + +namespace grpc { + +Channel::Channel(const grpc::string& target) : target_(target) { + c_channel_ = grpc_channel_create(target_.c_str(), nullptr); +} + +Channel::~Channel() { grpc_channel_destroy(c_channel_); } + +namespace { +// Poll one event from the compeletion queue. Return false when an error +// occured or the polled type is not expected. If a finished event has been +// polled, set finished and set status if it has not been set. +bool NextEvent(grpc_completion_queue* cq, grpc_completion_type expected_type, + bool* finished, bool* status_set, Status* status, + google::protobuf::Message* result) { + // We rely on the c layer to enforce deadline and thus do not use deadline + // here. + grpc_event* ev = grpc_completion_queue_next(cq, gpr_inf_future); + if (!ev) { + return false; + } + bool ret = ev->type == expected_type; + switch (ev->type) { + case GRPC_INVOKE_ACCEPTED: + ret = ret && (ev->data.invoke_accepted == GRPC_OP_OK); + break; + case GRPC_READ: + ret = ret && (ev->data.read != nullptr); + if (ret && !DeserializeProto(ev->data.read, result)) { + *status_set = true; + *status = + Status(StatusCode::DATA_LOSS, "Failed to parse response proto."); + ret = false; + } + break; + case GRPC_WRITE_ACCEPTED: + ret = ret && (ev->data.write_accepted == GRPC_OP_OK); + break; + case GRPC_FINISH_ACCEPTED: + ret = ret && (ev->data.finish_accepted == GRPC_OP_OK); + break; + case GRPC_CLIENT_METADATA_READ: + break; + case GRPC_FINISHED: + *finished = true; + if (!*status_set) { + *status_set = true; + StatusCode error_code = static_cast<StatusCode>(ev->data.finished.code); + grpc::string details( + ev->data.finished.details ? ev->data.finished.details : ""); + *status = Status(error_code, details); + } + break; + default: + gpr_log(GPR_ERROR, "Dropping unhandled event with type %d", ev->type); + break; + } + grpc_event_finish(ev); + return ret; +} + +// If finished is not true, get final status by polling until a finished +// event is obtained. +void GetFinalStatus(grpc_completion_queue* cq, bool status_set, bool finished, + Status* status) { + while (!finished) { + NextEvent(cq, GRPC_FINISHED, &finished, &status_set, status, nullptr); + } +} + +} // namespace + +// TODO(yangg) more error handling +Status Channel::StartBlockingRpc(const RpcMethod& method, + ClientContext* context, + const google::protobuf::Message& request, + google::protobuf::Message* result) { + Status status; + bool status_set = false; + bool finished = false; + gpr_timespec absolute_deadline; + AbsoluteDeadlineTimepoint2Timespec(context->absolute_deadline(), + &absolute_deadline); + grpc_call* call = grpc_channel_create_call(c_channel_, method.name(), + // FIXME(yangg) + "localhost", absolute_deadline); + context->set_call(call); + grpc_completion_queue* cq = grpc_completion_queue_create(); + context->set_cq(cq); + // add_metadata from context + // + // invoke + GPR_ASSERT(grpc_call_start_invoke(call, cq, call, call, call, + GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); + if (!NextEvent(cq, GRPC_INVOKE_ACCEPTED, &status_set, &finished, &status, + nullptr)) { + GetFinalStatus(cq, finished, status_set, &status); + return status; + } + // write request + grpc_byte_buffer* write_buffer = nullptr; + bool success = SerializeProto(request, &write_buffer); + if (!success) { + grpc_call_cancel(call); + status_set = true; + status = + Status(StatusCode::DATA_LOSS, "Failed to serialize request proto."); + GetFinalStatus(cq, finished, status_set, &status); + return status; + } + GPR_ASSERT(grpc_call_start_write(call, write_buffer, call, + GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); + grpc_byte_buffer_destroy(write_buffer); + if (!NextEvent(cq, GRPC_WRITE_ACCEPTED, &finished, &status_set, &status, + nullptr)) { + GetFinalStatus(cq, finished, status_set, &status); + return status; + } + // writes done + GPR_ASSERT(grpc_call_writes_done(call, call) == GRPC_CALL_OK); + if (!NextEvent(cq, GRPC_FINISH_ACCEPTED, &finished, &status_set, &status, + nullptr)) { + GetFinalStatus(cq, finished, status_set, &status); + return status; + } + // start read metadata + // + if (!NextEvent(cq, GRPC_CLIENT_METADATA_READ, &finished, &status_set, &status, + nullptr)) { + GetFinalStatus(cq, finished, status_set, &status); + return status; + } + // start read + GPR_ASSERT(grpc_call_start_read(call, call) == GRPC_CALL_OK); + if (!NextEvent(cq, GRPC_READ, &finished, &status_set, &status, result)) { + GetFinalStatus(cq, finished, status_set, &status); + return status; + } + // wait status + GetFinalStatus(cq, finished, status_set, &status); + return status; +} + +StreamContextInterface* Channel::CreateStream(const RpcMethod& method, + ClientContext* context, + const google::protobuf::Message* request, + google::protobuf::Message* result) { + gpr_timespec absolute_deadline; + AbsoluteDeadlineTimepoint2Timespec(context->absolute_deadline(), + &absolute_deadline); + grpc_call* call = grpc_channel_create_call(c_channel_, method.name(), + // FIXME(yangg) + "localhost", absolute_deadline); + context->set_call(call); + grpc_completion_queue* cq = grpc_completion_queue_create(); + context->set_cq(cq); + return new StreamContext(method, context, request, result); +} + +} // namespace grpc diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h new file mode 100644 index 0000000000..a97d35efe8 --- /dev/null +++ b/src/cpp/client/channel.h @@ -0,0 +1,71 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_CLIENT_CHANNEL_H__ +#define __GRPCPP_INTERNAL_CLIENT_CHANNEL_H__ + +#include <grpc++/channel_interface.h> +#include <grpc++/config.h> + +struct grpc_channel; + +namespace grpc { +class StreamContextInterface; + +class Channel : public ChannelInterface { + public: + explicit Channel(const grpc::string& target); + ~Channel() override; + + Status StartBlockingRpc(const RpcMethod& method, ClientContext* context, + const google::protobuf::Message& request, + google::protobuf::Message* result) override; + + StreamContextInterface* CreateStream(const RpcMethod& method, + ClientContext* context, + const google::protobuf::Message* request, + google::protobuf::Message* result) override; + + protected: + // TODO(yangg) remove this section when we have the general ssl channel API + Channel() {} + void set_c_channel(grpc_channel* channel) { c_channel_ = channel; } + + private: + const grpc::string target_; + grpc_channel* c_channel_; // owned +}; + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_CLIENT_CHANNEL_H__ diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc new file mode 100644 index 0000000000..78774a7f12 --- /dev/null +++ b/src/cpp/client/client_context.cc @@ -0,0 +1,73 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/client_context.h> + +#include <grpc/grpc.h> + +using std::chrono::system_clock; + +namespace grpc { + +ClientContext::ClientContext() + : call_(nullptr), + cq_(nullptr), + absolute_deadline_(system_clock::time_point::max()) {} + +ClientContext::~ClientContext() { + if (call_) { + grpc_call_destroy(call_); + } + if (cq_) { + grpc_completion_queue_shutdown(cq_); + grpc_completion_queue_destroy(cq_); + } +} + +void ClientContext::set_absolute_deadline( + const system_clock::time_point& deadline) { + absolute_deadline_ = deadline; +} + +system_clock::time_point ClientContext::absolute_deadline() { + return absolute_deadline_; +} + +void ClientContext::AddMetadata(const grpc::string& meta_key, + const grpc::string& meta_value) { + return; +} + +void ClientContext::StartCancel() {} + +} // namespace grpc diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc new file mode 100644 index 0000000000..ea2b6a7ba2 --- /dev/null +++ b/src/cpp/client/create_channel.cc @@ -0,0 +1,47 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <memory> +#include <string> + +#include "src/cpp/client/channel.h" +#include <grpc++/channel_interface.h> +#include <grpc++/create_channel.h> + +namespace grpc { + +std::shared_ptr<ChannelInterface> CreateChannel(const grpc::string& target) { + return std::shared_ptr<ChannelInterface>(new Channel(target)); +} + +} // namespace grpc diff --git a/src/cpp/client/credentials.cc b/src/cpp/client/credentials.cc new file mode 100644 index 0000000000..18fa4fa656 --- /dev/null +++ b/src/cpp/client/credentials.cc @@ -0,0 +1,90 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + + +#include <string> + +#include <grpc/grpc_security.h> + +#include <grpc++/credentials.h> + +namespace grpc { + +Credentials::Credentials(grpc_credentials* c_creds) : creds_(c_creds) {} + +Credentials::~Credentials() { grpc_credentials_release(creds_); } +grpc_credentials* Credentials::GetRawCreds() { return creds_; } + +std::unique_ptr<Credentials> CredentialsFactory::DefaultCredentials() { + grpc_credentials* c_creds = grpc_default_credentials_create(); + std::unique_ptr<Credentials> cpp_creds(new Credentials(c_creds)); + return cpp_creds; +} + +// Builds SSL Credentials given SSL specific options +std::unique_ptr<Credentials> CredentialsFactory::SslCredentials( + const SslCredentialsOptions& options) { + grpc_credentials* c_creds = grpc_ssl_credentials_create( + reinterpret_cast<const unsigned char*>(options.pem_root_certs.c_str()), + options.pem_root_certs.size(), + reinterpret_cast<const unsigned char*>(options.pem_private_key.c_str()), + options.pem_private_key.size(), + reinterpret_cast<const unsigned char*>(options.pem_cert_chain.c_str()), + options.pem_cert_chain.size()); + std::unique_ptr<Credentials> cpp_creds(new Credentials(c_creds)); + return cpp_creds; +} + +// Builds credentials for use when running in GCE +std::unique_ptr<Credentials> CredentialsFactory::ComputeEngineCredentials() { + grpc_credentials* c_creds = grpc_compute_engine_credentials_create(); + std::unique_ptr<Credentials> cpp_creds(new Credentials(c_creds)); + return cpp_creds; +} + + +// Combines two credentials objects into a composite credentials. +std::unique_ptr<Credentials> CredentialsFactory::ComposeCredentials( + const std::unique_ptr<Credentials>& creds1, + const std::unique_ptr<Credentials>& creds2) { + // Note that we are not saving unique_ptrs to the two credentials + // passed in here. This is OK because the underlying C objects (i.e., + // creds1 and creds2) into grpc_composite_credentials_create will see their + // refcounts incremented. + grpc_credentials* c_creds = grpc_composite_credentials_create( + creds1->GetRawCreds(), creds2->GetRawCreds()); + std::unique_ptr<Credentials> cpp_creds(new Credentials(c_creds)); + return cpp_creds; +} + +} // namespace grpc diff --git a/src/cpp/client/internal_stub.cc b/src/cpp/client/internal_stub.cc new file mode 100644 index 0000000000..ec88ba5e7e --- /dev/null +++ b/src/cpp/client/internal_stub.cc @@ -0,0 +1,36 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/cpp/client/internal_stub.h" + +namespace grpc {} // namespace grpc diff --git a/src/cpp/client/internal_stub.h b/src/cpp/client/internal_stub.h new file mode 100644 index 0000000000..0eaa717d0b --- /dev/null +++ b/src/cpp/client/internal_stub.h @@ -0,0 +1,60 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_CLIENT_INTERNAL_STUB_H__ +#define __GRPCPP_INTERNAL_CLIENT_INTERNAL_STUB_H__ + +#include <memory> + +#include <grpc++/channel_interface.h> + +namespace grpc { + +class InternalStub { + public: + InternalStub() {} + virtual ~InternalStub() {} + + void set_channel(const std::shared_ptr<ChannelInterface>& channel) { + channel_ = channel; + } + + ChannelInterface* channel() { return channel_.get(); } + + private: + std::shared_ptr<ChannelInterface> channel_; +}; + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_CLIENT_INTERNAL_STUB_H__ diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc new file mode 100644 index 0000000000..255d1461a9 --- /dev/null +++ b/src/cpp/proto/proto_utils.cc @@ -0,0 +1,71 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/cpp/proto/proto_utils.h" +#include <grpc++/config.h> + +#include <grpc/grpc.h> +#include <grpc/support/slice.h> +#include <google/protobuf/message.h> + +namespace grpc { + +bool SerializeProto(const google::protobuf::Message& msg, grpc_byte_buffer** bp) { + grpc::string msg_str; + bool success = msg.SerializeToString(&msg_str); + if (success) { + gpr_slice slice = + gpr_slice_from_copied_buffer(msg_str.data(), msg_str.length()); + *bp = grpc_byte_buffer_create(&slice, 1); + gpr_slice_unref(slice); + } + return success; +} + +bool DeserializeProto(grpc_byte_buffer* buffer, google::protobuf::Message* msg) { + grpc::string msg_string; + grpc_byte_buffer_reader* reader = grpc_byte_buffer_reader_create(buffer); + gpr_slice slice; + while (grpc_byte_buffer_reader_next(reader, &slice)) { + const char* data = reinterpret_cast<const char*>( + slice.refcount ? slice.data.refcounted.bytes + : slice.data.inlined.bytes); + msg_string.append(data, slice.refcount ? slice.data.refcounted.length + : slice.data.inlined.length); + gpr_slice_unref(slice); + } + grpc_byte_buffer_reader_destroy(reader); + return msg->ParseFromString(msg_string); +} + +} // namespace grpc diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/proto/proto_utils.h new file mode 100644 index 0000000000..11471f1acb --- /dev/null +++ b/src/cpp/proto/proto_utils.h @@ -0,0 +1,56 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_PROTO_PROTO_UTILS_H__ +#define __GRPCPP_INTERNAL_PROTO_PROTO_UTILS_H__ + +struct grpc_byte_buffer; +namespace google { +namespace protobuf { +class Message; +} +} + +namespace grpc { + +// Serialize the msg into a buffer created inside the function. The caller +// should destroy the returned buffer when done with it. If serialization fails, +// false is returned and buffer is left unchanged. +bool SerializeProto(const google::protobuf::Message& msg, grpc_byte_buffer** buffer); + +// The caller keeps ownership of buffer and msg. +bool DeserializeProto(grpc_byte_buffer* buffer, google::protobuf::Message* msg); + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_PROTO_PROTO_UTILS_H__ diff --git a/src/cpp/rpc_method.cc b/src/cpp/rpc_method.cc new file mode 100644 index 0000000000..8067f42f85 --- /dev/null +++ b/src/cpp/rpc_method.cc @@ -0,0 +1,36 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/cpp/rpc_method.h" + +namespace grpc {} // namespace grpc diff --git a/src/cpp/rpc_method.h b/src/cpp/rpc_method.h new file mode 100644 index 0000000000..24a34bed89 --- /dev/null +++ b/src/cpp/rpc_method.h @@ -0,0 +1,69 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_RPC_METHOD_H__ +#define __GRPCPP_INTERNAL_RPC_METHOD_H__ + +namespace google { +namespace protobuf { +class Message; +} +} + +namespace grpc { + +class RpcMethod { + public: + enum RpcType { + NORMAL_RPC = 0, + CLIENT_STREAMING, // request streaming + SERVER_STREAMING, // response streaming + BIDI_STREAMING + }; + + explicit RpcMethod(const char* name) + : name_(name), method_type_(NORMAL_RPC) {} + RpcMethod(const char* name, RpcType type) : name_(name), method_type_(type) {} + + const char *name() const { return name_; } + + RpcType method_type() const { return method_type_; } + + private: + const char *name_; + const RpcType method_type_; +}; + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_RPC_METHOD_H__ diff --git a/src/cpp/server/async_server.cc b/src/cpp/server/async_server.cc new file mode 100644 index 0000000000..aae2c82050 --- /dev/null +++ b/src/cpp/server/async_server.cc @@ -0,0 +1,89 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/async_server.h> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include <grpc++/completion_queue.h> + +namespace grpc { + +AsyncServer::AsyncServer(CompletionQueue* cc) + : started_(false), shutdown_(false) { + server_ = grpc_server_create(cc->cq(), nullptr); +} + +AsyncServer::~AsyncServer() { + std::unique_lock<std::mutex> lock(shutdown_mu_); + if (started_ && !shutdown_) { + lock.unlock(); + Shutdown(); + } + grpc_server_destroy(server_); +} + +void AsyncServer::AddPort(const grpc::string& addr) { + GPR_ASSERT(!started_); + int success = grpc_server_add_http2_port(server_, addr.c_str()); + GPR_ASSERT(success); +} + +void AsyncServer::Start() { + GPR_ASSERT(!started_); + started_ = true; + grpc_server_start(server_); +} + +void AsyncServer::RequestOneRpc() { + GPR_ASSERT(started_); + std::unique_lock<std::mutex> lock(shutdown_mu_); + if (shutdown_) { + return; + } + lock.unlock(); + grpc_call_error err = grpc_server_request_call(server_, nullptr); + GPR_ASSERT(err == GRPC_CALL_OK); +} + +void AsyncServer::Shutdown() { + std::unique_lock<std::mutex> lock(shutdown_mu_); + if (started_ && !shutdown_) { + shutdown_ = true; + lock.unlock(); + // TODO(yangg) should we shutdown without start? + grpc_server_shutdown(server_); + } +} + +} // namespace grpc diff --git a/src/cpp/server/async_server_context.cc b/src/cpp/server/async_server_context.cc new file mode 100644 index 0000000000..b231f4b0cf --- /dev/null +++ b/src/cpp/server/async_server_context.cc @@ -0,0 +1,101 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/async_server_context.h> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include "src/cpp/proto/proto_utils.h" +#include <google/protobuf/message.h> +#include <grpc++/status.h> + +namespace grpc { + +AsyncServerContext::AsyncServerContext( + grpc_call* call, const grpc::string& method, const grpc::string& host, + system_clock::time_point absolute_deadline) + : method_(method), + host_(host), + absolute_deadline_(absolute_deadline), + request_(nullptr), + call_(call) { +} + +AsyncServerContext::~AsyncServerContext() { grpc_call_destroy(call_); } + +void AsyncServerContext::Accept(grpc_completion_queue* cq) { + grpc_call_accept(call_, cq, this, 0); +} + +bool AsyncServerContext::StartRead(google::protobuf::Message* request) { + GPR_ASSERT(request); + request_ = request; + grpc_call_error err = grpc_call_start_read(call_, this); + return err == GRPC_CALL_OK; +} + +bool AsyncServerContext::StartWrite(const google::protobuf::Message& response, + int flags) { + grpc_byte_buffer* buffer = nullptr; + if (!SerializeProto(response, &buffer)) { + return false; + } + grpc_call_error err = grpc_call_start_write(call_, buffer, this, flags); + grpc_byte_buffer_destroy(buffer); + return err == GRPC_CALL_OK; +} + +namespace { +grpc_status TranslateStatus(const Status& status) { + grpc_status c_status; + // TODO(yangg) + c_status.code = GRPC_STATUS_OK; + c_status.details = nullptr; + return c_status; +} +} // namespace + +bool AsyncServerContext::StartWriteStatus(const Status& status) { + grpc_status c_status = TranslateStatus(status); + grpc_call_error err = grpc_call_start_write_status(call_, c_status, this); + return err == GRPC_CALL_OK; +} + +bool AsyncServerContext::ParseRead(grpc_byte_buffer* read_buffer) { + GPR_ASSERT(request_); + bool success = DeserializeProto(read_buffer, request_); + request_ = nullptr; + return success; +} + +} // namespace grpc diff --git a/src/cpp/server/completion_queue.cc b/src/cpp/server/completion_queue.cc new file mode 100644 index 0000000000..04eb301f7e --- /dev/null +++ b/src/cpp/server/completion_queue.cc @@ -0,0 +1,113 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// TODO(yangg) maybe move to internal/common +#include <grpc++/completion_queue.h> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include "src/cpp/util/time.h" +#include <grpc++/async_server_context.h> + +namespace grpc { + +CompletionQueue::CompletionQueue() { cq_ = grpc_completion_queue_create(); } + +CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); } + +void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } + +CompletionQueue::CompletionType CompletionQueue::Next(void** tag) { + grpc_event* ev; + CompletionType return_type; + bool success; + + ev = grpc_completion_queue_next(cq_, gpr_inf_future); + if (!ev) { + gpr_log(GPR_ERROR, "no next event in queue"); + abort(); + } + switch (ev->type) { + case GRPC_QUEUE_SHUTDOWN: + return_type = QUEUE_CLOSED; + break; + case GRPC_READ: + *tag = ev->tag; + if (ev->data.read) { + success = + static_cast<AsyncServerContext*>(ev->tag)->ParseRead(ev->data.read); + return_type = success ? SERVER_READ_OK : SERVER_READ_ERROR; + } else { + return_type = SERVER_READ_ERROR; + } + break; + case GRPC_WRITE_ACCEPTED: + *tag = ev->tag; + if (ev->data.write_accepted != GRPC_OP_ERROR) { + return_type = SERVER_WRITE_OK; + } else { + return_type = SERVER_WRITE_ERROR; + } + break; + case GRPC_SERVER_RPC_NEW: + GPR_ASSERT(!ev->tag); + // Finishing the pending new rpcs after the server has been shutdown. + if (!ev->call) { + *tag = nullptr; + } else { + *tag = new AsyncServerContext(ev->call, ev->data.server_rpc_new.method, + ev->data.server_rpc_new.host, + AbsoluteDeadlineTimespec2Timepoint( + ev->data.server_rpc_new.deadline)); + } + return_type = SERVER_RPC_NEW; + break; + case GRPC_FINISHED: + *tag = ev->tag; + return_type = RPC_END; + break; + case GRPC_FINISH_ACCEPTED: + *tag = ev->tag; + return_type = HALFCLOSE_OK; + break; + default: + // We do not handle client side messages now + gpr_log(GPR_ERROR, "client-side messages aren't supported yet"); + abort(); + } + grpc_event_finish(ev); + return return_type; +} + +} // namespace grpc diff --git a/src/cpp/server/rpc_service_method.h b/src/cpp/server/rpc_service_method.h new file mode 100644 index 0000000000..ac2badda71 --- /dev/null +++ b/src/cpp/server/rpc_service_method.h @@ -0,0 +1,131 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_SERVER_RPC_SERVICE_METHOD_H__ +#define __GRPCPP_INTERNAL_SERVER_RPC_SERVICE_METHOD_H__ + +#include <functional> +#include <map> +#include <memory> +#include <vector> + +#include "src/cpp/rpc_method.h" +#include <google/protobuf/message.h> +#include <grpc++/status.h> + +namespace grpc { + +// TODO(rocking): we might need to split this file into multiple ones. + +// Base class for running an RPC handler. +class MethodHandler { + public: + virtual ~MethodHandler() {} + struct HandlerParameter { + HandlerParameter(const google::protobuf::Message* req, google::protobuf::Message* resp) + : request(req), response(resp) {} + const google::protobuf::Message* request; + google::protobuf::Message* response; + }; + virtual ::grpc::Status RunHandler(const HandlerParameter& param) = 0; +}; + +// A wrapper class of an application provided rpc method handler. +template <class ServiceType, class RequestType, class ResponseType> +class RpcMethodHandler : public MethodHandler { + public: + RpcMethodHandler(std::function<::grpc::Status( + ServiceType*, const RequestType*, ResponseType*)> func, + ServiceType* service) + : func_(func), service_(service) {} + + ::grpc::Status RunHandler(const HandlerParameter& param) final { + // Invoke application function, cast proto messages to their actual types. + return func_(service_, dynamic_cast<const RequestType*>(param.request), + dynamic_cast<ResponseType*>(param.response)); + } + + private: + // Application provided rpc handler function. + std::function<::grpc::Status(ServiceType*, const RequestType*, ResponseType*)> + func_; + // The class the above handler function lives in. + ServiceType* service_; +}; + +// Server side rpc method class +class RpcServiceMethod : public RpcMethod { + public: + // Takes ownership of the handler and two prototype objects. + RpcServiceMethod(const char* name, MethodHandler* handler, + google::protobuf::Message* request_prototype, + google::protobuf::Message* response_prototype) + : RpcMethod(name), + handler_(handler), + request_prototype_(request_prototype), + response_prototype_(response_prototype) {} + + MethodHandler* handler() { return handler_.get(); } + + google::protobuf::Message* AllocateRequestProto() { return request_prototype_->New(); } + google::protobuf::Message* AllocateResponseProto() { + return response_prototype_->New(); + } + + private: + std::unique_ptr<MethodHandler> handler_; + std::unique_ptr<google::protobuf::Message> request_prototype_; + std::unique_ptr<google::protobuf::Message> response_prototype_; +}; + +// This class contains all the method information for an rpc service. It is +// used for registering a service on a grpc server. +class RpcService { + public: + // Takes ownership. + void AddMethod(RpcServiceMethod* method) { + methods_.push_back(std::unique_ptr<RpcServiceMethod>(method)); + } + + RpcServiceMethod* GetMethod(int i) { + return methods_[i].get(); + } + int GetMethodCount() const { return methods_.size(); } + + private: + std::vector<std::unique_ptr<RpcServiceMethod>> methods_; +}; + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_SERVER_RPC_SERVICE_METHOD_H__ diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc new file mode 100644 index 0000000000..9bf4073238 --- /dev/null +++ b/src/cpp/server/server.cc @@ -0,0 +1,166 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/server.h> +#include <utility> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include "src/cpp/server/rpc_service_method.h" +#include "src/cpp/server/server_rpc_handler.h" +#include "src/cpp/server/thread_pool.h" +#include <grpc++/async_server_context.h> +#include <grpc++/completion_queue.h> + +namespace grpc { + +// TODO(rocking): consider a better default value like num of cores. +static const int kNumThreads = 4; + +Server::Server(ThreadPoolInterface* thread_pool) + : started_(false), + shutdown_(false), + num_running_cb_(0), + thread_pool_(thread_pool == nullptr ? new ThreadPool(kNumThreads) + : thread_pool), + thread_pool_owned_(thread_pool == nullptr) { + server_ = grpc_server_create(cq_.cq(), nullptr); +} + +Server::Server() { + // Should not be called. + GPR_ASSERT(false); +} + +Server::~Server() { + std::unique_lock<std::mutex> lock(mu_); + if (started_ && !shutdown_) { + lock.unlock(); + Shutdown(); + } + grpc_server_destroy(server_); + if (thread_pool_owned_) { + delete thread_pool_; + } +} + +void Server::RegisterService(RpcService* service) { + for (int i = 0; i < service->GetMethodCount(); ++i) { + RpcServiceMethod* method = service->GetMethod(i); + method_map_.insert(std::make_pair(method->name(), method)); + } +} + +void Server::AddPort(const grpc::string& addr) { + GPR_ASSERT(!started_); + int success = grpc_server_add_http2_port(server_, addr.c_str()); + GPR_ASSERT(success); +} + +void Server::Start() { + GPR_ASSERT(!started_); + started_ = true; + grpc_server_start(server_); + + // Start processing rpcs. + ScheduleCallback(); +} + +void Server::AllowOneRpc() { + GPR_ASSERT(started_); + grpc_call_error err = grpc_server_request_call(server_, nullptr); + GPR_ASSERT(err == GRPC_CALL_OK); +} + +void Server::Shutdown() { + { + std::unique_lock<std::mutex> lock(mu_); + if (started_ && !shutdown_) { + shutdown_ = true; + grpc_server_shutdown(server_); + + // Wait for running callbacks to finish. + while (num_running_cb_ != 0) { + callback_cv_.wait(lock); + } + } + } + + // Shutdown the completion queue. + cq_.Shutdown(); + void* tag = nullptr; + CompletionQueue::CompletionType t = cq_.Next(&tag); + GPR_ASSERT(t == CompletionQueue::QUEUE_CLOSED); +} + +void Server::ScheduleCallback() { + { + std::unique_lock<std::mutex> lock(mu_); + num_running_cb_++; + } + std::function<void()> callback = std::bind(&Server::RunRpc, this); + thread_pool_->ScheduleCallback(callback); +} + +void Server::RunRpc() { + // Wait for one more incoming rpc. + void* tag = nullptr; + AllowOneRpc(); + CompletionQueue::CompletionType t = cq_.Next(&tag); + GPR_ASSERT(t == CompletionQueue::SERVER_RPC_NEW); + + AsyncServerContext* server_context = static_cast<AsyncServerContext*>(tag); + // server_context could be nullptr during server shutdown. + if (server_context != nullptr) { + // Schedule a new callback to handle more rpcs. + ScheduleCallback(); + + RpcServiceMethod* method = nullptr; + auto iter = method_map_.find(server_context->method()); + if (iter != method_map_.end()) { + method = iter->second; + } + ServerRpcHandler rpc_handler(server_context, method); + rpc_handler.StartRpc(); + } + + { + std::unique_lock<std::mutex> lock(mu_); + num_running_cb_--; + if (shutdown_) { + callback_cv_.notify_all(); + } + } +} + +} // namespace grpc diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc new file mode 100644 index 0000000000..d5d0689bc5 --- /dev/null +++ b/src/cpp/server/server_builder.cc @@ -0,0 +1,66 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/server_builder.h> + +#include <grpc++/server.h> + +namespace grpc { + +ServerBuilder::ServerBuilder() : thread_pool_(nullptr) {} + +void ServerBuilder::RegisterService(RpcService* service) { + services_.push_back(service); +} + +void ServerBuilder::AddPort(const grpc::string& addr) { + ports_.push_back(addr); +} + +void ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) { + thread_pool_ = thread_pool; +} + +std::unique_ptr<Server> ServerBuilder::BuildAndStart() { + std::unique_ptr<Server> server(new Server(thread_pool_)); + for (auto* service : services_) { + server->RegisterService(service); + } + for (auto& port : ports_) { + server->AddPort(port); + } + server->Start(); + return server; +} + +} // namespace grpc diff --git a/src/cpp/server/server_rpc_handler.cc b/src/cpp/server/server_rpc_handler.cc new file mode 100644 index 0000000000..2d5a081deb --- /dev/null +++ b/src/cpp/server/server_rpc_handler.cc @@ -0,0 +1,108 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/cpp/server/server_rpc_handler.h" + +#include <grpc/support/log.h> +#include "src/cpp/server/rpc_service_method.h" +#include <grpc++/async_server_context.h> + +namespace grpc { + +ServerRpcHandler::ServerRpcHandler(AsyncServerContext* server_context, + RpcServiceMethod* method) + : server_context_(server_context), + method_(method) { +} + +void ServerRpcHandler::StartRpc() { + // Start the rpc on this dedicated completion queue. + server_context_->Accept(cq_.cq()); + + if (method_ == nullptr) { + // Method not supported, finish the rpc with error. + // TODO(rocking): do we need to call read to consume the request? + FinishRpc(Status(StatusCode::UNIMPLEMENTED, "No such method.")); + return; + } + + // Allocate request and response. + std::unique_ptr<google::protobuf::Message> request(method_->AllocateRequestProto()); + std::unique_ptr<google::protobuf::Message> response(method_->AllocateResponseProto()); + + // Read request + server_context_->StartRead(request.get()); + auto type = WaitForNextEvent(); + GPR_ASSERT(type == CompletionQueue::SERVER_READ_OK); + + // Run the application's rpc handler + MethodHandler* handler = method_->handler(); + Status status = handler->RunHandler( + MethodHandler::HandlerParameter(request.get(), response.get())); + + if (status.IsOk()) { + // Send the response if we get an ok status. + server_context_->StartWrite(*response, 0); + type = WaitForNextEvent(); + if (type != CompletionQueue::SERVER_WRITE_OK) { + status = Status(StatusCode::INTERNAL, "Error writing response."); + } + } + + FinishRpc(status); +} + +CompletionQueue::CompletionType ServerRpcHandler::WaitForNextEvent() { + void* tag = nullptr; + CompletionQueue::CompletionType type = cq_.Next(&tag); + if (type != CompletionQueue::QUEUE_CLOSED && + type != CompletionQueue::RPC_END) { + GPR_ASSERT(static_cast<AsyncServerContext*>(tag) == server_context_.get()); + } + return type; +} + +void ServerRpcHandler::FinishRpc(const Status& status) { + server_context_->StartWriteStatus(status); + CompletionQueue::CompletionType type = WaitForNextEvent(); + // TODO(rocking): do we care about this return type? + + type = WaitForNextEvent(); + GPR_ASSERT(type == CompletionQueue::RPC_END); + + cq_.Shutdown(); + type = WaitForNextEvent(); + GPR_ASSERT(type == CompletionQueue::QUEUE_CLOSED); +} + +} // namespace grpc diff --git a/src/cpp/server/server_rpc_handler.h b/src/cpp/server/server_rpc_handler.h new file mode 100644 index 0000000000..ca48fd74dd --- /dev/null +++ b/src/cpp/server/server_rpc_handler.h @@ -0,0 +1,66 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__ +#define __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__ + +#include <memory> + +#include <grpc++/completion_queue.h> +#include <grpc++/status.h> + +namespace grpc { + +class AsyncServerContext; +class RpcServiceMethod; + +class ServerRpcHandler { + public: + // Takes ownership of server_context. + ServerRpcHandler(AsyncServerContext* server_context, + RpcServiceMethod* method); + + void StartRpc(); + + private: + CompletionQueue::CompletionType WaitForNextEvent(); + void FinishRpc(const Status& status); + + std::unique_ptr<AsyncServerContext> server_context_; + RpcServiceMethod* method_; + CompletionQueue cq_; +}; + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__ diff --git a/src/cpp/server/thread_pool.cc b/src/cpp/server/thread_pool.cc new file mode 100644 index 0000000000..ce364c4795 --- /dev/null +++ b/src/cpp/server/thread_pool.cc @@ -0,0 +1,77 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/cpp/server/thread_pool.h" + +namespace grpc { + +ThreadPool::ThreadPool(int num_threads) { + for (int i = 0; i < num_threads; i++) { + threads_.push_back(std::thread([=]() { + for (;;) { + std::unique_lock<std::mutex> lock(mu_); + // Wait until work is available or we are shutting down. + cv_.wait(lock, [=]() { return shutdown_ || !callbacks_.empty(); }); + // Drain callbacks before considering shutdown to ensure all work + // gets completed. + if (!callbacks_.empty()) { + auto cb = callbacks_.front(); + callbacks_.pop(); + lock.unlock(); + cb(); + } else if (shutdown_) { + return; + } + } + })); + } +} + +ThreadPool::~ThreadPool() { + { + std::lock_guard<std::mutex> lock(mu_); + shutdown_ = true; + cv_.notify_all(); + } + for (auto& t : threads_) { + t.join(); + } +} + +void ThreadPool::ScheduleCallback(const std::function<void()>& callback) { + std::lock_guard<std::mutex> lock(mu_); + callbacks_.push(callback); + cv_.notify_all(); +} + +} // namespace grpc diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h new file mode 100644 index 0000000000..6fc71d6695 --- /dev/null +++ b/src/cpp/server/thread_pool.h @@ -0,0 +1,64 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_SERVER_THREAD_POOL_H__ +#define __GRPCPP_INTERNAL_SERVER_THREAD_POOL_H__ + +#include <grpc++/thread_pool_interface.h> + +#include <condition_variable> +#include <thread> +#include <mutex> +#include <queue> +#include <vector> + +namespace grpc { + +class ThreadPool : public ThreadPoolInterface { + public: + explicit ThreadPool(int num_threads); + ~ThreadPool(); + + void ScheduleCallback(const std::function<void()>& callback) final; + + private: + std::mutex mu_; + std::condition_variable cv_; + bool shutdown_ = false; + std::queue<std::function<void()>> callbacks_; + std::vector<std::thread> threads_; +}; + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_SERVER_THREAD_POOL_H__ diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc new file mode 100644 index 0000000000..07e771f7e1 --- /dev/null +++ b/src/cpp/stream/stream_context.cc @@ -0,0 +1,276 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/cpp/stream/stream_context.h" + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include "src/cpp/rpc_method.h" +#include "src/cpp/proto/proto_utils.h" +#include "src/cpp/util/time.h" +#include <grpc++/client_context.h> +#include <grpc++/config.h> +#include <google/protobuf/message.h> + +namespace grpc { + +// Client only ctor +StreamContext::StreamContext(const RpcMethod& method, ClientContext* context, + const google::protobuf::Message* request, + google::protobuf::Message* result) + : is_client_(true), + method_(&method), + context_(context), + request_(request), + result_(result), + invoke_ev_(nullptr), + read_ev_(nullptr), + write_ev_(nullptr), + reading_(false), + writing_(false), + got_read_(false), + got_write_(false), + peer_halfclosed_(false), + self_halfclosed_(false), + stream_finished_(false), + waiting_(false) { + GPR_ASSERT(method_->method_type() != RpcMethod::RpcType::NORMAL_RPC); +} + +StreamContext::~StreamContext() { cq_poller_.join(); } + +void StreamContext::PollingLoop() { + grpc_event* ev = nullptr; + gpr_timespec absolute_deadline; + AbsoluteDeadlineTimepoint2Timespec(context_->absolute_deadline(), + &absolute_deadline); + std::condition_variable* cv_to_notify = nullptr; + std::unique_lock<std::mutex> lock(mu_, std::defer_lock); + while (1) { + cv_to_notify = nullptr; + lock.lock(); + if (stream_finished_ && !reading_ && !writing_) { + return; + } + lock.unlock(); + ev = grpc_completion_queue_next(context_->cq(), absolute_deadline); + lock.lock(); + if (!ev) { + stream_finished_ = true; + final_status_ = Status(StatusCode::DEADLINE_EXCEEDED); + std::condition_variable* cvs[3] = {reading_ ? &read_cv_ : nullptr, + writing_ ? &write_cv_ : nullptr, + waiting_ ? &finish_cv_ : nullptr}; + got_read_ = reading_; + got_write_ = writing_; + read_ev_ = nullptr; + write_ev_ = nullptr; + lock.unlock(); + for (int i = 0; i < 3; i++) { + if (cvs[i]) cvs[i]->notify_one(); + } + break; + } + switch (ev->type) { + case GRPC_READ: + GPR_ASSERT(reading_); + got_read_ = true; + read_ev_ = ev; + cv_to_notify = &read_cv_; + reading_ = false; + break; + case GRPC_FINISH_ACCEPTED: + case GRPC_WRITE_ACCEPTED: + got_write_ = true; + write_ev_ = ev; + cv_to_notify = &write_cv_; + writing_ = false; + break; + case GRPC_FINISHED: { + grpc::string error_details( + ev->data.finished.details ? ev->data.finished.details : ""); + final_status_ = Status(static_cast<StatusCode>(ev->data.finished.code), + error_details); + grpc_event_finish(ev); + stream_finished_ = true; + if (waiting_) { + cv_to_notify = &finish_cv_; + } + break; + } + case GRPC_INVOKE_ACCEPTED: + invoke_ev_ = ev; + cv_to_notify = &invoke_cv_; + break; + case GRPC_CLIENT_METADATA_READ: + grpc_event_finish(ev); + break; + default: + grpc_event_finish(ev); + // not handling other types now + gpr_log(GPR_ERROR, "unknown event type"); + abort(); + } + lock.unlock(); + if (cv_to_notify) { + cv_to_notify->notify_one(); + } + } +} + +void StreamContext::Start(bool buffered) { + // TODO(yangg) handle metadata send path + int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0; + grpc_call_error error = grpc_call_start_invoke( + context_->call(), context_->cq(), this, this, this, flag); + GPR_ASSERT(GRPC_CALL_OK == error); + // kicks off the poller thread + cq_poller_ = std::thread(&StreamContext::PollingLoop, this); + std::unique_lock<std::mutex> lock(mu_); + while (!invoke_ev_) { + invoke_cv_.wait(lock); + } + lock.unlock(); + GPR_ASSERT(invoke_ev_->data.invoke_accepted == GRPC_OP_OK); + grpc_event_finish(invoke_ev_); +} + +namespace { +// Wait for got_event with event_cv protected by mu, return event. +grpc_event* WaitForEvent(bool* got_event, std::condition_variable* event_cv, + std::mutex* mu, grpc_event** event) { + std::unique_lock<std::mutex> lock(*mu); + while (*got_event == false) { + event_cv->wait(lock); + } + *got_event = false; + return *event; +} +} // namespace + +bool StreamContext::Read(google::protobuf::Message* msg) { + std::unique_lock<std::mutex> lock(mu_); + if (stream_finished_) { + peer_halfclosed_ = true; + return false; + } + reading_ = true; + lock.unlock(); + + grpc_call_error err = grpc_call_start_read(context_->call(), this); + GPR_ASSERT(err == GRPC_CALL_OK); + + grpc_event* ev = WaitForEvent(&got_read_, &read_cv_, &mu_, &read_ev_); + if (!ev) { + return false; + } + GPR_ASSERT(ev->type == GRPC_READ); + bool ret = true; + if (ev->data.read) { + if (!DeserializeProto(ev->data.read, msg)) { + ret = false; // parse error + // TODO(yangg) cancel the stream. + } + } else { + ret = false; + peer_halfclosed_ = true; + } + grpc_event_finish(ev); + return ret; +} + +bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) { + bool ret = true; + grpc_event* ev = nullptr; + + std::unique_lock<std::mutex> lock(mu_); + if (stream_finished_) { + self_halfclosed_ = true; + return false; + } + writing_ = true; + lock.unlock(); + + if (msg) { + grpc_byte_buffer* out_buf = nullptr; + if (!SerializeProto(*msg, &out_buf)) { + FinishStream(Status(StatusCode::INVALID_ARGUMENT, + "Failed to serialize request proto"), + true); + return false; + } + int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0; + grpc_call_error err = + grpc_call_start_write(context_->call(), out_buf, this, flag); + grpc_byte_buffer_destroy(out_buf); + GPR_ASSERT(err == GRPC_CALL_OK); + + ev = WaitForEvent(&got_write_, &write_cv_, &mu_, &write_ev_); + if (!ev) { + return false; + } + GPR_ASSERT(ev->type == GRPC_WRITE_ACCEPTED); + + ret = ev->data.write_accepted == GRPC_OP_OK; + grpc_event_finish(ev); + } + if (is_last) { + grpc_call_error err = grpc_call_writes_done(context_->call(), this); + GPR_ASSERT(err == GRPC_CALL_OK); + ev = WaitForEvent(&got_write_, &write_cv_, &mu_, &write_ev_); + if (!ev) { + return false; + } + GPR_ASSERT(ev->type == GRPC_FINISH_ACCEPTED); + grpc_event_finish(ev); + self_halfclosed_ = true; + } + return ret; +} + +const Status& StreamContext::Wait() { + std::unique_lock<std::mutex> lock(mu_); + // TODO(yangg) if not halfclosed cancel the stream + GPR_ASSERT(self_halfclosed_); + GPR_ASSERT(peer_halfclosed_); + GPR_ASSERT(!waiting_); + waiting_ = true; + while (!stream_finished_) { + finish_cv_.wait(lock); + } + return final_status_; +} + +void StreamContext::FinishStream(const Status& status, bool send) { return; } + +} // namespace grpc diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h new file mode 100644 index 0000000000..b7f462f323 --- /dev/null +++ b/src/cpp/stream/stream_context.h @@ -0,0 +1,105 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__ +#define __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__ + +#include <condition_variable> +#include <mutex> +#include <thread> + +#include <grpc++/status.h> +#include <grpc++/stream_context_interface.h> + +namespace google { +namespace protobuf { +class Message; +} +} + +struct grpc_event; + +namespace grpc { +class ClientContext; +class RpcMethod; + +class StreamContext : public StreamContextInterface { + public: + StreamContext(const RpcMethod& method, ClientContext* context, + const google::protobuf::Message* request, google::protobuf::Message* result); + ~StreamContext(); + // Start the stream, if there is a final write following immediately, set + // buffered so that the messages can be sent in batch. + void Start(bool buffered) override; + bool Read(google::protobuf::Message* msg) override; + bool Write(const google::protobuf::Message* msg, bool is_last) override; + const Status& Wait() override; + void FinishStream(const Status& status, bool send) override; + + const google::protobuf::Message* request() override { return request_; } + google::protobuf::Message* response() override { return result_; } + + private: + void PollingLoop(); + bool BlockingStart(); + bool is_client_; + const RpcMethod* method_; // not owned + ClientContext* context_; // now owned + const google::protobuf::Message* request_; // not owned + google::protobuf::Message* result_; // not owned + + std::thread cq_poller_; + std::mutex mu_; + std::condition_variable invoke_cv_; + std::condition_variable read_cv_; + std::condition_variable write_cv_; + std::condition_variable finish_cv_; + grpc_event* invoke_ev_; + // TODO(yangg) make these two into queues to support concurrent reads and + // writes + grpc_event* read_ev_; + grpc_event* write_ev_; + bool reading_; + bool writing_; + bool got_read_; + bool got_write_; + bool peer_halfclosed_; + bool self_halfclosed_; + bool stream_finished_; + bool waiting_; + Status final_status_; +}; + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__ diff --git a/src/cpp/util/status.cc b/src/cpp/util/status.cc new file mode 100644 index 0000000000..66be26da07 --- /dev/null +++ b/src/cpp/util/status.cc @@ -0,0 +1,42 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + + +#include <grpc++/status.h> + +namespace grpc { + +const Status& Status::OK = Status(); +const Status& Status::Cancelled = Status(StatusCode::CANCELLED); + +} // namespace grpc diff --git a/src/cpp/util/time.cc b/src/cpp/util/time.cc new file mode 100644 index 0000000000..207bebf568 --- /dev/null +++ b/src/cpp/util/time.cc @@ -0,0 +1,61 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/cpp/util/time.h" + +#include <grpc/support/time.h> + +using std::chrono::duration_cast; +using std::chrono::nanoseconds; +using std::chrono::seconds; +using std::chrono::system_clock; + +namespace grpc { + +void AbsoluteDeadlineTimepoint2Timespec(const system_clock::time_point& from, + gpr_timespec* to) { + system_clock::duration deadline = from.time_since_epoch(); + seconds secs = duration_cast<seconds>(deadline); + nanoseconds nsecs = duration_cast<nanoseconds>(deadline - secs); + to->tv_sec = secs.count(); + to->tv_nsec = nsecs.count(); +} + +system_clock::time_point AbsoluteDeadlineTimespec2Timepoint(gpr_timespec t) { + system_clock::time_point tp; + tp += seconds(t.tv_sec); + tp += nanoseconds(t.tv_nsec); + return tp; +} + +} // namespace grpc diff --git a/src/cpp/util/time.h b/src/cpp/util/time.h new file mode 100644 index 0000000000..c21fba7ec3 --- /dev/null +++ b/src/cpp/util/time.h @@ -0,0 +1,52 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_INTERNAL_UTIL_TIME_H__ +#define __GRPCPP_INTERNAL_UTIL_TIME_H__ + +#include <chrono> + +#include <grpc/support/time.h> + +namespace grpc { + +// from and to should be absolute time. +void AbsoluteDeadlineTimepoint2Timespec( + const std::chrono::system_clock::time_point& from, gpr_timespec* to); + +std::chrono::system_clock::time_point AbsoluteDeadlineTimespec2Timepoint( + gpr_timespec t); + +} // namespace grpc + +#endif // __GRPCPP_INTERNAL_UTIL_TIME_H__ |