diff options
author | David Garcia Quintas <dgq@google.com> | 2015-08-24 12:11:26 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2015-08-24 12:11:26 -0700 |
commit | d09bae5b488cc235b11d4c2446accaf3160322c7 (patch) | |
tree | 8851e37b9123b8a97fab8aab8976276f5ee00768 /src | |
parent | c43648f250dd6cb0f086e2366e468372a6de26ae (diff) | |
parent | cb30410b1063845cc3a0205741a72c9c96e07638 (diff) |
Merge branch 'compression-accept-encoding' of github.com:dgquintas/grpc into compression-accept-encoding
Diffstat (limited to 'src')
-rw-r--r-- | src/core/channel/channel_args.c | 48 | ||||
-rw-r--r-- | src/core/channel/compress_filter.c | 9 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 64 |
3 files changed, 51 insertions, 70 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index 7d97b79553..55b4cb6170 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -148,44 +148,58 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm( return grpc_channel_args_copy_and_add(a, &tmp, 1); } -/** Returns the compression algorithm's enabled states bitset from \a a. If not - * found, return a biset will all algorithms enabled */ -static gpr_uint32 find_compression_algorithm_states_bitset( - const grpc_channel_args *a) { - gpr_uint32 states_bitset = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; +/** Returns 1 if the argument for compression algorithm's enabled states bitset + * was found in \a a, returning the arg's value in \a states. Otherwise, returns + * 0. */ +static int find_compression_algorithm_states_bitset( + const grpc_channel_args *a, int **states_arg) { if (a != NULL) { size_t i; for (i = 0; i < a->num_args; ++i) { if (a->args[i].type == GRPC_ARG_INTEGER && !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) { - states_bitset = a->args[i].value.integer; - break; + *states_arg = &a->args[i].value.integer; + return 1; /* GPR_TRUE */ } } } - return states_bitset; + return 0; /* GPR_FALSE */ } grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( grpc_channel_args *a, grpc_compression_algorithm algorithm, int state) { - gpr_uint32 states_bitset = find_compression_algorithm_states_bitset(a); - grpc_arg tmp; + int *states_arg; + grpc_channel_args *result = a; + const int states_arg_found = + find_compression_algorithm_states_bitset(a, &states_arg); + + if (!states_arg_found) { + /* create a new arg */ + grpc_arg tmp; + tmp.type = GRPC_ARG_INTEGER; + tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG; + states_arg = &tmp.value.integer; + result = grpc_channel_args_copy_and_add(a, &tmp, 1); + } + /* update either the new arg's value or the already present one */ if (state != 0) { - GPR_BITSET(&states_bitset, algorithm); + GPR_BITSET(states_arg, algorithm); } else { - GPR_BITCLEAR(&states_bitset, algorithm); + GPR_BITCLEAR(states_arg, algorithm); } - tmp.type = GRPC_ARG_INTEGER; - tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG; - tmp.value.integer = states_bitset; - return grpc_channel_args_copy_and_add(a, &tmp, 1); + return result; } int grpc_channel_args_compression_algorithm_get_states( const grpc_channel_args *a) { - return find_compression_algorithm_states_bitset(a); + int *states_arg; + if (find_compression_algorithm_states_bitset(a, &states_arg)) { + return *states_arg; + } else { + return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */ + } } diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 8e9ab30ceb..39efc08ab5 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -306,6 +306,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1]; + size_t supported_algorithms_idx = 0; char *accept_encoding_str; size_t accept_encoding_str_len; @@ -344,15 +345,15 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string(mdctx, algorithm_name, 0)); if (algo_idx > 0) { - supported_algorithms_names[algo_idx - 1] = algorithm_name; + supported_algorithms_names[supported_algorithms_idx++] = algorithm_name; } } /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated * arrays, as to avoid the heap allocs */ - accept_encoding_str = gpr_strjoin_sep( - supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names), - ", ", &accept_encoding_str_len); + accept_encoding_str = + gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, + ", ", &accept_encoding_str_len); channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings( mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index d8ea375636..eb2a984993 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -90,26 +90,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } - static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, - gpr_timespec deadline) { - void* tag = nullptr; - *ok = false; - switch (cq->AsyncNext(&tag, ok, deadline)) { - case CompletionQueue::TIMEOUT: - *req = nullptr; - return true; - case CompletionQueue::SHUTDOWN: - *req = nullptr; - return false; - case CompletionQueue::GOT_EVENT: - *req = static_cast<SyncRequest*>(tag); - GPR_ASSERT((*req)->in_flight_); - return true; - } - gpr_log(GPR_ERROR, "Should never reach here"); - abort(); - } - void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { @@ -207,21 +187,22 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { static grpc_server* CreateServer( int max_message_size, const grpc_compression_options& compression_options) { + grpc_arg args[2]; + size_t args_idx = 0; if (max_message_size > 0) { - grpc_arg args[2]; - args[0].type = GRPC_ARG_INTEGER; - args[0].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); - args[0].value.integer = max_message_size; - - args[1].type = GRPC_ARG_INTEGER; - args[1].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG); - args[1].value.integer = compression_options.enabled_algorithms_bitset; - - grpc_channel_args channel_args = {2, args}; - return grpc_server_create(&channel_args, NULL); - } else { - return grpc_server_create(nullptr, nullptr); + args[args_idx].type = GRPC_ARG_INTEGER; + args[args_idx].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); + args[args_idx].value.integer = max_message_size; + args_idx++; } + + args[args_idx].type = GRPC_ARG_INTEGER; + args[args_idx].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG); + args[args_idx].value.integer = compression_options.enabled_algorithms_bitset; + args_idx++; + + grpc_channel_args channel_args = {args_idx, args}; + return grpc_server_create(&channel_args, nullptr); } Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, @@ -330,27 +311,12 @@ bool Server::Start() { return true; } -void Server::ShutdownInternal(gpr_timespec deadline) { +void Server::Shutdown() { grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { shutdown_ = true; grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); cq_.Shutdown(); - // Spin, eating requests until the completion queue is completely shutdown. - // If the deadline expires then cancel anything that's pending and keep - // spinning forever until the work is actually drained. - // Since nothing else needs to touch state guarded by mu_, holding it - // through this loop is fine. - SyncRequest* request; - bool ok; - while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { - if (request == NULL) { // deadline expired - grpc_server_cancel_all_calls(server_); - deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); - } else if (ok) { - SyncRequest::CallData call_data(this, request); - } - } // Wait for running callbacks to finish. while (num_running_cb_ != 0) { |