diff options
author | 2018-01-26 15:39:32 -0800 | |
---|---|---|
committer | 2018-02-15 14:49:34 -0800 | |
commit | 0e00c430827e81d61e1e7164ef04ca21ccbfaa77 (patch) | |
tree | 9246301d4f3583a4befd834f7ea426663b8eb622 /include/grpcpp/impl/codegen/sync_stream.h | |
parent | 8224c45866c6a2cfa29ede0c91a6ae9f40572658 (diff) |
Move headers from grpc++ to grpcpp
Diffstat (limited to 'include/grpcpp/impl/codegen/sync_stream.h')
-rw-r--r-- | include/grpcpp/impl/codegen/sync_stream.h | 934 |
1 files changed, 934 insertions, 0 deletions
diff --git a/include/grpcpp/impl/codegen/sync_stream.h b/include/grpcpp/impl/codegen/sync_stream.h new file mode 100644 index 0000000000..7152eaf41f --- /dev/null +++ b/include/grpcpp/impl/codegen/sync_stream.h @@ -0,0 +1,934 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H +#define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H + +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/channel_interface.h> +#include <grpcpp/impl/codegen/client_context.h> +#include <grpcpp/impl/codegen/completion_queue.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/server_context.h> +#include <grpcpp/impl/codegen/service_type.h> +#include <grpcpp/impl/codegen/status.h> + +namespace grpc { + +namespace internal { +/// Common interface for all synchronous client side streaming. +class ClientStreamingInterface { + public: + virtual ~ClientStreamingInterface() {} + + /// Block waiting until the stream finishes and a final status of the call is + /// available. + /// + /// It is appropriate to call this method when both: + /// * the calling code (client-side) has no more message to send + /// (this can be declared implicitly by calling this method, or + /// explicitly through an earlier call to <i>WritesDone</i> method of the + /// class in use, e.g. \a ClientWriterInterface::WritesDone or + /// \a ClientReaderWriterInterface::WritesDone). + /// * there are no more messages to be received from the server (which can + /// be known implicitly, or explicitly from an earlier call to \a + /// ReaderInterface::Read that returned "false"). + /// + /// This function will return either: + /// - when all incoming messages have been read and the server has + /// returned status. + /// - when the server has returned a non-OK status. + /// - OR when the call failed for some reason and the library generated a + /// status. + /// + /// Return values: + /// - \a Status contains the status code, message and details for the call + /// - the \a ClientContext associated with this call is updated with + /// possible trailing metadata sent from the server. + virtual Status Finish() = 0; +}; + +/// Common interface for all synchronous server side streaming. +class ServerStreamingInterface { + public: + virtual ~ServerStreamingInterface() {} + + /// Block to send initial metadata to client. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Finish method. + /// + /// The initial metadata that will be sent to the client will be + /// taken from the \a ServerContext associated with the call. + virtual void SendInitialMetadata() = 0; +}; + +/// An interface that yields a sequence of messages of type \a R. +template <class R> +class ReaderInterface { + public: + virtual ~ReaderInterface() {} + + /// Get an upper bound on the next message size available for reading on this + /// stream. + virtual bool NextMessageSize(uint32_t* sz) = 0; + + /// Block to read a message and parse to \a msg. Returns \a true on success. + /// This is thread-safe with respect to \a Write or \WritesDone methods on + /// the same stream. It should not be called concurrently with another \a + /// Read on the same stream as the order of delivery will not be defined. + /// + /// \param[out] msg The read message. + /// + /// \return \a false when there will be no more incoming messages, either + /// because the other side has called \a WritesDone() or the stream has failed + /// (or been cancelled). + virtual bool Read(R* msg) = 0; +}; + +/// An interface that can be fed a sequence of messages of type \a W. +template <class W> +class WriterInterface { + public: + virtual ~WriterInterface() {} + + /// Block to write \a msg to the stream with WriteOptions \a options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// \param options The WriteOptions affecting the write operation. + /// + /// \return \a true on success, \a false when the stream has been closed. + virtual bool Write(const W& msg, WriteOptions options) = 0; + + /// Block to write \a msg to the stream with default write options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// + /// \return \a true on success, \a false when the stream has been closed. + inline bool Write(const W& msg) { return Write(msg, WriteOptions()); } + + /// Write \a msg and coalesce it with the writing of trailing metadata, using + /// WriteOptions \a options. + /// + /// For client, WriteLast is equivalent of performing Write and WritesDone in + /// a single step. \a msg and trailing metadata are coalesced and sent on wire + /// by calling this function. For server, WriteLast buffers the \a msg. + /// The writing of \a msg is held until the service handler returns, + /// where \a msg and trailing metadata are coalesced and sent on wire. + /// Note that WriteLast can only buffer \a msg up to the flow control window + /// size. If \a msg size is larger than the window size, it will be sent on + /// wire without buffering. + /// + /// \param[in] msg The message to be written to the stream. + /// \param[in] options The WriteOptions to be used to write this message. + void WriteLast(const W& msg, WriteOptions options) { + Write(msg, options.set_last_message()); + } +}; + +} // namespace internal + +/// Client-side interface for streaming reads of message of type \a R. +template <class R> +class ClientReaderInterface : public internal::ClientStreamingInterface, + public internal::ReaderInterface<R> { + public: + /// Block to wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; +}; + +namespace internal { +template <class R> +class ClientReaderFactory { + public: + template <class W> + static ClientReader<R>* Create(ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) { + return new ClientReader<R>(channel, method, context, request); + } +}; +} // namespace internal + +/// Synchronous (blocking) client-side API for doing server-streaming RPCs, +/// where the stream of messages coming from the server has messages +/// of type \a R. +template <class R> +class ClientReader final : public ClientReaderInterface<R> { + public: + /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for + /// semantics. + /// + // Side effect: + /// Once complete, the initial metadata read from + /// the server will be accessable through the \a ClientContext used to + /// construct this object. + void WaitForInitialMetadata() override { + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); /// status ignored + } + + bool NextMessageSize(uint32_t* sz) override { + *sz = call_.max_receive_message_size(); + return true; + } + + /// See the \a ReaderInterface.Read method for semantics. + /// Side effect: + /// This also receives initial metadata from the server, if not + /// already received (if initial metadata is received, it can be then + /// accessed through the \a ClientContext associated with this call). + bool Read(R* msg) override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + /// See the \a ClientStreamingInterface.Finish method for semantics. + /// + /// Side effect: + /// The \a ClientContext associated with this call is updated with + /// possible metadata received from the server. + Status Finish() override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops; + Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + friend class internal::ClientReaderFactory<R>; + ClientContext* context_; + CompletionQueue cq_; + ::grpc::internal::Call call_; + + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial + /// metadata used to send to the server when starting the call. + template <class W> + ClientReader(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } +}; + +/// Client-side interface for streaming writes of message type \a W. +template <class W> +class ClientWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface<W> { + public: + /// Half close writing from the client. (signal that the stream of messages + /// coming from the client is complete). + /// Blocks until currently-pending writes are completed. + /// Thread safe with respect to \a ReaderInterface::Read operations only + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +namespace internal { +template <class W> +class ClientWriterFactory { + public: + template <class R> + static ClientWriter<W>* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response) { + return new ClientWriter<W>(channel, method, context, response); + } +}; +} // namespace internal + +/// Synchronous (blocking) client-side API for doing client-streaming RPCs, +/// where the outgoing message stream coming from the client has messages of +/// type \a W. +template <class W> +class ClientWriter : public ClientWriterInterface<W> { + public: + /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for + /// semantics. + /// + // Side effect: + /// Once complete, the initial metadata read from the server will be + /// accessable through the \a ClientContext used to construct this object. + void WaitForInitialMetadata() { + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + + /// See the WriterInterface.Write(const W& msg, WriteOptions options) method + /// for semantics. + /// + /// Side effect: + /// Also sends initial metadata if not already sent (using the + /// \a ClientContext associated with this call). + using ::grpc::internal::WriterInterface<W>::Write; + bool Write(const W& msg, WriteOptions options) override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + /// See the ClientStreamingInterface.Finish method for semantics. + /// Side effects: + /// - Also receives initial metadata if not already received. + /// - Attempts to fill in the \a response parameter passed + /// to the constructor of this instance with the response + /// message from the server. + Status Finish() override { + Status status; + if (!context_->initial_metadata_received_) { + finish_ops_.RecvInitialMetadata(context_); + } + finish_ops_.ClientRecvStatus(context_, &status); + call_.PerformOps(&finish_ops_); + GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_)); + return status; + } + + private: + friend class internal::ClientWriterFactory<W>; + + /// Block to create a stream (i.e. send request headers and other initial + /// metadata to the server). Note that \a context will be used to fill + /// in custom initial metadata. \a response will be filled in with the + /// single expected response message from the server upon a successful + /// call to the \a Finish method of this instance. + template <class R> + ClientWriter(ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + + if (!context_->initial_metadata_corked_) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + } + + ClientContext* context_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpGenericRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> + finish_ops_; + CompletionQueue cq_; + ::grpc::internal::Call call_; +}; + +/// Client-side interface for bi-directional streaming with +/// client-to-server stream messages of type \a W and +/// server-to-client stream messages of type \a R. +template <class W, class R> +class ClientReaderWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface<W>, + public internal::ReaderInterface<R> { + public: + /// Block to wait for initial metadata from server. The received metadata + /// can only be accessed after this call returns. Should only be called before + /// the first read. Calling this method is optional, and if it is not called + /// the metadata will be available in ClientContext after the first read. + virtual void WaitForInitialMetadata() = 0; + + /// Half close writing from the client. (signal that the stream of messages + /// coming from the clinet is complete). + /// Blocks until currently-pending writes are completed. + /// Thread-safe with respect to \a ReaderInterface::Read + /// + /// \return Whether the writes were successful. + virtual bool WritesDone() = 0; +}; + +namespace internal { +template <class W, class R> +class ClientReaderWriterFactory { + public: + static ClientReaderWriter<W, R>* Create( + ::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, ClientContext* context) { + return new ClientReaderWriter<W, R>(channel, method, context); + } +}; +} // namespace internal + +/// Synchronous (blocking) client-side API for bi-directional streaming RPCs, +/// where the outgoing message stream coming from the client has messages of +/// type \a W, and the incoming messages stream coming from the server has +/// messages of type \a R. +template <class W, class R> +class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { + public: + /// Block waiting to read initial metadata from the server. + /// This call is optional, but if it is used, it cannot be used concurrently + /// with or after the \a Finish method. + /// + /// Once complete, the initial metadata read from the server will be + /// accessable through the \a ClientContext used to construct this object. + void WaitForInitialMetadata() override { + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); + + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + + bool NextMessageSize(uint32_t* sz) override { + *sz = call_.max_receive_message_size(); + return true; + } + + /// See the \a ReaderInterface.Read method for semantics. + /// Side effect: + /// Also receives initial metadata if not already received (updates the \a + /// ClientContext associated with this call in that case). + bool Read(R* msg) override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage<R>> + ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + /// See the \a WriterInterface.Write method for semantics. + /// + /// Side effect: + /// Also sends initial metadata if not already sent (using the + /// \a ClientContext associated with this call to fill in values). + using ::grpc::internal::WriterInterface<W>::Write; + bool Write(const W& msg, WriteOptions options) override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + /// See the ClientStreamingInterface.Finish method for semantics. + /// + /// Side effect: + /// - the \a ClientContext associated with this call is updated with + /// possible trailing metadata sent from the server. + Status Finish() override { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + friend class internal::ClientReaderWriterFactory<W, R>; + + ClientContext* context_; + CompletionQueue cq_; + ::grpc::internal::Call call_; + + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + ClientReaderWriter(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + if (!context_->initial_metadata_corked_) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + } +}; + +/// Server-side interface for streaming reads of message of type \a R. +template <class R> +class ServerReaderInterface : public internal::ServerStreamingInterface, + public internal::ReaderInterface<R> {}; + +/// Synchronous (blocking) server-side API for doing client-streaming RPCs, +/// where the incoming message stream coming from the client has messages of +/// type \a R. +template <class R> +class ServerReader final : public ServerReaderInterface<R> { + public: + /// See the \a ServerStreamingInterface.SendInitialMetadata method + /// for semantics. Note that initial metadata will be affected by the + /// \a ServerContext associated with this call. + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + internal::CallOpSet<internal::CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool NextMessageSize(uint32_t* sz) override { + *sz = call_->max_receive_message_size(); + return true; + } + + bool Read(R* msg) override { + internal::CallOpSet<internal::CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; + } + + private: + internal::Call* const call_; + ServerContext* const ctx_; + + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::ClientStreamingHandler; + + ServerReader(internal::Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; + +/// Server-side interface for streaming writes of message of type \a W. +template <class W> +class ServerWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface<W> {}; + +/// Synchronous (blocking) server-side API for doing for doing a +/// server-streaming RPCs, where the outgoing message stream coming from the +/// server has messages of type \a W. +template <class W> +class ServerWriter final : public ServerWriterInterface<W> { + public: + /// See the \a ServerStreamingInterface.SendInitialMetadata method + /// for semantics. + /// Note that initial metadata will be affected by the + /// \a ServerContext associated with this call. + void SendInitialMetadata() override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + internal::CallOpSet<internal::CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + /// See the \a WriterInterface.Write method for semantics. + /// + /// Side effect: + /// Also sends initial metadata if not already sent (using the + /// \a ClientContext associated with this call to fill in values). + using internal::WriterInterface<W>::Write; + bool Write(const W& msg, WriteOptions options) override { + if (options.is_last_message()) { + options.set_buffer_hint(); + } + + if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ctx_->pending_ops_); + // if this is the last message we defer the pluck until AFTER we start + // the trailing md op. This prevents hangs. See + // https://github.com/grpc/grpc/issues/11546 + if (options.is_last_message()) { + ctx_->has_pending_ops_ = true; + return true; + } + ctx_->has_pending_ops_ = false; + return call_->cq()->Pluck(&ctx_->pending_ops_); + } + + private: + internal::Call* const call_; + ServerContext* const ctx_; + + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::ServerStreamingHandler; + + ServerWriter(internal::Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} +}; + +/// Server-side interface for bi-directional streaming. +template <class W, class R> +class ServerReaderWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface<W>, + public internal::ReaderInterface<R> {}; + +/// Actual implementation of bi-directional streaming +namespace internal { +template <class W, class R> +class ServerReaderWriterBody final { + public: + ServerReaderWriterBody(Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool NextMessageSize(uint32_t* sz) { + *sz = call_->max_receive_message_size(); + return true; + } + + bool Read(R* msg) { + CallOpSet<CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; + } + + bool Write(const W& msg, WriteOptions options) { + if (options.is_last_message()) { + options.set_buffer_hint(); + } + if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ctx_->pending_ops_); + // if this is the last message we defer the pluck until AFTER we start + // the trailing md op. This prevents hangs. See + // https://github.com/grpc/grpc/issues/11546 + if (options.is_last_message()) { + ctx_->has_pending_ops_ = true; + return true; + } + ctx_->has_pending_ops_ = false; + return call_->cq()->Pluck(&ctx_->pending_ops_); + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +} // namespace internal + +/// Synchronous (blocking) server-side API for a bidirectional +/// streaming call, where the incoming message stream coming from the client has +/// messages of type \a R, and the outgoing message streaming coming from +/// the server has messages of type \a W. +template <class W, class R> +class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { + public: + /// See the \a ServerStreamingInterface.SendInitialMetadata method + /// for semantics. Note that initial metadata will be affected by the + /// \a ServerContext associated with this call. + void SendInitialMetadata() override { body_.SendInitialMetadata(); } + + bool NextMessageSize(uint32_t* sz) override { + return body_.NextMessageSize(sz); + } + + bool Read(R* msg) override { return body_.Read(msg); } + + /// See the \a WriterInterface.Write(const W& msg, WriteOptions options) + /// method for semantics. + /// Side effect: + /// Also sends initial metadata if not already sent (using the \a + /// ServerContext associated with this call). + using internal::WriterInterface<W>::Write; + bool Write(const W& msg, WriteOptions options) override { + return body_.Write(msg, options); + } + + private: + internal::ServerReaderWriterBody<W, R> body_; + + friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, + false>; + ServerReaderWriter(internal::Call* call, ServerContext* ctx) + : body_(call, ctx) {} +}; + +/// A class to represent a flow-controlled unary call. This is something +/// of a hybrid between conventional unary and streaming. This is invoked +/// through a unary call on the client side, but the server responds to it +/// as though it were a single-ping-pong streaming call. The server can use +/// the \a NextMessageSize method to determine an upper-bound on the size of +/// the message. A key difference relative to streaming: ServerUnaryStreamer +/// must have exactly 1 Read and exactly 1 Write, in that order, to function +/// correctly. Otherwise, the RPC is in error. +template <class RequestType, class ResponseType> +class ServerUnaryStreamer final + : public ServerReaderWriterInterface<ResponseType, RequestType> { + public: + /// Block to send initial metadata to client. + /// Implicit input parameter: + /// - the \a ServerContext associated with this call will be used for + /// sending initial metadata. + void SendInitialMetadata() override { body_.SendInitialMetadata(); } + + /// Get an upper bound on the request message size from the client. + bool NextMessageSize(uint32_t* sz) override { + return body_.NextMessageSize(sz); + } + + /// Read a message of type \a R into \a msg. Completion will be notified by \a + /// tag on the associated completion queue. + /// This is thread-safe with respect to \a Write or \a WritesDone methods. It + /// should not be called concurrently with other streaming APIs + /// on the same stream. It is not meaningful to call it concurrently + /// with another \a ReaderInterface::Read on the same stream since reads on + /// the same stream are delivered in order. + /// + /// \param[out] msg Where to eventually store the read message. + /// \param[in] tag The tag identifying the operation. + bool Read(RequestType* request) override { + if (read_done_) { + return false; + } + read_done_ = true; + return body_.Read(request); + } + + /// Block to write \a msg to the stream with WriteOptions \a options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// \param options The WriteOptions affecting the write operation. + /// + /// \return \a true on success, \a false when the stream has been closed. + using internal::WriterInterface<ResponseType>::Write; + bool Write(const ResponseType& response, WriteOptions options) override { + if (write_done_ || !read_done_) { + return false; + } + write_done_ = true; + return body_.Write(response, options); + } + + private: + internal::ServerReaderWriterBody<ResponseType, RequestType> body_; + bool read_done_; + bool write_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerUnaryStreamer<RequestType, ResponseType>, true>; + ServerUnaryStreamer(internal::Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false), write_done_(false) {} +}; + +/// A class to represent a flow-controlled server-side streaming call. +/// This is something of a hybrid between server-side and bidi streaming. +/// This is invoked through a server-side streaming call on the client side, +/// but the server responds to it as though it were a bidi streaming call that +/// must first have exactly 1 Read and then any number of Writes. +template <class RequestType, class ResponseType> +class ServerSplitStreamer final + : public ServerReaderWriterInterface<ResponseType, RequestType> { + public: + /// Block to send initial metadata to client. + /// Implicit input parameter: + /// - the \a ServerContext associated with this call will be used for + /// sending initial metadata. + void SendInitialMetadata() override { body_.SendInitialMetadata(); } + + /// Get an upper bound on the request message size from the client. + bool NextMessageSize(uint32_t* sz) override { + return body_.NextMessageSize(sz); + } + + /// Read a message of type \a R into \a msg. Completion will be notified by \a + /// tag on the associated completion queue. + /// This is thread-safe with respect to \a Write or \a WritesDone methods. It + /// should not be called concurrently with other streaming APIs + /// on the same stream. It is not meaningful to call it concurrently + /// with another \a ReaderInterface::Read on the same stream since reads on + /// the same stream are delivered in order. + /// + /// \param[out] msg Where to eventually store the read message. + /// \param[in] tag The tag identifying the operation. + bool Read(RequestType* request) override { + if (read_done_) { + return false; + } + read_done_ = true; + return body_.Read(request); + } + + /// Block to write \a msg to the stream with WriteOptions \a options. + /// This is thread-safe with respect to \a ReaderInterface::Read + /// + /// \param msg The message to be written to the stream. + /// \param options The WriteOptions affecting the write operation. + /// + /// \return \a true on success, \a false when the stream has been closed. + using internal::WriterInterface<ResponseType>::Write; + bool Write(const ResponseType& response, WriteOptions options) override { + return read_done_ && body_.Write(response, options); + } + + private: + internal::ServerReaderWriterBody<ResponseType, RequestType> body_; + bool read_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerSplitStreamer<RequestType, ResponseType>, false>; + ServerSplitStreamer(internal::Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false) {} +}; + +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H |