From 440558a7ec9c78a3aad3203663c2d64d026deb68 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 21 Jul 2016 19:31:38 -0700 Subject: C++ Compression Level set properly --- include/grpc++/impl/codegen/async_stream.h | 27 ++ include/grpc++/impl/codegen/async_unary_call.h | 9 + include/grpc++/impl/codegen/call.h | 12 +- include/grpc++/impl/codegen/impl/async_stream.h | 486 ---------------------- include/grpc++/impl/codegen/method_handler_impl.h | 15 + include/grpc++/impl/codegen/server_context.h | 9 +- include/grpc++/impl/codegen/sync_stream.h | 15 + 7 files changed, 83 insertions(+), 490 deletions(-) delete mode 100644 include/grpc++/impl/codegen/impl/async_stream.h (limited to 'include') diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index e96d224ddb..70533aa4d9 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -330,6 +330,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface { meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } @@ -345,6 +348,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface { if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. @@ -363,6 +369,9 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface { if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); @@ -400,6 +409,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface { meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } @@ -409,6 +421,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface { if (!ctx_->sent_initial_metadata_) { write_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + write_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert @@ -421,6 +436,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface { if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); @@ -459,6 +477,9 @@ class ServerAsyncReaderWriter GRPC_FINAL meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_ops_); } @@ -474,6 +495,9 @@ class ServerAsyncReaderWriter GRPC_FINAL if (!ctx_->sent_initial_metadata_) { write_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + write_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert @@ -486,6 +510,9 @@ class ServerAsyncReaderWriter GRPC_FINAL if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_ops_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 05681e2242..5ceab73cea 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -126,6 +126,9 @@ class ServerAsyncResponseWriter GRPC_FINAL meta_buf_.set_output_tag(tag); meta_buf_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_buf_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_buf_); } @@ -135,6 +138,9 @@ class ServerAsyncResponseWriter GRPC_FINAL if (!ctx_->sent_initial_metadata_) { finish_buf_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_buf_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. @@ -153,6 +159,9 @@ class ServerAsyncResponseWriter GRPC_FINAL if (!ctx_->sent_initial_metadata_) { finish_buf_.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_buf_.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index fab85d1517..dfac177970 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -180,17 +180,23 @@ class CallNoOp { class CallOpSendInitialMetadata { public: - CallOpSendInitialMetadata() : send_(false) {} + CallOpSendInitialMetadata() : send_(false) { + maybe_compression_level_.is_set = false; + } void SendInitialMetadata( const std::multimap& metadata, uint32_t flags) { + maybe_compression_level_.is_set = false; send_ = true; flags_ = flags; initial_metadata_count_ = metadata.size(); initial_metadata_ = FillMetadataArray(metadata); - // TODO(dgq): expose compression level in API so it can be properly set. - maybe_compression_level_.is_set = false; + } + + void set_compression_level(grpc_compression_level level) { + maybe_compression_level_.is_set = true; + maybe_compression_level_.level = level; } protected: diff --git a/include/grpc++/impl/codegen/impl/async_stream.h b/include/grpc++/impl/codegen/impl/async_stream.h deleted file mode 100644 index 7d7a956807..0000000000 --- a/include/grpc++/impl/codegen/impl/async_stream.h +++ /dev/null @@ -1,486 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPCXX_IMPL_CODEGEN_IMPL_ASYNC_STREAM_H -#define GRPCXX_IMPL_CODEGEN_IMPL_ASYNC_STREAM_H - -#include -#include -#include -#include -#include -#include - -namespace grpc { - -class CompletionQueue; - -/// Common interface for all client side asynchronous streaming. -class ClientAsyncStreamingInterface { - public: - virtual ~ClientAsyncStreamingInterface() {} - - /// Request notification of the reading of the initial metadata. Completion - /// will be notified by \a tag on the associated completion queue. - /// - /// \param[in] tag Tag identifying this request. - virtual void ReadInitialMetadata(void* tag) = 0; - - /// Request notification completion. - /// - /// \param[out] status To be updated with the operation status. - /// \param[in] tag Tag identifying this request. - virtual void Finish(Status* status, void* tag) = 0; -}; - -/// An interface that yields a sequence of messages of type \a R. -template -class AsyncReaderInterface { - public: - virtual ~AsyncReaderInterface() {} - - /// Read a message of type \a R into \a msg. Completion will be notified by \a - /// tag on the associated completion queue. - /// - /// \param[out] msg Where to eventually store the read message. - /// \param[in] tag The tag identifying the operation. - virtual void Read(R* msg, void* tag) = 0; -}; - -/// An interface that can be fed a sequence of messages of type \a W. -template -class AsyncWriterInterface { - public: - virtual ~AsyncWriterInterface() {} - - /// Request the writing of \a msg with identifying tag \a tag. - /// - /// Only one write may be outstanding at any given time. This means that - /// after calling Write, one must wait to receive \a tag from the completion - /// queue BEFORE calling Write again. - /// - /// \param[in] msg The message to be written. - /// \param[in] tag The tag identifying the operation. - virtual void Write(const W& msg, void* tag) = 0; -}; - -template -class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, - public AsyncReaderInterface {}; - -template -class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface { - public: - /// Create a stream and write the first request out. - template - ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - const W& request, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); - init_ops_.ClientSendClose(); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); - } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet - init_ops_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet finish_ops_; -}; - -/// Common interface for client side asynchronous writing. -template -class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface { - public: - /// Signal the client is done with the writes. - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -template -class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface { - public: - template - ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - R* response, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - finish_ops_.RecvMessage(response); - - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet init_ops_; - CallOpSet meta_ops_; - CallOpSet write_ops_; - CallOpSet writes_done_ops_; - CallOpSet - finish_ops_; -}; - -/// Client-side interface for asynchronous bi-directional streaming. -template -class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface, - public AsyncReaderInterface { - public: - /// Signal the client is done with the writes. - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -template -class ClientAsyncReaderWriter GRPC_FINAL - : public ClientAsyncReaderWriterInterface { - public: - ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); - } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet init_ops_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet write_ops_; - CallOpSet writes_done_ops_; - CallOpSet finish_ops_; -}; - -template -class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, - public AsyncReaderInterface { - public: - virtual void Finish(const W& msg, const Status& status, void* tag) = 0; - - virtual void FinishWithError(const Status& status, void* tag) = 0; -}; - -template -class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface { - public: - explicit ServerAsyncReader(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Finish(const W& msg, const Status& status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // The response is dropped if the status is not OK. - if (status.ok()) { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, - finish_ops_.SendMessage(msg)); - } else { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - } - call_.PerformOps(&finish_ops_); - } - - void FinishWithError(const Status& status, void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!status.ok()); - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet - finish_ops_; -}; - -template -class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface { - public: - virtual void Finish(const Status& status, void* tag) = 0; -}; - -template -class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface { - public: - explicit ServerAsyncWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Finish(const Status& status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet write_ops_; - CallOpSet finish_ops_; -}; - -/// Server-side interface for asynchronous bi-directional streaming. -template -class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface, - public AsyncReaderInterface { - public: - virtual void Finish(const Status& status, void* tag) = 0; -}; - -template -class ServerAsyncReaderWriter GRPC_FINAL - : public ServerAsyncReaderWriterInterface { - public: - explicit ServerAsyncReaderWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Finish(const Status& status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - friend class ::grpc::Server; - - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet write_ops_; - CallOpSet finish_ops_; -}; - -} // namespace grpc - -#endif // GRPCXX_IMPL_CODEGEN_IMPL_ASYNC_STREAM_H diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 21ac6c4fb5..2f4be644ba 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -65,6 +65,9 @@ class RpcMethodHandler : public MethodHandler { ops; ops.SendInitialMetadata(param.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } if (status.ok()) { status = ops.SendMessage(rsp); } @@ -104,6 +107,9 @@ class ClientStreamingHandler : public MethodHandler { ops; ops.SendInitialMetadata(param.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } if (status.ok()) { status = ops.SendMessage(rsp); } @@ -144,6 +150,9 @@ class ServerStreamingHandler : public MethodHandler { if (!param.server_context->sent_initial_metadata_) { ops.SendInitialMetadata(param.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); @@ -177,6 +186,9 @@ class BidiStreamingHandler : public MethodHandler { if (!param.server_context->sent_initial_metadata_) { ops.SendInitialMetadata(param.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); + if (param.server_context->compression_level_set()) { + ops.set_compression_level(param.server_context->compression_level()); + } } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); @@ -199,6 +211,9 @@ class UnknownMethodHandler : public MethodHandler { if (!context->sent_initial_metadata_) { ops->SendInitialMetadata(context->initial_metadata_, context->initial_metadata_flags()); + if (context->compression_level_set()) { + ops->set_compression_level(context->compression_level()); + } context->sent_initial_metadata_ = true; } ops->ServerSendStatus(context->trailing_metadata_, status); diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index cea13a513f..08212af861 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -130,7 +130,13 @@ class ServerContext { grpc_compression_level compression_level() const { return compression_level_; } - void set_compression_level(grpc_compression_level level); + + void set_compression_level(grpc_compression_level level) { + compression_level_set_ = true; + compression_level_ = level; + } + + bool compression_level_set() const { return compression_level_set_; } grpc_compression_algorithm compression_algorithm() const { return compression_algorithm_; @@ -217,6 +223,7 @@ class ServerContext { std::multimap initial_metadata_; std::multimap trailing_metadata_; + bool compression_level_set_; grpc_compression_level compression_level_; grpc_compression_algorithm compression_algorithm_; }; diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index cbfa410699..b2b972760d 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -347,6 +347,9 @@ class ServerReader GRPC_FINAL : public ReaderInterface { CallOpSet ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_->PerformOps(&ops); call_->cq()->Pluck(&ops); @@ -375,6 +378,9 @@ class ServerWriter GRPC_FINAL : public WriterInterface { CallOpSet ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_->PerformOps(&ops); call_->cq()->Pluck(&ops); @@ -389,6 +395,9 @@ class ServerWriter GRPC_FINAL : public WriterInterface { if (!ctx_->sent_initial_metadata_) { ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } call_->PerformOps(&ops); @@ -413,6 +422,9 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface, CallOpSet ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; call_->PerformOps(&ops); call_->cq()->Pluck(&ops); @@ -434,6 +446,9 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface, if (!ctx_->sent_initial_metadata_) { ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops.set_compression_level(ctx_->compression_level()); + } ctx_->sent_initial_metadata_ = true; } call_->PerformOps(&ops); -- cgit v1.2.3