diff options
Diffstat (limited to 'include/grpc++/impl/codegen')
-rw-r--r-- | include/grpc++/impl/codegen/async_stream.h | 27 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/async_unary_call.h | 11 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/call.h | 12 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/client_context.h | 2 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/config_protobuf.h | 18 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen.h | 9 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen_interface.h | 5 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/impl/async_stream.h | 486 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/impl/status_code_enum.h | 152 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/method_handler_impl.h | 15 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/proto_utils.h | 17 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/server_context.h | 9 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/sync_stream.h | 15 |
13 files changed, 126 insertions, 652 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index e96d224ddb..70533aa4d9 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -330,6 +330,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface<W, R> { meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } @@ -345,6 +348,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface<W, R> { if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. @@ -363,6 +369,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface<W, R> { if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); @@ -400,6 +409,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface<W> { meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } @@ -409,6 +421,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface<W> { if (!ctx_->sent_initial_metadata_) { write_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + write_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert @@ -421,6 +436,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface<W> { if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); @@ -459,6 +477,9 @@ class ServerAsyncReaderWriter GRPC_FINAL meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } @@ -474,6 +495,9 @@ class ServerAsyncReaderWriter GRPC_FINAL if (!ctx_->sent_initial_metadata_) { write_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + write_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert @@ -486,6 +510,9 @@ class ServerAsyncReaderWriter GRPC_FINAL if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 47ac5bee92..5ceab73cea 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -65,7 +65,7 @@ class ClientAsyncResponseReader GRPC_FINAL const W& request) : context_(context), call_(channel->CreateCall(method, context, cq)), - collection_(new CallOpSetCollection) { + collection_(std::make_shared<CallOpSetCollection>()) { collection_->init_buf_.SetCollection(collection_); collection_->init_buf_.SendInitialMetadata( context->send_initial_metadata_, context->initial_metadata_flags()); @@ -126,6 +126,9 @@ class ServerAsyncResponseWriter GRPC_FINAL meta_buf_.set_output_tag(tag); meta_buf_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_buf_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_buf_); } @@ -135,6 +138,9 @@ class ServerAsyncResponseWriter GRPC_FINAL if (!ctx_->sent_initial_metadata_) { finish_buf_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_buf_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. @@ -153,6 +159,9 @@ class ServerAsyncResponseWriter GRPC_FINAL if (!ctx_->sent_initial_metadata_) { finish_buf_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_buf_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index fab85d1517..dfac177970 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -180,17 +180,23 @@ class CallNoOp { class CallOpSendInitialMetadata { public: - CallOpSendInitialMetadata() : send_(false) {} + CallOpSendInitialMetadata() : send_(false) { + maybe_compression_level_.is_set = false; + } void SendInitialMetadata( const std::multimap<grpc::string, grpc::string>& metadata, uint32_t flags) { + maybe_compression_level_.is_set = false; send_ = true; flags_ = flags; initial_metadata_count_ = metadata.size(); initial_metadata_ = FillMetadataArray(metadata); - // TODO(dgq): expose compression level in API so it can be properly set. - maybe_compression_level_.is_set = false; + } + + void set_compression_level(grpc_compression_level level) { + maybe_compression_level_.is_set = true; + maybe_compression_level_.level = level; } protected: diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h index a132c9a57a..012bcc2bbe 100644 --- a/include/grpc++/impl/codegen/client_context.h +++ b/include/grpc++/impl/codegen/client_context.h @@ -41,7 +41,7 @@ /// /// Context settings are only relevant to the call they are invoked with, that /// is to say, they aren't sticky. Some of these settings, such as the -/// compression options, can be made persistant at channel construction time +/// compression options, can be made persistent at channel construction time /// (see \a grpc::CreateCustomChannel). /// /// \warning ClientContext instances should \em not be reused across rpcs. diff --git a/include/grpc++/impl/codegen/config_protobuf.h b/include/grpc++/impl/codegen/config_protobuf.h index 4bee1bc422..8620d4b218 100644 --- a/include/grpc++/impl/codegen/config_protobuf.h +++ b/include/grpc++/impl/codegen/config_protobuf.h @@ -40,16 +40,21 @@ #endif #ifndef GRPC_CUSTOM_MESSAGE +#ifdef GRPC_USE_PROTO_LITE +#include <google/protobuf/message_lite.h> +#define GRPC_CUSTOM_MESSAGE ::google::protobuf::MessageLite +#else #include <google/protobuf/message.h> #define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message #endif +#endif #ifndef GRPC_CUSTOM_DESCRIPTOR #include <google/protobuf/descriptor.h> #include <google/protobuf/descriptor.pb.h> #define GRPC_CUSTOM_DESCRIPTOR ::google::protobuf::Descriptor #define GRPC_CUSTOM_DESCRIPTORPOOL ::google::protobuf::DescriptorPool -#define GPRC_CUSTOM_FIELDDESCRIPTOR ::google::protobuf::FieldDescriptor +#define GRPC_CUSTOM_FIELDDESCRIPTOR ::google::protobuf::FieldDescriptor #define GRPC_CUSTOM_FILEDESCRIPTOR ::google::protobuf::FileDescriptor #define GRPC_CUSTOM_FILEDESCRIPTORPROTO ::google::protobuf::FileDescriptorProto #define GRPC_CUSTOM_METHODDESCRIPTOR ::google::protobuf::MethodDescriptor @@ -57,6 +62,13 @@ #define GRPC_CUSTOM_SOURCELOCATION ::google::protobuf::SourceLocation #endif +#ifndef GRPC_CUSTOM_DESCRIPTORDATABASE +#include <google/protobuf/descriptor_database.h> +#define GRPC_CUSTOM_DESCRIPTORDATABASE ::google::protobuf::DescriptorDatabase +#define GRPC_CUSTOM_SIMPLEDESCRIPTORDATABASE \ + ::google::protobuf::SimpleDescriptorDatabase +#endif + #ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM #include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/zero_copy_stream.h> @@ -75,11 +87,13 @@ typedef GRPC_CUSTOM_PROTOBUF_INT64 int64; typedef GRPC_CUSTOM_DESCRIPTOR Descriptor; typedef GRPC_CUSTOM_DESCRIPTORPOOL DescriptorPool; -typedef GPRC_CUSTOM_FIELDDESCRIPTOR FieldDescriptor; +typedef GRPC_CUSTOM_DESCRIPTORDATABASE DescriptorDatabase; +typedef GRPC_CUSTOM_FIELDDESCRIPTOR FieldDescriptor; typedef GRPC_CUSTOM_FILEDESCRIPTOR FileDescriptor; typedef GRPC_CUSTOM_FILEDESCRIPTORPROTO FileDescriptorProto; typedef GRPC_CUSTOM_METHODDESCRIPTOR MethodDescriptor; typedef GRPC_CUSTOM_SERVICEDESCRIPTOR ServiceDescriptor; +typedef GRPC_CUSTOM_SIMPLEDESCRIPTORDATABASE SimpleDescriptorDatabase; typedef GRPC_CUSTOM_SOURCELOCATION SourceLocation; namespace io { diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index b0c4c57e66..2586b94504 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -31,6 +31,9 @@ * */ +#ifndef GRPCXX_IMPL_CODEGEN_CORE_CODEGEN_H +#define GRPCXX_IMPL_CODEGEN_CORE_CODEGEN_H + // This file should be compiled as part of grpc++. #include <grpc++/impl/codegen/core_codegen_interface.h> @@ -54,8 +57,8 @@ class CoreCodegen : public CoreCodegenInterface { void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) GRPC_OVERRIDE; - void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, - grpc_byte_buffer* buffer) GRPC_OVERRIDE; + int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, + grpc_byte_buffer* buffer) GRPC_OVERRIDE; void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader) GRPC_OVERRIDE; int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, @@ -83,3 +86,5 @@ class CoreCodegen : public CoreCodegenInterface { }; } // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_CORE_CODEGEN_H diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index 64d882ed5d..f9a8f9b980 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -65,8 +65,9 @@ class CoreCodegenInterface { virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0; - virtual void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, - grpc_byte_buffer* buffer) = 0; + virtual int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, + grpc_byte_buffer* buffer) + GRPC_MUST_USE_RESULT = 0; virtual void grpc_byte_buffer_reader_destroy( grpc_byte_buffer_reader* reader) = 0; virtual int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, diff --git a/include/grpc++/impl/codegen/impl/async_stream.h b/include/grpc++/impl/codegen/impl/async_stream.h deleted file mode 100644 index 7d7a956807..0000000000 --- a/include/grpc++/impl/codegen/impl/async_stream.h +++ /dev/null @@ -1,486 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPCXX_IMPL_CODEGEN_IMPL_ASYNC_STREAM_H -#define GRPCXX_IMPL_CODEGEN_IMPL_ASYNC_STREAM_H - -#include <grpc++/impl/codegen/call.h> -#include <grpc++/impl/codegen/channel_interface.h> -#include <grpc++/impl/codegen/core_codegen_interface.h> -#include <grpc++/impl/codegen/server_context.h> -#include <grpc++/impl/codegen/service_type.h> -#include <grpc++/impl/codegen/status.h> - -namespace grpc { - -class CompletionQueue; - -/// Common interface for all client side asynchronous streaming. -class ClientAsyncStreamingInterface { - public: - virtual ~ClientAsyncStreamingInterface() {} - - /// Request notification of the reading of the initial metadata. Completion - /// will be notified by \a tag on the associated completion queue. - /// - /// \param[in] tag Tag identifying this request. - virtual void ReadInitialMetadata(void* tag) = 0; - - /// Request notification completion. - /// - /// \param[out] status To be updated with the operation status. - /// \param[in] tag Tag identifying this request. - virtual void Finish(Status* status, void* tag) = 0; -}; - -/// An interface that yields a sequence of messages of type \a R. -template <class R> -class AsyncReaderInterface { - public: - virtual ~AsyncReaderInterface() {} - - /// Read a message of type \a R into \a msg. Completion will be notified by \a - /// tag on the associated completion queue. - /// - /// \param[out] msg Where to eventually store the read message. - /// \param[in] tag The tag identifying the operation. - virtual void Read(R* msg, void* tag) = 0; -}; - -/// An interface that can be fed a sequence of messages of type \a W. -template <class W> -class AsyncWriterInterface { - public: - virtual ~AsyncWriterInterface() {} - - /// Request the writing of \a msg with identifying tag \a tag. - /// - /// Only one write may be outstanding at any given time. This means that - /// after calling Write, one must wait to receive \a tag from the completion - /// queue BEFORE calling Write again. - /// - /// \param[in] msg The message to be written. - /// \param[in] tag The tag identifying the operation. - virtual void Write(const W& msg, void* tag) = 0; -}; - -template <class R> -class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, - public AsyncReaderInterface<R> {}; - -template <class R> -class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { - public: - /// Create a stream and write the first request out. - template <class W> - ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - const W& request, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); - init_ops_.ClientSendClose(); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); - } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> - init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; -}; - -/// Common interface for client side asynchronous writing. -template <class W> -class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface<W> { - public: - /// Signal the client is done with the writes. - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -template <class W> -class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { - public: - template <class R> - ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - R* response, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - finish_ops_.RecvMessage(response); - - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, - CallOpClientRecvStatus> - finish_ops_; -}; - -/// Client-side interface for asynchronous bi-directional streaming. -template <class W, class R> -class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface<W>, - public AsyncReaderInterface<R> { - public: - /// Signal the client is done with the writes. - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -template <class W, class R> -class ClientAsyncReaderWriter GRPC_FINAL - : public ClientAsyncReaderWriterInterface<W, R> { - public: - ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); - } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; -}; - -template <class W, class R> -class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, - public AsyncReaderInterface<R> { - public: - virtual void Finish(const W& msg, const Status& status, void* tag) = 0; - - virtual void FinishWithError(const Status& status, void* tag) = 0; -}; - -template <class W, class R> -class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface<W, R> { - public: - explicit ServerAsyncReader(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Finish(const W& msg, const Status& status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // The response is dropped if the status is not OK. - if (status.ok()) { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, - finish_ops_.SendMessage(msg)); - } else { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - } - call_.PerformOps(&finish_ops_); - } - - void FinishWithError(const Status& status, void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!status.ok()); - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> - finish_ops_; -}; - -template <class W> -class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface<W> { - public: - virtual void Finish(const Status& status, void* tag) = 0; -}; - -template <class W> -class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface<W> { - public: - explicit ServerAsyncWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Finish(const Status& status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; -}; - -/// Server-side interface for asynchronous bi-directional streaming. -template <class W, class R> -class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface<W>, - public AsyncReaderInterface<R> { - public: - virtual void Finish(const Status& status, void* tag) = 0; -}; - -template <class W, class R> -class ServerAsyncReaderWriter GRPC_FINAL - : public ServerAsyncReaderWriterInterface<W, R> { - public: - explicit ServerAsyncReaderWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Finish(const Status& status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - friend class ::grpc::Server; - - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; -}; - -} // namespace grpc - -#endif // GRPCXX_IMPL_CODEGEN_IMPL_ASYNC_STREAM_H diff --git a/include/grpc++/impl/codegen/impl/status_code_enum.h b/include/grpc++/impl/codegen/impl/status_code_enum.h deleted file mode 100644 index f8caec0c11..0000000000 --- a/include/grpc++/impl/codegen/impl/status_code_enum.h +++ /dev/null @@ -1,152 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPCXX_IMPL_CODEGEN_IMPL_STATUS_CODE_ENUM_H -#define GRPCXX_IMPL_CODEGEN_IMPL_STATUS_CODE_ENUM_H - -namespace grpc { - -enum StatusCode { - /// Not an error; returned on success. - OK = 0, - - /// The operation was cancelled (typically by the caller). - CANCELLED = 1, - - /// Unknown error. An example of where this error may be returned is if a - /// Status value received from another address space belongs to an error-space - /// that is not known in this address space. Also errors raised by APIs that - /// do not return enough error information may be converted to this error. - UNKNOWN = 2, - - /// Client specified an invalid argument. Note that this differs from - /// FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments that are - /// problematic regardless of the state of the system (e.g., a malformed file - /// name). - INVALID_ARGUMENT = 3, - - /// Deadline expired before operation could complete. For operations that - /// change the state of the system, this error may be returned even if the - /// operation has completed successfully. For example, a successful response - /// from a server could have been delayed long enough for the deadline to - /// expire. - DEADLINE_EXCEEDED = 4, - - /// Some requested entity (e.g., file or directory) was not found. - NOT_FOUND = 5, - - /// Some entity that we attempted to create (e.g., file or directory) already - /// exists. - ALREADY_EXISTS = 6, - - /// The caller does not have permission to execute the specified operation. - /// PERMISSION_DENIED must not be used for rejections caused by exhausting - /// some resource (use RESOURCE_EXHAUSTED instead for those errors). - /// PERMISSION_DENIED must not be used if the caller can not be identified - /// (use UNAUTHENTICATED instead for those errors). - PERMISSION_DENIED = 7, - - /// The request does not have valid authentication credentials for the - /// operation. - UNAUTHENTICATED = 16, - - /// Some resource has been exhausted, perhaps a per-user quota, or perhaps the - /// entire file system is out of space. - RESOURCE_EXHAUSTED = 8, - - /// Operation was rejected because the system is not in a state required for - /// the operation's execution. For example, directory to be deleted may be - /// non-empty, an rmdir operation is applied to a non-directory, etc. - /// - /// A litmus test that may help a service implementor in deciding - /// between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE: - /// (a) Use UNAVAILABLE if the client can retry just the failing call. - /// (b) Use ABORTED if the client should retry at a higher-level - /// (e.g., restarting a read-modify-write sequence). - /// (c) Use FAILED_PRECONDITION if the client should not retry until - /// the system state has been explicitly fixed. E.g., if an "rmdir" - /// fails because the directory is non-empty, FAILED_PRECONDITION - /// should be returned since the client should not retry unless - /// they have first fixed up the directory by deleting files from it. - /// (d) Use FAILED_PRECONDITION if the client performs conditional - /// REST Get/Update/Delete on a resource and the resource on the - /// server does not match the condition. E.g., conflicting - /// read-modify-write on the same resource. - FAILED_PRECONDITION = 9, - - /// The operation was aborted, typically due to a concurrency issue like - /// sequencer check failures, transaction aborts, etc. - /// - /// See litmus test above for deciding between FAILED_PRECONDITION, ABORTED, - /// and UNAVAILABLE. - ABORTED = 10, - - /// Operation was attempted past the valid range. E.g., seeking or reading - /// past end of file. - /// - /// Unlike INVALID_ARGUMENT, this error indicates a problem that may be fixed - /// if the system state changes. For example, a 32-bit file system will - /// generate INVALID_ARGUMENT if asked to read at an offset that is not in the - /// range [0,2^32-1], but it will generate OUT_OF_RANGE if asked to read from - /// an offset past the current file size. - /// - /// There is a fair bit of overlap between FAILED_PRECONDITION and - /// OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific error) - /// when it applies so that callers who are iterating through a space can - /// easily look for an OUT_OF_RANGE error to detect when they are done. - OUT_OF_RANGE = 11, - - /// Operation is not implemented or not supported/enabled in this service. - UNIMPLEMENTED = 12, - - /// Internal errors. Means some invariants expected by underlying System has - /// been broken. If you see one of these errors, Something is very broken. - INTERNAL = 13, - - /// The service is currently unavailable. This is a most likely a transient - /// condition and may be corrected by retrying with a backoff. - /// - /// See litmus test above for deciding between FAILED_PRECONDITION, ABORTED, - /// and UNAVAILABLE. - UNAVAILABLE = 14, - - /// Unrecoverable data loss or corruption. - DATA_LOSS = 15, - - /// Force users to include a default branch: - DO_NOT_USE = -1 -}; - -} // namespace grpc - -#endif // GRPCXX_IMPL_CODEGEN_IMPL_STATUS_CODE_ENUM_H diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 21ac6c4fb5..2f4be644ba 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -65,6 +65,9 @@ class RpcMethodHandler : public MethodHandler { ops; ops.SendInitialMetadata(param.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } if (status.ok()) { status = ops.SendMessage(rsp); } @@ -104,6 +107,9 @@ class ClientStreamingHandler : public MethodHandler { ops; ops.SendInitialMetadata(param.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } if (status.ok()) { status = ops.SendMessage(rsp); } @@ -144,6 +150,9 @@ class ServerStreamingHandler : public MethodHandler { if (!param.server_context->sent_initial_metadata_) { ops.SendInitialMetadata(param.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); @@ -177,6 +186,9 @@ class BidiStreamingHandler : public MethodHandler { if (!param.server_context->sent_initial_metadata_) { ops.SendInitialMetadata(param.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); @@ -199,6 +211,9 @@ class UnknownMethodHandler : public MethodHandler { if (!context->sent_initial_metadata_) { ops->SendInitialMetadata(context->initial_metadata_, context->initial_metadata_flags()); + if (context->compression_level_set()) { + ops->set_compression_level(context->compression_level()); + } context->sent_initial_metadata_ = true; } ops->ServerSendStatus(context->trailing_metadata_, status); diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h index 3bad468a74..d4599c5fff 100644 --- a/include/grpc++/impl/codegen/proto_utils.h +++ b/include/grpc++/impl/codegen/proto_utils.h @@ -111,14 +111,21 @@ class GrpcBufferReader GRPC_FINAL : public ::grpc::protobuf::io::ZeroCopyInputStream { public: explicit GrpcBufferReader(grpc_byte_buffer* buffer) - : byte_count_(0), backup_count_(0) { - g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader_, buffer); + : byte_count_(0), backup_count_(0), status_() { + if (!g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader_, + buffer)) { + status_ = Status(StatusCode::INTERNAL, + "Couldn't initialize byte buffer reader"); + } } ~GrpcBufferReader() GRPC_OVERRIDE { g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader_); } bool Next(const void** data, int* size) GRPC_OVERRIDE { + if (!status_.ok()) { + return false; + } if (backup_count_ > 0) { *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) - backup_count_; @@ -139,6 +146,8 @@ class GrpcBufferReader GRPC_FINAL return true; } + Status status() const { return status_; } + void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; } bool Skip(int count) GRPC_OVERRIDE { @@ -165,6 +174,7 @@ class GrpcBufferReader GRPC_FINAL int64_t backup_count_; grpc_byte_buffer_reader reader_; gpr_slice slice_; + Status status_; }; } // namespace internal @@ -202,6 +212,9 @@ class SerializationTraits<T, typename std::enable_if<std::is_base_of< Status result = g_core_codegen_interface->ok(); { internal::GrpcBufferReader reader(buffer); + if (!reader.status().ok()) { + return reader.status(); + } ::grpc::protobuf::io::CodedInputStream decoder(&reader); if (max_message_size > 0) { decoder.SetTotalBytesLimit(max_message_size, max_message_size); diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index cea13a513f..08212af861 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -130,7 +130,13 @@ class ServerContext { grpc_compression_level compression_level() const { return compression_level_; } - void set_compression_level(grpc_compression_level level); + + void set_compression_level(grpc_compression_level level) { + compression_level_set_ = true; + compression_level_ = level; + } + + bool compression_level_set() const { return compression_level_set_; } grpc_compression_algorithm compression_algorithm() const { return compression_algorithm_; @@ -217,6 +223,7 @@ class ServerContext { std::multimap<grpc::string, grpc::string> initial_metadata_; std::multimap<grpc::string, grpc::string> trailing_metadata_; + bool compression_level_set_; grpc_compression_level compression_level_; grpc_compression_algorithm compression_algorithm_; }; diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index cbfa410699..b2b972760d 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -347,6 +347,9 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> { CallOpSet<CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_->PerformOps(&ops); call_->cq()->Pluck(&ops); @@ -375,6 +378,9 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> { CallOpSet<CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_->PerformOps(&ops); call_->cq()->Pluck(&ops); @@ -389,6 +395,9 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> { if (!ctx_->sent_initial_metadata_) { ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } call_->PerformOps(&ops); @@ -413,6 +422,9 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, CallOpSet<CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_->PerformOps(&ops); call_->cq()->Pluck(&ops); @@ -434,6 +446,9 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, if (!ctx_->sent_initial_metadata_) { ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } call_->PerformOps(&ops); |