aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc++/client_context.h15
-rw-r--r--include/grpc++/server_context.h14
-rw-r--r--include/grpc/compression.h8
-rw-r--r--src/core/channel/compress_filter.c19
-rw-r--r--src/core/channel/compress_filter.h2
-rw-r--r--src/core/surface/call.c2
-rw-r--r--src/core/surface/channel.c2
-rw-r--r--src/core/surface/channel.h2
-rw-r--r--src/core/surface/secure_channel_create.c5
-rw-r--r--src/cpp/client/client_context.cc21
-rw-r--r--src/cpp/proto/proto_utils.cc4
-rw-r--r--src/cpp/server/server_context.cc20
-rw-r--r--test/core/end2end/tests/request_with_compressed_payload.c6
-rw-r--r--test/cpp/end2end/end2end_test.cc3
-rw-r--r--test/cpp/end2end/generic_end2end_test.cc1
15 files changed, 106 insertions, 18 deletions
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h
index 5e10875260..88954e227b 100644
--- a/include/grpc++/client_context.h
+++ b/include/grpc++/client_context.h
@@ -38,6 +38,7 @@
#include <memory>
#include <string>
+#include <grpc/compression.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc++/config.h>
@@ -107,6 +108,17 @@ class ClientContext {
creds_ = creds;
}
+ grpc_compression_level get_compression_level() const {
+ return compression_level_;
+ }
+ void set_compression_level(grpc_compression_level level);
+
+ grpc_compression_algorithm get_compression_algorithm() const {
+ return compression_algorithm_;
+ }
+ void set_compression_algorithm(grpc_compression_algorithm algorithm);
+
+
void TryCancel();
private:
@@ -157,6 +169,9 @@ class ClientContext {
std::multimap<grpc::string, grpc::string> send_initial_metadata_;
std::multimap<grpc::string, grpc::string> recv_initial_metadata_;
std::multimap<grpc::string, grpc::string> trailing_metadata_;
+
+ grpc_compression_level compression_level_;
+ grpc_compression_algorithm compression_algorithm_;
};
} // namespace grpc
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 326b6a125c..a2f0a2f990 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -36,6 +36,7 @@
#include <map>
+#include <grpc/compression.h>
#include <grpc/support/time.h>
#include <grpc++/config.h>
#include <grpc++/time.h>
@@ -97,6 +98,16 @@ class ServerContext {
return client_metadata_;
}
+ grpc_compression_level get_compression_level() const {
+ return compression_level_;
+ }
+ void set_compression_level(grpc_compression_level level);
+
+ grpc_compression_algorithm get_compression_algorithm() const {
+ return compression_algorithm_;
+ }
+ void set_compression_algorithm(grpc_compression_algorithm algorithm);
+
private:
friend class ::grpc::Server;
template <class W, class R>
@@ -142,6 +153,9 @@ class ServerContext {
std::multimap<grpc::string, grpc::string> client_metadata_;
std::multimap<grpc::string, grpc::string> initial_metadata_;
std::multimap<grpc::string, grpc::string> trailing_metadata_;
+
+ grpc_compression_level compression_level_;
+ grpc_compression_algorithm compression_algorithm_;
};
} // namespace grpc
diff --git a/include/grpc/compression.h b/include/grpc/compression.h
index 1cff5d2d7e..dd7e1d0a12 100644
--- a/include/grpc/compression.h
+++ b/include/grpc/compression.h
@@ -34,6 +34,10 @@
#ifndef GRPC_COMPRESSION_H
#define GRPC_COMPRESSION_H
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/** To be used in channel arguments */
#define GRPC_COMPRESSION_LEVEL_ARG "grpc.compression_level"
@@ -76,4 +80,8 @@ grpc_compression_level grpc_compression_level_for_algorithm(
grpc_compression_algorithm grpc_compression_algorithm_for_level(
grpc_compression_level level);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* GRPC_COMPRESSION_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index f5fe87d6b8..6100a90668 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -50,7 +50,8 @@ typedef struct call_data {
} call_data;
typedef struct channel_data {
- grpc_mdstr *mdstr_compression_algorithm_key;
+ grpc_mdstr *mdstr_request_compression_algorithm_key;
+ grpc_mdstr *mdstr_outgoing_compression_algorithm_key;
grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
grpc_compression_algorithm default_compression_algorithm;
} channel_data;
@@ -72,14 +73,14 @@ static int compress_send_sb(grpc_compression_algorithm algorithm,
}
/** For each \a md element from the incoming metadata, filter out the entry for
- * "grpc-compression-algorithm", using its value to populate the call data's
+ * "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */
static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- if (md->key == channeld->mdstr_compression_algorithm_key) {
+ if (md->key == channeld->mdstr_request_compression_algorithm_key) {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
if (!grpc_compression_algorithm_parse(md_c_str,
&calld->compression_algorithm)) {
@@ -184,7 +185,6 @@ static void process_send_ops(grpc_call_element *elem,
break;
case GRPC_OP_SLICE:
if (did_compress) {
- gpr_slice_unref(sop->data.slice);
if (j < calld->slices.count) {
sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]);
}
@@ -259,7 +259,10 @@ static void init_channel_elem(grpc_channel_element *elem,
channeld->default_compression_algorithm =
grpc_compression_algorithm_for_level(clevel);
- channeld->mdstr_compression_algorithm_key =
+ channeld->mdstr_request_compression_algorithm_key =
+ grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY);
+
+ channeld->mdstr_outgoing_compression_algorithm_key =
grpc_mdstr_from_string(mdctx, "grpc-encoding");
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
@@ -267,7 +270,8 @@ static void init_channel_elem(grpc_channel_element *elem,
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0);
channeld->mdelem_compression_algorithms[algo_idx] =
grpc_mdelem_from_metadata_strings(
- mdctx, grpc_mdstr_ref(channeld->mdstr_compression_algorithm_key),
+ mdctx,
+ grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorith_name));
}
@@ -283,7 +287,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
- grpc_mdstr_unref(channeld->mdstr_compression_algorithm_key);
+ grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key);
+ grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key);
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
++algo_idx) {
grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]);
diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h
index ea667969e1..3a196eb7bf 100644
--- a/src/core/channel/compress_filter.h
+++ b/src/core/channel/compress_filter.h
@@ -36,6 +36,8 @@
#include "src/core/channel/channel_stack.h"
+#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "internal:grpc-encoding-request"
+
/** Message-level compression filter.
*
* See <grpc/compression.h> for the available compression levels.
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 37dadecb35..5f489c0f4e 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -1243,7 +1243,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
} else if (key ==
- grpc_channel_get_compresssion_algorithm_string(call->channel)) {
+ grpc_channel_get_compression_algorithm_string(call->channel)) {
set_compression_algorithm(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index d3dcb2255f..cab99e71d3 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -273,7 +273,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
return channel->grpc_status_string;
}
-grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(
+grpc_mdstr *grpc_channel_get_compression_algorithm_string(
grpc_channel *channel) {
return channel->grpc_compression_algorithm_string;
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 8d0fe812ce..66924ad72c 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -53,7 +53,7 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
-grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(
+grpc_mdstr *grpc_channel_get_compression_algorithm_string(
grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index be46c54427..cfa869ec71 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -244,10 +244,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
} */
- if (grpc_channel_args_get_compression_level(args) >
- GRPC_COMPRESS_LEVEL_NONE) {
- filters[n++] = &grpc_compress_filter;
- }
+ filters[n++] = &grpc_compress_filter;
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 72cdd49d19..0eba554e33 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -34,9 +34,12 @@
#include <grpc++/client_context.h>
#include <grpc/grpc.h>
+#include <grpc/support/string_util.h>
#include <grpc++/credentials.h>
#include <grpc++/time.h>
+#include "src/core/channel/compress_filter.h"
+
namespace grpc {
ClientContext::ClientContext()
@@ -75,6 +78,24 @@ void ClientContext::set_call(grpc_call* call,
}
}
+void ClientContext::set_compression_level(grpc_compression_level level) {
+ const grpc_compression_algorithm algorithm_for_level =
+ grpc_compression_algorithm_for_level(level);
+ set_compression_algorithm(algorithm_for_level);
+}
+
+void ClientContext::set_compression_algorithm(
+ grpc_compression_algorithm algorithm) {
+ char* algorithm_name = NULL;
+ if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
+ gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
+ algorithm);
+ abort();
+ }
+ GPR_ASSERT(algorithm_name != NULL);
+ AddMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name);
+}
+
void ClientContext::TryCancel() {
if (call_) {
grpc_call_cancel(call_);
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 268e4f6d1f..337f680129 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -103,7 +103,9 @@ class GrpcBufferReader GRPC_FINAL
: byte_count_(0), backup_count_(0) {
grpc_byte_buffer_reader_init(&reader_, buffer);
}
- ~GrpcBufferReader() GRPC_OVERRIDE {}
+ ~GrpcBufferReader() GRPC_OVERRIDE {
+ grpc_byte_buffer_reader_destroy(&reader_);
+ }
bool Next(const void** data, int* size) GRPC_OVERRIDE {
if (backup_count_ > 0) {
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 699895a3cf..087e28d33a 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -39,6 +39,8 @@
#include <grpc++/impl/sync.h>
#include <grpc++/time.h>
+#include "src/core/channel/compress_filter.h"
+
namespace grpc {
// CompletionOp
@@ -146,4 +148,22 @@ bool ServerContext::IsCancelled() {
return completion_op_ && completion_op_->CheckCancelled(cq_);
}
+void ServerContext::set_compression_level(grpc_compression_level level) {
+ const grpc_compression_algorithm algorithm_for_level =
+ grpc_compression_algorithm_for_level(level);
+ set_compression_algorithm(algorithm_for_level);
+}
+
+void ServerContext::set_compression_algorithm(
+ grpc_compression_algorithm algorithm) {
+ char* algorithm_name = NULL;
+ if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
+ gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
+ algorithm);
+ abort();
+ }
+ GPR_ASSERT(algorithm_name != NULL);
+ AddInitialMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name);
+}
+
} // namespace grpc
diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c
index ca16bc7d52..a6057457c4 100644
--- a/test/core/end2end/tests/request_with_compressed_payload.c
+++ b/test/core/end2end/tests/request_with_compressed_payload.c
@@ -45,6 +45,7 @@
#include "test/core/end2end/cq_verifier.h"
#include "src/core/channel/channel_args.h"
+#include "src/core/channel/compress_filter.h"
enum { TIMEOUT = 200000 };
@@ -240,6 +241,7 @@ static void request_with_payload_template(
cq_verifier_destroy(cqv);
+ gpr_slice_unref(request_payload_slice);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(request_payload_recv);
@@ -279,13 +281,13 @@ static void test_invoke_request_with_compressed_payload_md_override(
grpc_metadata gzip_compression_override;
grpc_metadata none_compression_override;
- gzip_compression_override.key = "grpc-encoding";
+ gzip_compression_override.key = GRPC_COMPRESS_REQUEST_ALGORITHM_KEY;
gzip_compression_override.value = "gzip";
gzip_compression_override.value_length = 4;
memset(&gzip_compression_override.internal_data, 0,
sizeof(gzip_compression_override.internal_data));
- none_compression_override.key = "grpc-encoding";
+ none_compression_override.key = GRPC_COMPRESS_REQUEST_ALGORITHM_KEY;
none_compression_override.value = "none";
none_compression_override.value_length = 4;
memset(&none_compression_override.internal_data, 0,
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 45ba8b0878..49070a7df1 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -226,10 +226,11 @@ static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub,
int num_rpcs) {
EchoRequest request;
EchoResponse response;
- request.set_message("Hello");
+ request.set_message("Hello hello hello hello");
for (int i = 0; i < num_rpcs; ++i) {
ClientContext context;
+ context.set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
Status s = stub->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.ok());
diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc
index b9d47b32de..e9d86cc9f7 100644
--- a/test/cpp/end2end/generic_end2end_test.cc
+++ b/test/cpp/end2end/generic_end2end_test.cc
@@ -227,6 +227,7 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
GenericServerContext srv_ctx;
GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
+ cli_ctx.set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
send_request.set_message("Hello");
std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));