aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-03-24 11:12:47 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-03-24 11:12:47 -0700
commitc732abb08f5b3dfc4af956095927725afffaf6d2 (patch)
treea712bfc93bd5049e04fdc8ad3c718fd37efba053 /src/core/lib/transport
parent1bfcc40fd32357018cb7bfd729e069018abb3689 (diff)
parentaf61d0b7b4b9cd48b424c3c6f7c0b3c3be2ae270 (diff)
Merge remote-tracking branch 'upstream/master' into lazy-deframe
Diffstat (limited to 'src/core/lib/transport')
-rw-r--r--src/core/lib/transport/connectivity_state.c3
-rw-r--r--src/core/lib/transport/error_utils.c34
-rw-r--r--src/core/lib/transport/error_utils.h2
-rw-r--r--src/core/lib/transport/metadata_batch.c13
-rw-r--r--src/core/lib/transport/service_config.c12
-rw-r--r--src/core/lib/transport/service_config.h6
-rw-r--r--src/core/lib/transport/transport.c47
-rw-r--r--src/core/lib/transport/transport.h12
-rw-r--r--src/core/lib/transport/transport_impl.h5
9 files changed, 99 insertions, 35 deletions
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index afe1f6164d..3757b25267 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -79,7 +79,8 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
*w->current = GRPC_CHANNEL_SHUTDOWN;
error = GRPC_ERROR_NONE;
} else {
- error = GRPC_ERROR_CREATE("Shutdown connectivity owner");
+ error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner");
}
grpc_closure_sched(exec_ctx, w->notify, error);
gpr_free(w);
diff --git a/src/core/lib/transport/error_utils.c b/src/core/lib/transport/error_utils.c
index da77828d9c..4e70f8749d 100644
--- a/src/core/lib/transport/error_utils.c
+++ b/src/core/lib/transport/error_utils.c
@@ -44,18 +44,18 @@ static grpc_error *recursively_find_error_with_field(grpc_error *error,
}
if (grpc_error_is_special(error)) return NULL;
// Otherwise, search through its children.
- intptr_t key = 0;
- while (true) {
- grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++);
- if (child_error == NULL) break;
- grpc_error *result = recursively_find_error_with_field(child_error, which);
- if (result != NULL) return result;
+ uint8_t slot = error->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot);
+ grpc_error *result = recursively_find_error_with_field(lerr->err, which);
+ if (result) return result;
+ slot = lerr->next;
}
return NULL;
}
void grpc_error_get_status(grpc_error *error, gpr_timespec deadline,
- grpc_status_code *code, const char **msg,
+ grpc_status_code *code, grpc_slice *slice,
grpc_http2_error_code *http_error) {
// Start with the parent error and recurse through the tree of children
// until we find the first one that has a status code.
@@ -97,11 +97,11 @@ void grpc_error_get_status(grpc_error *error, gpr_timespec deadline,
// If the error has a status message, use it. Otherwise, fall back to
// the error description.
- if (msg != NULL) {
- *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_GRPC_MESSAGE);
- if (*msg == NULL && error != GRPC_ERROR_NONE) {
- *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_DESCRIPTION);
- if (*msg == NULL) *msg = "unknown error"; // Just in case.
+ if (slice != NULL) {
+ if (!grpc_error_get_str(found_error, GRPC_ERROR_STR_GRPC_MESSAGE, slice)) {
+ if (!grpc_error_get_str(found_error, GRPC_ERROR_STR_DESCRIPTION, slice)) {
+ *slice = grpc_slice_from_static_string("unknown error");
+ }
}
}
@@ -112,13 +112,13 @@ bool grpc_error_has_clear_grpc_status(grpc_error *error) {
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
return true;
}
- intptr_t key = 0;
- while (true) {
- grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++);
- if (child_error == NULL) break;
- if (grpc_error_has_clear_grpc_status(child_error)) {
+ uint8_t slot = error->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot);
+ if (grpc_error_has_clear_grpc_status(lerr->err)) {
return true;
}
+ slot = lerr->next;
}
return false;
}
diff --git a/src/core/lib/transport/error_utils.h b/src/core/lib/transport/error_utils.h
index 105338880a..3b44466ab8 100644
--- a/src/core/lib/transport/error_utils.h
+++ b/src/core/lib/transport/error_utils.h
@@ -44,7 +44,7 @@
/// attributes (code, msg, http_status) are unneeded, they can be passed as
/// NULL.
void grpc_error_get_status(grpc_error *error, gpr_timespec deadline,
- grpc_status_code *code, const char **msg,
+ grpc_status_code *code, grpc_slice *slice,
grpc_http2_error_code *http_status);
/// A utility function to check whether there is a clear status code that
diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c
index fc2c52bd8a..fa73244aa4 100644
--- a/src/core/lib/transport/metadata_batch.c
+++ b/src/core/lib/transport/metadata_batch.c
@@ -101,12 +101,10 @@ void grpc_metadata_batch_destroy(grpc_exec_ctx *exec_ctx,
}
grpc_error *grpc_attach_md_to_error(grpc_error *src, grpc_mdelem md) {
- char *k = grpc_slice_to_c_string(GRPC_MDKEY(md));
- char *v = grpc_slice_to_c_string(GRPC_MDVALUE(md));
grpc_error *out = grpc_error_set_str(
- grpc_error_set_str(src, GRPC_ERROR_STR_KEY, k), GRPC_ERROR_STR_VALUE, v);
- gpr_free(k);
- gpr_free(v);
+ grpc_error_set_str(src, GRPC_ERROR_STR_KEY,
+ grpc_slice_ref_internal(GRPC_MDKEY(md))),
+ GRPC_ERROR_STR_VALUE, grpc_slice_ref_internal(GRPC_MDVALUE(md)));
return out;
}
@@ -126,7 +124,8 @@ static grpc_error *maybe_link_callout(grpc_metadata_batch *batch,
return GRPC_ERROR_NONE;
}
return grpc_attach_md_to_error(
- GRPC_ERROR_CREATE("Unallowed duplicate metadata"), storage->md);
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unallowed duplicate metadata"),
+ storage->md);
}
static void maybe_unlink_callout(grpc_metadata_batch *batch,
@@ -302,7 +301,7 @@ static void add_error(grpc_error **composite, grpc_error *error,
const char *composite_error_string) {
if (error == GRPC_ERROR_NONE) return;
if (*composite == GRPC_ERROR_NONE) {
- *composite = GRPC_ERROR_CREATE(composite_error_string);
+ *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(composite_error_string);
}
*composite = grpc_error_add_child(*composite, error);
}
diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c
index 12da2a88fe..1195f75044 100644
--- a/src/core/lib/transport/service_config.c
+++ b/src/core/lib/transport/service_config.c
@@ -93,6 +93,18 @@ void grpc_service_config_destroy(grpc_service_config* service_config) {
gpr_free(service_config);
}
+void grpc_service_config_parse_global_params(
+ const grpc_service_config* service_config,
+ void (*process_json)(const grpc_json* json, void* arg), void* arg) {
+ const grpc_json* json = service_config->json_tree;
+ if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return;
+ for (grpc_json* field = json->child; field != NULL; field = field->next) {
+ if (field->key == NULL) return;
+ if (strcmp(field->key, "methodConfig") == 0) continue;
+ process_json(field, arg);
+ }
+}
+
const char* grpc_service_config_get_lb_policy_name(
const grpc_service_config* service_config) {
const grpc_json* json = service_config->json_tree;
diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h
index cd739a593c..ebfc59b534 100644
--- a/src/core/lib/transport/service_config.h
+++ b/src/core/lib/transport/service_config.h
@@ -42,6 +42,12 @@ typedef struct grpc_service_config grpc_service_config;
grpc_service_config* grpc_service_config_create(const char* json_string);
void grpc_service_config_destroy(grpc_service_config* service_config);
+/// Invokes \a process_json() for each global parameter in the service
+/// config. \a arg is passed as the second argument to \a process_json().
+void grpc_service_config_parse_global_params(
+ const grpc_service_config* service_config,
+ void (*process_json)(const grpc_json* json, void* arg), void* arg);
+
/// Gets the LB policy name from \a service_config.
/// Returns NULL if no LB policy name was specified.
/// Caller does NOT take ownership.
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 004e748f25..d56cb31ee0 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -84,6 +84,39 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
}
}
+#define STREAM_REF_FROM_SLICE_REF(p) \
+ ((grpc_stream_refcount *)(((uint8_t *)p) - \
+ offsetof(grpc_stream_refcount, slice_refcount)))
+
+static void slice_stream_ref(void *p) {
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p), "slice");
+#else
+ grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p));
+#endif
+}
+
+static void slice_stream_unref(grpc_exec_ctx *exec_ctx, void *p) {
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p), "slice");
+#else
+ grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p));
+#endif
+}
+
+grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount,
+ void *buffer, size_t length) {
+ slice_stream_ref(&refcount->slice_refcount);
+ return (grpc_slice){.refcount = &refcount->slice_refcount,
+ .data.refcounted = {.bytes = buffer, .length = length}};
+}
+
+static const grpc_slice_refcount_vtable stream_ref_slice_vtable = {
+ .ref = slice_stream_ref,
+ .unref = slice_stream_unref,
+ .eq = grpc_slice_default_eq_impl,
+ .hash = grpc_slice_default_hash_impl};
+
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
grpc_iomgr_cb_func cb, void *cb_arg,
@@ -95,6 +128,8 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
#endif
gpr_ref_init(&refcount->refs, initial_refs);
grpc_closure_init(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
+ refcount->slice_refcount.vtable = &stream_ref_slice_vtable;
+ refcount->slice_refcount.sub_refcount = &refcount->slice_refcount;
}
static void move64(uint64_t *from, uint64_t *to) {
@@ -127,9 +162,9 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx,
int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport, grpc_stream *stream,
grpc_stream_refcount *refcount,
- const void *server_data) {
+ const void *server_data, gpr_arena *arena) {
return transport->vtable->init_stream(exec_ctx, transport, stream, refcount,
- server_data);
+ server_data, arena);
}
void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx,
@@ -162,9 +197,10 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- grpc_stream *stream, void *and_free_memory) {
+ grpc_stream *stream,
+ grpc_closure *then_schedule_closure) {
transport->vtable->destroy_stream(exec_ctx, transport, stream,
- and_free_memory);
+ then_schedule_closure);
}
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
@@ -219,8 +255,9 @@ typedef struct {
static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_stream_op *op = arg;
- grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
+ grpc_closure *c = op->inner_on_complete;
gpr_free(op);
+ grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error));
}
grpc_transport_stream_op *grpc_make_transport_stream_op(
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index bb23c0225a..950b18aeda 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -41,6 +41,7 @@
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/byte_stream.h"
#include "src/core/lib/transport/metadata_batch.h"
@@ -64,6 +65,7 @@ typedef struct grpc_stream_refcount {
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
const char *object_type;
#endif
+ grpc_slice_refcount slice_refcount;
} grpc_stream_refcount;
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
@@ -84,6 +86,11 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount);
grpc_stream_ref_init(rc, ir, cb, cb_arg)
#endif
+/* Wrap a buffer that is owned by some stream object into a slice that shares
+ the same refcount */
+grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount,
+ void *buffer, size_t length);
+
typedef struct {
uint64_t framing_bytes;
uint64_t data_bytes;
@@ -223,7 +230,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport);
int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport, grpc_stream *stream,
grpc_stream_refcount *refcount,
- const void *server_data);
+ const void *server_data, gpr_arena *arena);
void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
grpc_stream *stream, grpc_polling_entity *pollent);
@@ -240,7 +247,8 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
caller, but any child memory must be cleaned up) */
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- grpc_stream *stream, void *and_free_memory);
+ grpc_stream *stream,
+ grpc_closure *then_schedule_closure);
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op,
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index 8553148c35..6f688bf8d2 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -47,7 +47,7 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_init_stream */
int (*init_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
grpc_stream *stream, grpc_stream_refcount *refcount,
- const void *server_data);
+ const void *server_data, gpr_arena *arena);
/* implementation of grpc_transport_set_pollset */
void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
@@ -67,7 +67,8 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
- grpc_stream *stream, void *and_free_memory);
+ grpc_stream *stream,
+ grpc_closure *then_schedule_closure);
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);