aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/iomgr/error.c40
-rw-r--r--src/core/lib/iomgr/error_internal.h15
-rw-r--r--src/core/lib/iomgr/pollset_uv.c22
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c52
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c1
-rw-r--r--src/core/lib/iomgr/udp_server.c16
-rw-r--r--src/core/lib/iomgr/udp_server.h12
-rw-r--r--src/core/lib/support/arena.c98
-rw-r--r--src/core/lib/support/arena.h54
-rw-r--r--src/core/lib/surface/call.c121
-rw-r--r--src/core/lib/transport/transport.c3
11 files changed, 304 insertions, 130 deletions
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index 7cdbe30198..1127fff756 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -140,14 +140,16 @@ grpc_error *grpc_error_ref(grpc_error *err, const char *file, int line,
const char *func) {
if (grpc_error_is_special(err)) return err;
gpr_log(GPR_DEBUG, "%p: %" PRIdPTR " -> %" PRIdPTR " [%s:%d %s]", err,
- err->refs.count, err->refs.count + 1, file, line, func);
- gpr_ref(&err->refs);
+ gpr_atm_no_barrier_load(&err->atomics.refs.count),
+ gpr_atm_no_barrier_load(&err->atomics.refs.count) + 1, file, line,
+ func);
+ gpr_ref(&err->atomics.refs);
return err;
}
#else
grpc_error *grpc_error_ref(grpc_error *err) {
if (grpc_error_is_special(err)) return err;
- gpr_ref(&err->refs);
+ gpr_ref(&err->atomics.refs);
return err;
}
#endif
@@ -182,7 +184,7 @@ static void error_destroy(grpc_error *err) {
GPR_ASSERT(!grpc_error_is_special(err));
unref_errs(err);
unref_strs(err);
- gpr_free((void *)gpr_atm_acq_load(&err->error_string));
+ gpr_free((void *)gpr_atm_acq_load(&err->atomics.error_string));
gpr_free(err);
}
@@ -191,15 +193,17 @@ void grpc_error_unref(grpc_error *err, const char *file, int line,
const char *func) {
if (grpc_error_is_special(err)) return;
gpr_log(GPR_DEBUG, "%p: %" PRIdPTR " -> %" PRIdPTR " [%s:%d %s]", err,
- err->refs.count, err->refs.count - 1, file, line, func);
- if (gpr_unref(&err->refs)) {
+ gpr_atm_no_barrier_load(&err->atomics.refs.count),
+ gpr_atm_no_barrier_load(&err->atomics.refs.count) - 1, file, line,
+ func);
+ if (gpr_unref(&err->atomics.refs)) {
error_destroy(err);
}
}
#else
void grpc_error_unref(grpc_error *err) {
if (grpc_error_is_special(err)) return;
- if (gpr_unref(&err->refs)) {
+ if (gpr_unref(&err->atomics.refs)) {
error_destroy(err);
}
}
@@ -328,8 +332,8 @@ grpc_error *grpc_error_create(const char *file, int line, const char *desc,
internal_set_time(&err, GRPC_ERROR_TIME_CREATED, gpr_now(GPR_CLOCK_REALTIME));
- gpr_atm_no_barrier_store(&err->error_string, 0);
- gpr_ref_init(&err->refs, 1);
+ gpr_atm_no_barrier_store(&err->atomics.error_string, 0);
+ gpr_ref_init(&err->atomics.refs, 1);
GPR_TIMER_END("grpc_error_create", 0);
return err;
}
@@ -369,7 +373,7 @@ static grpc_error *copy_error_and_unref(grpc_error *in) {
grpc_slice_from_static_string("cancelled"));
internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED);
}
- } else if (gpr_ref_is_unique(&in->refs)) {
+ } else if (gpr_ref_is_unique(&in->atomics.refs)) {
out = in;
} else {
uint8_t new_arena_capacity = in->arena_capacity;
@@ -382,10 +386,14 @@ static grpc_error *copy_error_and_unref(grpc_error *in) {
#ifdef GRPC_ERROR_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "%p create copying %p", out, in);
#endif
- memcpy(out, in, sizeof(*in) + in->arena_size * sizeof(intptr_t));
+ // bulk memcpy of the rest of the struct.
+ size_t skip = sizeof(&out->atomics);
+ memcpy((void *)((uintptr_t)out + skip), (void *)((uintptr_t)in + skip),
+ sizeof(*in) + (in->arena_size * sizeof(intptr_t)) - skip);
+ // manually set the atomics and the new capacity
+ gpr_atm_no_barrier_store(&out->atomics.error_string, 0);
+ gpr_ref_init(&out->atomics.refs, 1);
out->arena_capacity = new_arena_capacity;
- gpr_atm_no_barrier_store(&out->error_string, 0);
- gpr_ref_init(&out->refs, 1);
ref_strs(out);
ref_errs(out);
GRPC_ERROR_UNREF(in);
@@ -692,7 +700,7 @@ const char *grpc_error_string(grpc_error *err) {
if (err == GRPC_ERROR_OOM) return oom_error_string;
if (err == GRPC_ERROR_CANCELLED) return cancelled_error_string;
- void *p = (void *)gpr_atm_acq_load(&err->error_string);
+ void *p = (void *)gpr_atm_acq_load(&err->atomics.error_string);
if (p != NULL) {
GPR_TIMER_END("grpc_error_string", 0);
return p;
@@ -712,9 +720,9 @@ const char *grpc_error_string(grpc_error *err) {
char *out = finish_kvs(&kvs);
- if (!gpr_atm_rel_cas(&err->error_string, 0, (gpr_atm)out)) {
+ if (!gpr_atm_rel_cas(&err->atomics.error_string, 0, (gpr_atm)out)) {
gpr_free(out);
- out = (char *)gpr_atm_no_barrier_load(&err->error_string);
+ out = (char *)gpr_atm_no_barrier_load(&err->atomics.error_string);
}
GPR_TIMER_END("grpc_error_string", 0);
diff --git a/src/core/lib/iomgr/error_internal.h b/src/core/lib/iomgr/error_internal.h
index fb4814e41f..7f204df1b2 100644
--- a/src/core/lib/iomgr/error_internal.h
+++ b/src/core/lib/iomgr/error_internal.h
@@ -46,14 +46,25 @@ struct grpc_linked_error {
uint8_t next;
};
+// c core representation of an error. See error.h for high level description of
+// this object.
struct grpc_error {
- gpr_refcount refs;
+ // All atomics in grpc_error must be stored in this nested struct. The rest of
+ // the object is memcpy-ed in bulk in copy_and_unref.
+ struct atomics {
+ gpr_refcount refs;
+ gpr_atm error_string;
+ } atomics;
+ // These arrays index into dynamic arena at the bottom of the struct.
+ // UINT8_MAX is used as a sentinel value.
uint8_t ints[GRPC_ERROR_INT_MAX];
uint8_t strs[GRPC_ERROR_STR_MAX];
uint8_t times[GRPC_ERROR_TIME_MAX];
+ // The child errors are stored in the arena, but are effectively a linked list
+ // structure, since they are contained withing grpc_linked_error objects.
uint8_t first_err;
uint8_t last_err;
- gpr_atm error_string;
+ // The arena is dynamically reallocated with a grow factor of 1.5.
uint8_t arena_size;
uint8_t arena_capacity;
intptr_t arena[0];
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index af33949c69..a2f81bcd78 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -39,6 +39,7 @@
#include <string.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@@ -61,25 +62,30 @@ gpr_mu grpc_polling_mu;
immediately in the next loop iteration.
Note: In the future, if there is a bug that involves missing wakeups in the
future, try adding a uv_async_t to kick the loop differently */
-uv_timer_t dummy_uv_handle;
+uv_timer_t *dummy_uv_handle;
size_t grpc_pollset_size() { return sizeof(grpc_pollset); }
void dummy_timer_cb(uv_timer_t *handle) {}
+void dummy_handle_close_cb(uv_handle_t *handle) { gpr_free(handle); }
+
void grpc_pollset_global_init(void) {
gpr_mu_init(&grpc_polling_mu);
- uv_timer_init(uv_default_loop(), &dummy_uv_handle);
+ dummy_uv_handle = gpr_malloc(sizeof(uv_timer_t));
+ uv_timer_init(uv_default_loop(), dummy_uv_handle);
grpc_pollset_work_run_loop = 1;
}
-static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
-
void grpc_pollset_global_shutdown(void) {
gpr_mu_destroy(&grpc_polling_mu);
- uv_close((uv_handle_t *)&dummy_uv_handle, timer_close_cb);
+ uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb);
}
+static void timer_run_cb(uv_timer_t *timer) {}
+
+static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
+
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &grpc_polling_mu;
uv_timer_init(uv_default_loop(), &pollset->timer);
@@ -95,7 +101,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
} else {
// kick the loop once
- uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0);
+ uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
}
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
}
@@ -111,8 +117,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
}
}
-static void timer_run_cb(uv_timer_t *timer) {}
-
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
@@ -145,7 +149,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
- uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0);
+ uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index 79ff910738..4d715be94c 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -40,6 +40,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
@@ -54,8 +55,36 @@ typedef struct request {
grpc_closure *on_done;
grpc_resolved_addresses **addresses;
struct addrinfo *hints;
+ char *host;
+ char *port;
} request;
+static int retry_named_port_failure(int status, request *r,
+ uv_getaddrinfo_cb getaddrinfo_cb) {
+ if (status != 0) {
+ // This loop is copied from resolve_address_posix.c
+ char *svc[][2] = {{"http", "80"}, {"https", "443"}};
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(svc); i++) {
+ if (strcmp(r->port, svc[i][0]) == 0) {
+ int retry_status;
+ uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t));
+ req->data = r;
+ retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb,
+ r->host, svc[i][1], r->hints);
+ if (retry_status < 0 || getaddrinfo_cb == NULL) {
+ // The callback will not be called
+ gpr_free(req);
+ }
+ return retry_status;
+ }
+ }
+ }
+ /* If this function calls uv_getaddrinfo, it will return that function's
+ return value. That function only returns numbers <=0, so we can safely
+ return 1 to indicate that we never retried */
+ return 1;
+}
+
static grpc_error *handle_addrinfo_result(int status, struct addrinfo *result,
grpc_resolved_addresses **addresses) {
struct addrinfo *resp;
@@ -97,13 +126,21 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
request *r = (request *)req->data;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *error;
+ int retry_status;
+
+ gpr_free(req);
+ retry_status = retry_named_port_failure(status, r, getaddrinfo_callback);
+ if (retry_status == 0) {
+ // The request is being retried. Nothing should be done here
+ return;
+ }
+ /* Either no retry was attempted, or the retry failed. Either way, the
+ original error probably has more interesting information */
error = handle_addrinfo_result(status, res, r->addresses);
grpc_closure_sched(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);
-
gpr_free(r->hints);
gpr_free(r);
- gpr_free(req);
uv_freeaddrinfo(res);
}
@@ -143,6 +180,7 @@ static grpc_error *blocking_resolve_address_impl(
uv_getaddrinfo_t req;
int s;
grpc_error *err;
+ int retry_status;
req.addrinfo = NULL;
@@ -158,6 +196,12 @@ static grpc_error *blocking_resolve_address_impl(
hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
s = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
+ request r = {
+ .addresses = addresses, .hints = &hints, .host = host, .port = port};
+ retry_status = retry_named_port_failure(s, &r, NULL);
+ if (retry_status <= 0) {
+ s = retry_status;
+ }
err = handle_addrinfo_result(s, req.addrinfo, addresses);
done:
@@ -200,6 +244,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
r = gpr_malloc(sizeof(request));
r->on_done = on_done;
r->addresses = addrs;
+ r->host = host;
+ r->port = port;
req = gpr_malloc(sizeof(uv_getaddrinfo_t));
req->data = r;
@@ -222,6 +268,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
gpr_free(r);
gpr_free(req);
gpr_free(hints);
+ gpr_free(host);
+ gpr_free(port);
}
}
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index ae66577caf..618483d9cb 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -76,7 +76,6 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp,
const char *str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s",
connect->addr_name, str);
- grpc_error_free_string(str);
}
if (error == GRPC_ERROR_NONE) {
/* error == NONE implies that the timer ran out, and wasn't cancelled. If
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index d1bcd89af1..71e295770a 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -109,8 +109,8 @@ struct grpc_udp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
- /* The parent grpc server */
- grpc_server *grpc_server;
+ /* opaque object to pass to callbacks */
+ void *user_data;
};
grpc_udp_server *grpc_udp_server_create(void) {
@@ -178,7 +178,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. */
GPR_ASSERT(sp->orphan_cb);
- sp->orphan_cb(exec_ctx, sp->emfd);
+ sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
@@ -204,7 +204,7 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
if (s->active_ports) {
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
- sp->orphan_cb(exec_ctx, sp->emfd);
+ sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data);
grpc_fd_shutdown(exec_ctx, sp->emfd,
GRPC_ERROR_CREATE("Server destroyed"));
}
@@ -299,7 +299,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* Tell the registered callback that data is available to read. */
GPR_ASSERT(sp->read_cb);
- sp->read_cb(exec_ctx, sp->emfd, sp->server->grpc_server);
+ sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data);
/* Re-arm the notification event so we get another chance to read. */
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
@@ -322,7 +322,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* Tell the registered callback that the socket is writeable. */
GPR_ASSERT(sp->write_cb);
- sp->write_cb(exec_ctx, sp->emfd);
+ sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data);
/* Re-arm the notification event so we get another chance to write. */
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
@@ -464,13 +464,13 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) {
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
grpc_pollset **pollsets, size_t pollset_count,
- grpc_server *server) {
+ void *user_data) {
size_t i;
gpr_mu_lock(&s->mu);
grpc_udp_listener *sp;
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
- s->grpc_server = server;
+ s->user_data = user_data;
sp = s->head;
while (sp != NULL) {
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index ed63fa7d81..90842a47f0 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -47,23 +47,23 @@ typedef struct grpc_udp_server grpc_udp_server;
/* Called when data is available to read from the socket. */
typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
- struct grpc_server *server);
+ void *user_data);
/* Called when the socket is writeable. */
-typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx,
- grpc_fd *emfd);
+typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
+ void *user_data);
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx,
- grpc_fd *emfd);
+ grpc_fd *emfd, void *user_data);
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
-/* Start listening to bound ports */
+/* Start listening to bound ports. user_data is passed to callbacks. */
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server,
grpc_pollset **pollsets, size_t pollset_count,
- struct grpc_server *server);
+ void *user_data);
int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index);
diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c
new file mode 100644
index 0000000000..7bcb983f24
--- /dev/null
+++ b/src/core/lib/support/arena.c
@@ -0,0 +1,98 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/support/arena.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
+ (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
+
+typedef struct zone {
+ size_t size_begin;
+ size_t size_end;
+ gpr_atm next_atm;
+} zone;
+
+struct gpr_arena {
+ gpr_atm size_so_far;
+ zone initial_zone;
+};
+
+gpr_arena *gpr_arena_create(size_t initial_size) {
+ initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size);
+ gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size);
+ a->initial_zone.size_end = initial_size;
+ return a;
+}
+
+size_t gpr_arena_destroy(gpr_arena *arena) {
+ gpr_atm size = gpr_atm_no_barrier_load(&arena->size_so_far);
+ zone *z = (zone *)gpr_atm_no_barrier_load(&arena->initial_zone.next_atm);
+ gpr_free(arena);
+ while (z) {
+ zone *next_z = (zone *)gpr_atm_no_barrier_load(&z->next_atm);
+ gpr_free(z);
+ z = next_z;
+ }
+ return (size_t)size;
+}
+
+void *gpr_arena_alloc(gpr_arena *arena, size_t size) {
+ size = ROUND_UP_TO_ALIGNMENT_SIZE(size);
+ size_t start =
+ (size_t)gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size);
+ zone *z = &arena->initial_zone;
+ while (start > z->size_end) {
+ zone *next_z = (zone *)gpr_atm_acq_load(&z->next_atm);
+ if (next_z == NULL) {
+ size_t next_z_size = (size_t)gpr_atm_no_barrier_load(&arena->size_so_far);
+ next_z = gpr_zalloc(sizeof(zone) + next_z_size);
+ next_z->size_begin = z->size_end;
+ next_z->size_end = z->size_end + next_z_size;
+ if (!gpr_atm_rel_cas(&z->next_atm, (gpr_atm)NULL, (gpr_atm)next_z)) {
+ gpr_free(next_z);
+ next_z = (zone *)gpr_atm_acq_load(&z->next_atm);
+ }
+ }
+ z = next_z;
+ }
+ if (start + size > z->size_end) {
+ return gpr_arena_alloc(arena, size);
+ }
+ GPR_ASSERT(start >= z->size_begin);
+ GPR_ASSERT(start + size <= z->size_end);
+ return ((char *)(z + 1)) + start - z->size_begin;
+}
diff --git a/src/core/lib/support/arena.h b/src/core/lib/support/arena.h
new file mode 100644
index 0000000000..c28033ffc3
--- /dev/null
+++ b/src/core/lib/support/arena.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// \file Arena based allocator
+// Allows very fast allocation of memory, but that memory cannot be freed until
+// the arena as a whole is freed
+// Tracks the total memory allocated against it, so that future arenas can
+// pre-allocate the right amount of memory
+
+#ifndef GRPC_CORE_LIB_SUPPORT_ARENA_H
+#define GRPC_CORE_LIB_SUPPORT_ARENA_H
+
+#include <stddef.h>
+
+typedef struct gpr_arena gpr_arena;
+
+// Create an arena, with \a initial_size bytes in the first allocated buffer
+gpr_arena *gpr_arena_create(size_t initial_size);
+// Allocate \a size bytes from the arena
+void *gpr_arena_alloc(gpr_arena *arena, size_t size);
+// Destroy an arena, returning the total number of bytes allocated
+size_t gpr_arena_destroy(gpr_arena *arena);
+
+#endif /* GRPC_CORE_LIB_SUPPORT_ARENA_H */
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index cf4410f2c0..bd34ed1b91 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -139,8 +139,8 @@ struct grpc_call {
grpc_call *parent;
grpc_call *first_child;
gpr_timespec start_time;
- /* TODO(ctiller): share with cq if possible? */
- gpr_mu mu;
+ /* protects first_child, and child next/prev links */
+ gpr_mu child_list_mu;
/* client or server call */
bool is_client;
@@ -155,8 +155,8 @@ struct grpc_call {
bool received_initial_metadata;
bool receiving_message;
bool requested_final_op;
- bool received_final_op;
- bool sent_any_op;
+ gpr_atm any_ops_sent_atm;
+ gpr_atm received_final_op_atm;
/* have we received initial metadata */
bool has_initial_md_been_received;
@@ -271,7 +271,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("grpc_call_create", 0);
call = gpr_zalloc(sizeof(grpc_call) + channel_stack->call_stack_size);
*out_call = call;
- gpr_mu_init(&call->mu);
+ gpr_mu_init(&call->child_list_mu);
call->channel = args->channel;
call->cq = args->cq;
call->parent = args->parent_call;
@@ -310,7 +310,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(call->is_client);
GPR_ASSERT(!args->parent_call->is_client);
- gpr_mu_lock(&args->parent_call->mu);
+ gpr_mu_lock(&args->parent_call->child_list_mu);
if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
send_deadline = gpr_time_min(
@@ -338,6 +338,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
}
if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
call->cancellation_is_inherited = 1;
+ if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) {
+ cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
+ GRPC_ERROR_CANCELLED);
+ }
}
if (args->parent_call->first_child == NULL) {
@@ -350,7 +354,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
call;
}
- gpr_mu_unlock(&args->parent_call->mu);
+ gpr_mu_unlock(&args->parent_call->child_list_mu);
}
call->send_deadline = send_deadline;
@@ -432,7 +436,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
if (c->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
- gpr_mu_destroy(&c->mu);
+ gpr_mu_destroy(&c->child_list_mu);
for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md);
}
@@ -453,7 +457,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
GRPC_ERROR_UNREF(
- unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error);
+ unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
}
grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c);
@@ -470,7 +474,7 @@ void grpc_call_destroy(grpc_call *c) {
GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c));
if (parent) {
- gpr_mu_lock(&parent->mu);
+ gpr_mu_lock(&parent->child_list_mu);
if (c == parent->first_child) {
parent->first_child = c->sibling_next;
if (c == parent->first_child) {
@@ -479,15 +483,14 @@ void grpc_call_destroy(grpc_call *c) {
c->sibling_prev->sibling_next = c->sibling_next;
c->sibling_next->sibling_prev = c->sibling_prev;
}
- gpr_mu_unlock(&parent->mu);
+ gpr_mu_unlock(&parent->child_list_mu);
GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child");
}
- gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
- cancel = c->sent_any_op && !c->received_final_op;
- gpr_mu_unlock(&c->mu);
+ cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) &&
+ !gpr_atm_acq_load(&c->received_final_op_atm);
if (cancel) {
cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_CANCELLED);
@@ -551,56 +554,26 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
"c=%p, status=%d, description=%s, reserved=%p)",
4, (c, (int)status, description, reserved));
GPR_ASSERT(reserved == NULL);
- gpr_mu_lock(&c->mu);
cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status,
description);
- gpr_mu_unlock(&c->mu);
grpc_exec_ctx_finish(&exec_ctx);
return GRPC_CALL_OK;
}
-typedef struct termination_closure {
- grpc_closure closure;
- grpc_call *call;
- grpc_transport_stream_op op;
- grpc_transport_stream_op_payload payload;
-} termination_closure;
-
-static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
- grpc_error *error) {
- termination_closure *tc = tcp;
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "termination");
- gpr_free(tc);
-}
-
-static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp,
+static void done_termination(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
- termination_closure *tc = tcp;
- memset(&tc->op, 0, sizeof(tc->op));
- tc->op.payload = &tc->payload;
- tc->op.cancel_stream = true;
- tc->op.payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
- /* reuse closure to catch completion */
- tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc,
- grpc_schedule_on_exec_ctx);
- execute_op(exec_ctx, tc->call, &tc->op);
-}
-
-static void terminate_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
- grpc_error *error) {
- termination_closure *tc = gpr_malloc(sizeof(*tc));
- memset(tc, 0, sizeof(*tc));
- tc->call = c;
- GRPC_CALL_INTERNAL_REF(tc->call, "termination");
- grpc_closure_sched(exec_ctx, grpc_closure_init(&tc->closure, send_termination,
- tc, grpc_schedule_on_exec_ctx),
- error);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination");
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
+ GRPC_CALL_INTERNAL_REF(c, "termination");
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- terminate_with_error(exec_ctx, c, error);
+ grpc_transport_stream_op *op = grpc_make_transport_stream_op(
+ grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx));
+ op->cancel_stream = true;
+ op->payload->cancel_stream.cancel_error = error;
+ execute_op(exec_ctx, c, op);
}
static grpc_error *error_from_status(grpc_status_code status,
@@ -714,9 +687,7 @@ static void set_incoming_compression_algorithm(
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
grpc_call *call) {
grpc_compression_algorithm algorithm;
- gpr_mu_lock(&call->mu);
algorithm = call->incoming_compression_algorithm;
- gpr_mu_unlock(&call->mu);
return algorithm;
}
@@ -728,9 +699,7 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked(
uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
uint32_t flags;
- gpr_mu_lock(&call->mu);
flags = call->test_only_last_message_flags;
- gpr_mu_unlock(&call->mu);
return flags;
}
@@ -784,9 +753,7 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
uint32_t encodings_accepted_by_peer;
- gpr_mu_lock(&call->mu);
encodings_accepted_by_peer = call->encodings_accepted_by_peer;
- gpr_mu_unlock(&call->mu);
return encodings_accepted_by_peer;
}
@@ -1033,16 +1000,13 @@ static batch_control *allocate_batch_control(grpc_call *call,
const grpc_op *ops,
size_t num_ops) {
int slot = batch_slot_for_op(ops[0].op);
- for (size_t i = 1; i < num_ops; i++) {
- int op_slot = batch_slot_for_op(ops[i].op);
- slot = GPR_MIN(slot, op_slot);
- }
batch_control *bctl = &call->active_batches[slot];
if (bctl->call != NULL) {
return NULL;
}
memset(bctl, 0, sizeof(*bctl));
bctl->call = call;
+ bctl->op.payload = &call->stream_op_payload;
return bctl;
}
@@ -1055,7 +1019,7 @@ static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
}
static grpc_error *consolidate_batch_errors(batch_control *bctl) {
- size_t n = (size_t)gpr_atm_no_barrier_load(&bctl->num_errors);
+ size_t n = (size_t)gpr_atm_acq_load(&bctl->num_errors);
if (n == 0) {
return GRPC_ERROR_NONE;
} else if (n == 1) {
@@ -1082,8 +1046,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
grpc_call *call = bctl->call;
grpc_error *error = consolidate_batch_errors(bctl);
- gpr_mu_lock(&call->mu);
-
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
exec_ctx,
@@ -1102,20 +1064,23 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(exec_ctx, call, md);
- call->received_final_op = true;
/* propagate cancellation to any interested children */
+ gpr_atm_rel_store(&call->received_final_op_atm, 1);
+ gpr_mu_lock(&call->child_list_mu);
child_call = call->first_child;
if (child_call != NULL) {
do {
next_child_call = child_call->sibling_next;
if (child_call->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
- grpc_call_cancel(child_call, NULL);
+ cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE,
+ GRPC_ERROR_CANCELLED);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
}
child_call = next_child_call;
} while (child_call != call->first_child);
}
+ gpr_mu_unlock(&call->child_list_mu);
if (call->is_client) {
get_final_status(call, set_status_value_directly,
@@ -1129,7 +1094,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
- gpr_mu_unlock(&call->mu);
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs bctl->error */
@@ -1221,7 +1185,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
- gpr_mu_lock(&bctl->call->mu);
if (error != GRPC_ERROR_NONE) {
if (call->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
@@ -1233,11 +1196,9 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
call->receiving_stream == NULL) {
- gpr_mu_unlock(&bctl->call->mu);
process_data_after_md(exec_ctx, bctlp);
} else {
call->saved_receiving_stream_ready_bctlp = bctlp;
- gpr_mu_unlock(&bctl->call->mu);
}
}
@@ -1296,7 +1257,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_error *error, bool has_cancelled) {
if (error == GRPC_ERROR_NONE) return;
- int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
+ int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1);
if (idx == 0 && !has_cancelled) {
cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
GRPC_ERROR_REF(error));
@@ -1309,8 +1270,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
- gpr_mu_lock(&call->mu);
-
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
@@ -1336,11 +1295,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
grpc_schedule_on_exec_ctx);
call->saved_receiving_stream_ready_bctlp = NULL;
- grpc_closure_sched(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
+ grpc_closure_run(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
}
- gpr_mu_unlock(&call->mu);
-
finish_batch_step(exec_ctx, bctl);
}
@@ -1394,11 +1351,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->completion_data.notify_tag.is_closure =
(uint8_t)(is_notify_tag_closure != 0);
- gpr_mu_lock(&call->mu);
grpc_transport_stream_op *stream_op = &bctl->op;
grpc_transport_stream_op_payload *stream_op_payload =
&call->stream_op_payload;
- memset(stream_op, 0, sizeof(*stream_op));
stream_op->covered_by_poller = true;
/* rewrite batch ops into a transport op */
@@ -1513,7 +1468,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
stream_op->send_trailing_metadata = true;
call->sent_final_op = 1;
- stream_op->send_trailing_metadata =
+ stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
@@ -1686,8 +1641,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&bctl->finish_batch, finish_batch, bctl,
grpc_schedule_on_exec_ctx);
stream_op->on_complete = &bctl->finish_batch;
- call->sent_any_op = true;
- gpr_mu_unlock(&call->mu);
+ gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
execute_op(exec_ctx, call, stream_op);
@@ -1718,7 +1672,6 @@ done_with_error:
if (stream_op->recv_trailing_metadata) {
call->requested_final_op = 0;
}
- gpr_mu_unlock(&call->mu);
goto done;
}
@@ -1767,10 +1720,8 @@ uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; }
grpc_compression_algorithm grpc_call_compression_for_level(
grpc_call *call, grpc_compression_level level) {
- gpr_mu_lock(&call->mu);
grpc_compression_algorithm algo =
compression_algorithm_for_level_locked(call, level);
- gpr_mu_unlock(&call->mu);
return algo;
}
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 3b0d18c789..ce06b4d004 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -264,8 +264,9 @@ typedef struct {
static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_stream_op *op = arg;
- grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
+ grpc_closure *c = op->inner_on_complete;
gpr_free(op);
+ grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error));
}
grpc_transport_stream_op *grpc_make_transport_stream_op(