diff options
author | 2017-10-13 16:07:13 -0700 | |
---|---|---|
committer | 2017-10-18 17:12:19 -0700 | |
commit | 0ee7574732a06e8cace4e099a678f4bd5dbff679 (patch) | |
tree | e43d5de442fdcc3d39cd5af687f319fa39612d3f /test/core/util/port_server_client.cc | |
parent | 6bf5f833efe2cb9e2ecc14358dd9699cd5d05263 (diff) |
Removing instances of exec_ctx being passed around in functions in
src/core. exec_ctx is now a thread_local pointer of type ExecCtx instead of
grpc_exec_ctx which is initialized whenever ExecCtx is instantiated. ExecCtx
also keeps track of the previous exec_ctx so that nesting of exec_ctx is
allowed. This means that there is only one exec_ctx being used at any
time. Also, grpc_exec_ctx_finish is called in the destructor of the
object, and the previous exec_ctx is restored to avoid breaking current
functionality. The code still explicitly calls grpc_exec_ctx_finish
because removing all such instances causes the code to break.
Diffstat (limited to 'test/core/util/port_server_client.cc')
-rw-r--r-- | test/core/util/port_server_client.cc | 247 |
1 files changed, 247 insertions, 0 deletions
diff --git a/test/core/util/port_server_client.cc b/test/core/util/port_server_client.cc new file mode 100644 index 0000000000..253a1df29e --- /dev/null +++ b/test/core/util/port_server_client.cc @@ -0,0 +1,247 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> +#include "test/core/util/test_config.h" + +#ifdef GRPC_TEST_PICK_PORT +#include "test/core/util/port_server_client.h" + +#include <math.h> +#include <string.h> + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +#include "src/core/lib/http/httpcli.h" + +typedef struct freereq { + gpr_mu *mu; + grpc_polling_entity pops; + int done; +} freereq; + +static void destroy_pops_and_shutdown(void *p, grpc_error *error) { + grpc_pollset *pollset = grpc_polling_entity_pollset((grpc_polling_entity *)p); + grpc_pollset_destroy(pollset); + gpr_free(pollset); +} + +static void freed_port_from_server(void *arg, grpc_error *error) { + freereq *pr = (freereq *)arg; + gpr_mu_lock(pr->mu); + pr->done = 1; + GRPC_LOG_IF_ERROR( + "pollset_kick", + grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), NULL)); + gpr_mu_unlock(pr->mu); +} + +void grpc_free_port_using_server(int port) { + grpc_httpcli_context context; + grpc_httpcli_request req; + grpc_httpcli_response rsp; + freereq pr; + char *path; + ExecCtx _local_exec_ctx; + grpc_closure *shutdown_closure; + + grpc_init(); + + memset(&pr, 0, sizeof(pr)); + memset(&req, 0, sizeof(req)); + memset(&rsp, 0, sizeof(rsp)); + + grpc_pollset *pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size()); + grpc_pollset_init(pollset, &pr.mu); + pr.pops = grpc_polling_entity_create_from_pollset(pollset); + shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops, + grpc_schedule_on_exec_ctx); + + req.host = (char *)GRPC_PORT_SERVER_ADDRESS; + gpr_asprintf(&path, "/drop/%d", port); + req.http.path = path; + + grpc_httpcli_context_init(&context); + grpc_resource_quota *resource_quota = + grpc_resource_quota_create("port_server_client/free"); + grpc_httpcli_get(&context, &pr.pops, resource_quota, &req, + grpc_exec_ctx_now() + 30 * GPR_MS_PER_SEC, + GRPC_CLOSURE_CREATE(freed_port_from_server, &pr, + grpc_schedule_on_exec_ctx), + &rsp); + grpc_resource_quota_unref_internal(resource_quota); + grpc_exec_ctx_flush(); + gpr_mu_lock(pr.mu); + while (!pr.done) { + grpc_pollset_worker *worker = NULL; + if (!GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(grpc_polling_entity_pollset(&pr.pops), &worker, + grpc_exec_ctx_now() + GPR_MS_PER_SEC))) { + pr.done = 1; + } + } + gpr_mu_unlock(pr.mu); + + grpc_httpcli_context_destroy(&context); + grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops), + shutdown_closure); + grpc_exec_ctx_finish(); + gpr_free(path); + grpc_http_response_destroy(&rsp); + + grpc_shutdown(); +} + +typedef struct portreq { + gpr_mu *mu; + grpc_polling_entity pops; + int port; + int retries; + char *server; + grpc_httpcli_context *ctx; + grpc_httpcli_response response; +} portreq; + +static void got_port_from_server(void *arg, grpc_error *error) { + size_t i; + int port = 0; + portreq *pr = (portreq *)arg; + int failed = 0; + grpc_httpcli_response *response = &pr->response; + + if (error != GRPC_ERROR_NONE) { + failed = 1; + const char *msg = grpc_error_string(error); + gpr_log(GPR_DEBUG, "failed port pick from server: retrying [%s]", msg); + + } else if (response->status != 200) { + failed = 1; + gpr_log(GPR_DEBUG, "failed port pick from server: status=%d", + response->status); + } + + if (failed) { + grpc_httpcli_request req; + memset(&req, 0, sizeof(req)); + if (pr->retries >= 5) { + gpr_mu_lock(pr->mu); + pr->port = 0; + GRPC_LOG_IF_ERROR( + "pollset_kick", + grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), NULL)); + gpr_mu_unlock(pr->mu); + return; + } + GPR_ASSERT(pr->retries < 10); + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis( + (int64_t)(1000.0 * (1 + pow(1.3, pr->retries) * rand() / RAND_MAX)), + GPR_TIMESPAN))); + pr->retries++; + req.host = pr->server; + req.http.path = (char *)"/get"; + grpc_http_response_destroy(&pr->response); + memset(&pr->response, 0, sizeof(pr->response)); + grpc_resource_quota *resource_quota = + grpc_resource_quota_create("port_server_client/pick_retry"); + grpc_httpcli_get(pr->ctx, &pr->pops, resource_quota, &req, + grpc_exec_ctx_now() + 30 * GPR_MS_PER_SEC, + GRPC_CLOSURE_CREATE(got_port_from_server, pr, + grpc_schedule_on_exec_ctx), + &pr->response); + grpc_resource_quota_unref_internal(resource_quota); + return; + } + GPR_ASSERT(response); + GPR_ASSERT(response->status == 200); + for (i = 0; i < response->body_length; i++) { + GPR_ASSERT(response->body[i] >= '0' && response->body[i] <= '9'); + port = port * 10 + response->body[i] - '0'; + } + GPR_ASSERT(port > 1024); + gpr_mu_lock(pr->mu); + pr->port = port; + GRPC_LOG_IF_ERROR( + "pollset_kick", + grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), NULL)); + gpr_mu_unlock(pr->mu); +} + +int grpc_pick_port_using_server(void) { + grpc_httpcli_context context; + grpc_httpcli_request req; + portreq pr; + ExecCtx _local_exec_ctx; + grpc_closure *shutdown_closure; + + grpc_init(); + + memset(&pr, 0, sizeof(pr)); + memset(&req, 0, sizeof(req)); + grpc_pollset *pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size()); + grpc_pollset_init(pollset, &pr.mu); + pr.pops = grpc_polling_entity_create_from_pollset(pollset); + shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops, + grpc_schedule_on_exec_ctx); + pr.port = -1; + pr.server = (char *)GRPC_PORT_SERVER_ADDRESS; + pr.ctx = &context; + + req.host = (char *)GRPC_PORT_SERVER_ADDRESS; + req.http.path = (char *)"/get"; + + grpc_httpcli_context_init(&context); + grpc_resource_quota *resource_quota = + grpc_resource_quota_create("port_server_client/pick"); + grpc_httpcli_get( + &context, &pr.pops, resource_quota, &req, + grpc_exec_ctx_now() + 30 * GPR_MS_PER_SEC, + GRPC_CLOSURE_CREATE(got_port_from_server, &pr, grpc_schedule_on_exec_ctx), + &pr.response); + grpc_resource_quota_unref_internal(resource_quota); + grpc_exec_ctx_flush(); + gpr_mu_lock(pr.mu); + while (pr.port == -1) { + grpc_pollset_worker *worker = NULL; + if (!GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(grpc_polling_entity_pollset(&pr.pops), &worker, + grpc_exec_ctx_now() + GPR_MS_PER_SEC))) { + pr.port = 0; + } + } + gpr_mu_unlock(pr.mu); + + grpc_http_response_destroy(&pr.response); + grpc_httpcli_context_destroy(&context); + grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops), + shutdown_closure); + grpc_exec_ctx_finish(); + grpc_shutdown(); + + return pr.port; +} + +#endif // GRPC_TEST_PICK_PORT |