aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-08-24 12:11:26 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-08-24 12:11:26 -0700
commitd09bae5b488cc235b11d4c2446accaf3160322c7 (patch)
tree8851e37b9123b8a97fab8aab8976276f5ee00768 /src
parentc43648f250dd6cb0f086e2366e468372a6de26ae (diff)
parentcb30410b1063845cc3a0205741a72c9c96e07638 (diff)
Merge branch 'compression-accept-encoding' of github.com:dgquintas/grpc into compression-accept-encoding
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/channel_args.c48
-rw-r--r--src/core/channel/compress_filter.c9
-rw-r--r--src/cpp/server/server.cc64
3 files changed, 51 insertions, 70 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index 7d97b79553..55b4cb6170 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -148,44 +148,58 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
-/** Returns the compression algorithm's enabled states bitset from \a a. If not
- * found, return a biset will all algorithms enabled */
-static gpr_uint32 find_compression_algorithm_states_bitset(
- const grpc_channel_args *a) {
- gpr_uint32 states_bitset = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
+/** Returns 1 if the argument for compression algorithm's enabled states bitset
+ * was found in \a a, returning the arg's value in \a states. Otherwise, returns
+ * 0. */
+static int find_compression_algorithm_states_bitset(
+ const grpc_channel_args *a, int **states_arg) {
if (a != NULL) {
size_t i;
for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER &&
!strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
- states_bitset = a->args[i].value.integer;
- break;
+ *states_arg = &a->args[i].value.integer;
+ return 1; /* GPR_TRUE */
}
}
}
- return states_bitset;
+ return 0; /* GPR_FALSE */
}
grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
grpc_channel_args *a,
grpc_compression_algorithm algorithm,
int state) {
- gpr_uint32 states_bitset = find_compression_algorithm_states_bitset(a);
- grpc_arg tmp;
+ int *states_arg;
+ grpc_channel_args *result = a;
+ const int states_arg_found =
+ find_compression_algorithm_states_bitset(a, &states_arg);
+
+ if (!states_arg_found) {
+ /* create a new arg */
+ grpc_arg tmp;
+ tmp.type = GRPC_ARG_INTEGER;
+ tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
+ states_arg = &tmp.value.integer;
+ result = grpc_channel_args_copy_and_add(a, &tmp, 1);
+ }
+ /* update either the new arg's value or the already present one */
if (state != 0) {
- GPR_BITSET(&states_bitset, algorithm);
+ GPR_BITSET(states_arg, algorithm);
} else {
- GPR_BITCLEAR(&states_bitset, algorithm);
+ GPR_BITCLEAR(states_arg, algorithm);
}
- tmp.type = GRPC_ARG_INTEGER;
- tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
- tmp.value.integer = states_bitset;
- return grpc_channel_args_copy_and_add(a, &tmp, 1);
+ return result;
}
int grpc_channel_args_compression_algorithm_get_states(
const grpc_channel_args *a) {
- return find_compression_algorithm_states_bitset(a);
+ int *states_arg;
+ if (find_compression_algorithm_states_bitset(a, &states_arg)) {
+ return *states_arg;
+ } else {
+ return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
+ }
}
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 8e9ab30ceb..39efc08ab5 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -306,6 +306,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
+ size_t supported_algorithms_idx = 0;
char *accept_encoding_str;
size_t accept_encoding_str_len;
@@ -344,15 +345,15 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorithm_name, 0));
if (algo_idx > 0) {
- supported_algorithms_names[algo_idx - 1] = algorithm_name;
+ supported_algorithms_names[supported_algorithms_idx++] = algorithm_name;
}
}
/* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
* arrays, as to avoid the heap allocs */
- accept_encoding_str = gpr_strjoin_sep(
- supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names),
- ", ", &accept_encoding_str_len);
+ accept_encoding_str =
+ gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx,
+ ", ", &accept_encoding_str_len);
channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index d8ea375636..eb2a984993 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -90,26 +90,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
- static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
- gpr_timespec deadline) {
- void* tag = nullptr;
- *ok = false;
- switch (cq->AsyncNext(&tag, ok, deadline)) {
- case CompletionQueue::TIMEOUT:
- *req = nullptr;
- return true;
- case CompletionQueue::SHUTDOWN:
- *req = nullptr;
- return false;
- case CompletionQueue::GOT_EVENT:
- *req = static_cast<SyncRequest*>(tag);
- GPR_ASSERT((*req)->in_flight_);
- return true;
- }
- gpr_log(GPR_ERROR, "Should never reach here");
- abort();
- }
-
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@@ -207,21 +187,22 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
static grpc_server* CreateServer(
int max_message_size, const grpc_compression_options& compression_options) {
+ grpc_arg args[2];
+ size_t args_idx = 0;
if (max_message_size > 0) {
- grpc_arg args[2];
- args[0].type = GRPC_ARG_INTEGER;
- args[0].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
- args[0].value.integer = max_message_size;
-
- args[1].type = GRPC_ARG_INTEGER;
- args[1].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG);
- args[1].value.integer = compression_options.enabled_algorithms_bitset;
-
- grpc_channel_args channel_args = {2, args};
- return grpc_server_create(&channel_args, NULL);
- } else {
- return grpc_server_create(nullptr, nullptr);
+ args[args_idx].type = GRPC_ARG_INTEGER;
+ args[args_idx].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
+ args[args_idx].value.integer = max_message_size;
+ args_idx++;
}
+
+ args[args_idx].type = GRPC_ARG_INTEGER;
+ args[args_idx].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG);
+ args[args_idx].value.integer = compression_options.enabled_algorithms_bitset;
+ args_idx++;
+
+ grpc_channel_args channel_args = {args_idx, args};
+ return grpc_server_create(&channel_args, nullptr);
}
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
@@ -330,27 +311,12 @@ bool Server::Start() {
return true;
}
-void Server::ShutdownInternal(gpr_timespec deadline) {
+void Server::Shutdown() {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
- // Spin, eating requests until the completion queue is completely shutdown.
- // If the deadline expires then cancel anything that's pending and keep
- // spinning forever until the work is actually drained.
- // Since nothing else needs to touch state guarded by mu_, holding it
- // through this loop is fine.
- SyncRequest* request;
- bool ok;
- while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
- if (request == NULL) { // deadline expired
- grpc_server_cancel_all_calls(server_);
- deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
- } else if (ok) {
- SyncRequest::CallData call_data(this, request);
- }
- }
// Wait for running callbacks to finish.
while (num_running_cb_ != 0) {