diff options
author | 2017-10-13 16:07:13 -0700 | |
---|---|---|
committer | 2017-10-18 17:12:19 -0700 | |
commit | 0ee7574732a06e8cace4e099a678f4bd5dbff679 (patch) | |
tree | e43d5de442fdcc3d39cd5af687f319fa39612d3f /test/core/util/passthru_endpoint.cc | |
parent | 6bf5f833efe2cb9e2ecc14358dd9699cd5d05263 (diff) |
Removing instances of exec_ctx being passed around in functions in
src/core. exec_ctx is now a thread_local pointer of type ExecCtx instead of
grpc_exec_ctx which is initialized whenever ExecCtx is instantiated. ExecCtx
also keeps track of the previous exec_ctx so that nesting of exec_ctx is
allowed. This means that there is only one exec_ctx being used at any
time. Also, grpc_exec_ctx_finish is called in the destructor of the
object, and the previous exec_ctx is restored to avoid breaking current
functionality. The code still explicitly calls grpc_exec_ctx_finish
because removing all such instances causes the code to break.
Diffstat (limited to 'test/core/util/passthru_endpoint.cc')
-rw-r--r-- | test/core/util/passthru_endpoint.cc | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc new file mode 100644 index 0000000000..4fcae991eb --- /dev/null +++ b/test/core/util/passthru_endpoint.cc @@ -0,0 +1,194 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when + using that endpoint. Because of various transitive includes in uv.h, + including windows.h on Windows, uv.h must be included before other system + headers. Therefore, sockaddr.h must always be included first */ +#include "src/core/lib/iomgr/sockaddr.h" + +#include "test/core/util/passthru_endpoint.h" + +#include <inttypes.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> +#include "src/core/lib/iomgr/sockaddr.h" + +#include "src/core/lib/slice/slice_internal.h" + +typedef struct passthru_endpoint passthru_endpoint; + +typedef struct { + grpc_endpoint base; + passthru_endpoint *parent; + grpc_slice_buffer read_buffer; + grpc_slice_buffer *on_read_out; + grpc_closure *on_read; + grpc_resource_user *resource_user; +} half; + +struct passthru_endpoint { + gpr_mu mu; + int halves; + grpc_passthru_endpoint_stats *stats; + grpc_passthru_endpoint_stats + dummy_stats; // used if constructor stats == NULL + bool shutdown; + half client; + half server; +}; + +static void me_read(grpc_endpoint *ep, grpc_slice_buffer *slices, + grpc_closure *cb) { + half *m = (half *)ep; + gpr_mu_lock(&m->parent->mu); + if (m->parent->shutdown) { + GRPC_CLOSURE_SCHED( + cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown")); + } else if (m->read_buffer.count > 0) { + grpc_slice_buffer_swap(&m->read_buffer, slices); + GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); + } else { + m->on_read = cb; + m->on_read_out = slices; + } + gpr_mu_unlock(&m->parent->mu); +} + +static half *other_half(half *h) { + if (h == &h->parent->client) return &h->parent->server; + return &h->parent->client; +} + +static void me_write(grpc_endpoint *ep, grpc_slice_buffer *slices, + grpc_closure *cb) { + half *m = other_half((half *)ep); + gpr_mu_lock(&m->parent->mu); + grpc_error *error = GRPC_ERROR_NONE; + m->parent->stats->num_writes++; + if (m->parent->shutdown) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"); + } else if (m->on_read != NULL) { + for (size_t i = 0; i < slices->count; i++) { + grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i])); + } + GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE); + m->on_read = NULL; + } else { + for (size_t i = 0; i < slices->count; i++) { + grpc_slice_buffer_add(&m->read_buffer, + grpc_slice_copy(slices->slices[i])); + } + } + gpr_mu_unlock(&m->parent->mu); + GRPC_CLOSURE_SCHED(cb, error); +} + +static void me_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {} + +static void me_add_to_pollset_set(grpc_endpoint *ep, + grpc_pollset_set *pollset) {} + +static void me_shutdown(grpc_endpoint *ep, grpc_error *why) { + half *m = (half *)ep; + gpr_mu_lock(&m->parent->mu); + m->parent->shutdown = true; + if (m->on_read) { + GRPC_CLOSURE_SCHED( + m->on_read, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1)); + m->on_read = NULL; + } + m = other_half(m); + if (m->on_read) { + GRPC_CLOSURE_SCHED( + m->on_read, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1)); + m->on_read = NULL; + } + gpr_mu_unlock(&m->parent->mu); + grpc_resource_user_shutdown(m->resource_user); + GRPC_ERROR_UNREF(why); +} + +static void me_destroy(grpc_endpoint *ep) { + passthru_endpoint *p = ((half *)ep)->parent; + gpr_mu_lock(&p->mu); + if (0 == --p->halves) { + gpr_mu_unlock(&p->mu); + gpr_mu_destroy(&p->mu); + grpc_slice_buffer_destroy_internal(&p->client.read_buffer); + grpc_slice_buffer_destroy_internal(&p->server.read_buffer); + grpc_resource_user_unref(p->client.resource_user); + grpc_resource_user_unref(p->server.resource_user); + gpr_free(p); + } else { + gpr_mu_unlock(&p->mu); + } +} + +static char *me_get_peer(grpc_endpoint *ep) { + passthru_endpoint *p = ((half *)ep)->parent; + return ((half *)ep) == &p->client ? gpr_strdup("fake:mock_client_endpoint") + : gpr_strdup("fake:mock_server_endpoint"); +} + +static int me_get_fd(grpc_endpoint *ep) { return -1; } + +static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) { + half *m = (half *)ep; + return m->resource_user; +} + +static const grpc_endpoint_vtable vtable = { + me_read, me_write, me_add_to_pollset, me_add_to_pollset_set, + me_shutdown, me_destroy, me_get_resource_user, me_get_peer, + me_get_fd, +}; + +static void half_init(half *m, passthru_endpoint *parent, + grpc_resource_quota *resource_quota, + const char *half_name) { + m->base.vtable = &vtable; + m->parent = parent; + grpc_slice_buffer_init(&m->read_buffer); + m->on_read = NULL; + char *name; + gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name, + (intptr_t)parent); + m->resource_user = grpc_resource_user_create(resource_quota, name); + gpr_free(name); +} + +void grpc_passthru_endpoint_create(grpc_endpoint **client, + grpc_endpoint **server, + grpc_resource_quota *resource_quota, + grpc_passthru_endpoint_stats *stats) { + passthru_endpoint *m = (passthru_endpoint *)gpr_malloc(sizeof(*m)); + m->halves = 2; + m->shutdown = 0; + m->stats = stats == NULL ? &m->dummy_stats : stats; + memset(m->stats, 0, sizeof(*m->stats)); + half_init(&m->client, m, resource_quota, "client"); + half_init(&m->server, m, resource_quota, "server"); + gpr_mu_init(&m->mu); + *client = &m->client.base; + *server = &m->server.base; +} |