aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar yang-g <yangg@google.com>2015-12-02 13:23:33 -0800
committerGravatar yang-g <yangg@google.com>2015-12-02 13:23:33 -0800
commitd88e1d8f4e42764e17aa2bed3a85b39f68673c03 (patch)
tree57cf6860d1c67257ccedc8b762f744e9e4df4a69 /src/core/surface
parent768999d0a351e2fd0a691d6391ff767228135123 (diff)
parent9fac2afdafe1355b02a355b06955b21d2a2cd004 (diff)
merge with head and resolve conflict
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/byte_buffer_reader.c1
-rw-r--r--src/core/surface/call.c62
-rw-r--r--src/core/surface/channel.c115
-rw-r--r--src/core/surface/channel.h11
-rw-r--r--src/core/surface/channel_create.c39
-rw-r--r--src/core/surface/init.c2
-rw-r--r--src/core/surface/lame_client.c12
-rw-r--r--src/core/surface/secure_channel_create.c44
-rw-r--r--src/core/surface/server.c21
-rw-r--r--src/core/surface/server.h2
-rw-r--r--src/core/surface/server_chttp2.c13
11 files changed, 124 insertions, 198 deletions
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c
index 9f830df68c..57417f41b0 100644
--- a/src/core/surface/byte_buffer_reader.c
+++ b/src/core/surface/byte_buffer_reader.c
@@ -121,4 +121,3 @@ gpr_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader) {
}
return out_slice;
}
-
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index aa435d44d3..4affafa585 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -43,6 +43,7 @@
#include <grpc/support/useful.h>
#include "src/core/channel/channel_stack.h"
+#include "src/core/compression/algorithm_metadata.h"
#include "src/core/iomgr/timer.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
@@ -50,6 +51,7 @@
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
+#include "src/core/transport/static_metadata.h"
/** The maximum number of concurrent batches possible.
Based upon the maximum number of individually queueable ops in the batch
@@ -135,7 +137,6 @@ struct grpc_call {
grpc_channel *channel;
grpc_call *parent;
grpc_call *first_child;
- grpc_mdctx *metadata_context;
/* TODO(ctiller): share with cq if possible? */
gpr_mu mu;
@@ -267,7 +268,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
}
call->send_deadline = send_deadline;
GRPC_CHANNEL_INTERNAL_REF(channel, "call");
- call->metadata_context = grpc_channel_get_metadata_context(channel);
/* initial refcount dropped by grpc_call_destroy */
grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call,
call->context, server_transport_data,
@@ -567,9 +567,8 @@ static int prepare_application_metadata(grpc_call *call, int count,
grpc_metadata *md = &metadata[i];
grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
- l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
- (const gpr_uint8 *)md->value,
- md->value_length);
+ l->md = grpc_mdelem_from_string_and_buffer(
+ md->key, (const gpr_uint8 *)md->value, md->value_length);
if (!grpc_mdstr_is_legal_header(l->md->key)) {
gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s",
grpc_mdstr_as_c_string(l->md->key));
@@ -712,8 +711,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
grpc_mdstr *details =
- description ? grpc_mdstr_from_string(c->metadata_context, description)
- : NULL;
+ description ? grpc_mdstr_from_string(description) : NULL;
cancel_closure *cc = gpr_malloc(sizeof(*cc));
GPR_ASSERT(status != GRPC_STATUS_OK);
@@ -788,8 +786,12 @@ static void destroy_status(void *ignored) {}
static gpr_uint32 decode_status(grpc_mdelem *md) {
gpr_uint32 status;
- void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
- if (user_data) {
+ void *user_data;
+ if (md == GRPC_MDELEM_GRPC_STATUS_0) return 0;
+ if (md == GRPC_MDELEM_GRPC_STATUS_1) return 1;
+ if (md == GRPC_MDELEM_GRPC_STATUS_2) return 2;
+ user_data = grpc_mdelem_get_user_data(md, destroy_status);
+ if (user_data != NULL) {
status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
@@ -803,38 +805,23 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
return status;
}
-/* just as for status above, we need to offset: metadata userdata can't hold a
- * zero (null), which in this case is used to signal no compression */
-#define COMPRESS_OFFSET 1
-static void destroy_compression(void *ignored) {}
-
static gpr_uint32 decode_compression(grpc_mdelem *md) {
- grpc_compression_algorithm algorithm;
- void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
- if (user_data) {
- algorithm =
- ((grpc_compression_algorithm)(gpr_intptr)user_data) - COMPRESS_OFFSET;
- } else {
+ grpc_compression_algorithm algorithm =
+ grpc_compression_algorithm_from_mdstr(md->value);
+ if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
- if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
- &algorithm)) {
- gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
- assert(0);
- }
- grpc_mdelem_set_user_data(
- md, destroy_compression,
- (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
+ gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
}
return algorithm;
}
static grpc_mdelem *recv_common_filter(grpc_call *call, grpc_mdelem *elem) {
- if (elem->key == grpc_channel_get_status_string(call->channel)) {
+ if (elem->key == GRPC_MDSTR_GRPC_STATUS) {
GPR_TIMER_BEGIN("status", 0);
set_status_code(call, STATUS_FROM_WIRE, decode_status(elem));
GPR_TIMER_END("status", 0);
return NULL;
- } else if (elem->key == grpc_channel_get_message_string(call->channel)) {
+ } else if (elem->key == GRPC_MDSTR_GRPC_MESSAGE) {
GPR_TIMER_BEGIN("status-details", 0);
set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(elem->value));
GPR_TIMER_END("status-details", 0);
@@ -867,14 +854,12 @@ static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) {
elem = recv_common_filter(call, elem);
if (elem == NULL) {
return NULL;
- } else if (elem->key ==
- grpc_channel_get_compression_algorithm_string(call->channel)) {
+ } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) {
GPR_TIMER_BEGIN("compression_algorithm", 0);
set_compression_algorithm(call, decode_compression(elem));
GPR_TIMER_END("compression_algorithm", 0);
return NULL;
- } else if (elem->key == grpc_channel_get_encodings_accepted_by_peer_string(
- call->channel)) {
+ } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) {
GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
set_encodings_accepted_by_peer(call, elem);
GPR_TIMER_END("encodings_accepted_by_peer", 0);
@@ -922,11 +907,12 @@ static batch_control *allocate_batch_control(grpc_call *call) {
size_t i;
for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) {
if ((call->used_batches & (1 << i)) == 0) {
- call->used_batches |= (gpr_uint8)(1 << i);
+ call->used_batches =
+ (gpr_uint8)(call->used_batches | (gpr_uint8)(1 << i));
return &call->active_batches[i];
}
}
- GPR_UNREACHABLE_CODE(return NULL);
+ return NULL;
}
static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
@@ -1240,10 +1226,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->channel, op->data.send_status_from_server.status);
if (op->data.send_status_from_server.status_details != NULL) {
call->send_extra_metadata[1].md = grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- GRPC_MDSTR_REF(grpc_channel_get_message_string(call->channel)),
+ GRPC_MDSTR_GRPC_MESSAGE,
grpc_mdstr_from_string(
- call->metadata_context,
op->data.send_status_from_server.status_details));
call->send_extra_metadata_count++;
set_status_details(
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index a9a5f828f2..859197412b 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -46,6 +46,7 @@
#include "src/core/surface/api_trace.h"
#include "src/core/surface/call.h"
#include "src/core/surface/init.h"
+#include "src/core/transport/static_metadata.h"
/** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS.
* Avoids needing to take a metadata context lock for sending status
@@ -64,17 +65,7 @@ struct grpc_channel {
int is_client;
gpr_refcount refs;
gpr_uint32 max_message_length;
- grpc_mdctx *metadata_context;
- /** mdstr for the grpc-status key */
- grpc_mdstr *grpc_status_string;
- grpc_mdstr *grpc_compression_algorithm_string;
- grpc_mdstr *grpc_encodings_accepted_by_peer_string;
- grpc_mdstr *grpc_message_string;
- grpc_mdstr *path_string;
- grpc_mdstr *authority_string;
grpc_mdelem *default_authority;
- /** mdelem for grpc-status: 0 thru grpc-status: 2 */
- grpc_mdelem *grpc_status_elem[NUM_CACHED_STATUS_ELEMS];
gpr_mu registered_call_mu;
registered_call *registered_calls;
@@ -93,7 +84,7 @@ struct grpc_channel {
grpc_channel *grpc_channel_create_from_filters(
grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_filter **filters, size_t num_filters,
- const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
+ const grpc_channel_args *args, int is_client) {
size_t i;
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
@@ -104,22 +95,6 @@ grpc_channel *grpc_channel_create_from_filters(
channel->is_client = is_client;
/* decremented by grpc_channel_destroy */
gpr_ref_init(&channel->refs, 1);
- channel->metadata_context = mdctx;
- channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
- channel->grpc_compression_algorithm_string =
- grpc_mdstr_from_string(mdctx, "grpc-encoding");
- channel->grpc_encodings_accepted_by_peer_string =
- grpc_mdstr_from_string(mdctx, "grpc-accept-encoding");
- channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
- for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
- char buf[GPR_LTOA_MIN_BUFSIZE];
- gpr_ltoa((long)i, buf);
- channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings(
- mdctx, GRPC_MDSTR_REF(channel->grpc_status_string),
- grpc_mdstr_from_string(mdctx, buf));
- }
- channel->path_string = grpc_mdstr_from_string(mdctx, ":path");
- channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority");
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
@@ -146,7 +121,7 @@ grpc_channel *grpc_channel_create_from_filters(
GRPC_MDELEM_UNREF(channel->default_authority);
}
channel->default_authority = grpc_mdelem_from_strings(
- mdctx, ":authority", args->args[i].value.string);
+ ":authority", args->args[i].value.string);
}
} else if (0 ==
strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) {
@@ -160,7 +135,7 @@ grpc_channel *grpc_channel_create_from_filters(
GRPC_ARG_DEFAULT_AUTHORITY);
} else {
channel->default_authority = grpc_mdelem_from_strings(
- mdctx, ":authority", args->args[i].value.string);
+ ":authority", args->args[i].value.string);
}
}
}
@@ -171,14 +146,13 @@ grpc_channel *grpc_channel_create_from_filters(
target != NULL) {
char *default_authority = grpc_get_default_authority(target);
if (default_authority) {
- channel->default_authority = grpc_mdelem_from_strings(
- channel->metadata_context, ":authority", default_authority);
+ channel->default_authority =
+ grpc_mdelem_from_strings(":authority", default_authority);
}
gpr_free(default_authority);
}
grpc_channel_stack_init(exec_ctx, filters, num_filters, channel, args,
- channel->metadata_context,
CHANNEL_STACK_FROM_CHANNEL(channel));
return channel;
@@ -227,13 +201,10 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, cq,
- grpc_mdelem_from_metadata_strings(
- channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
- grpc_mdstr_from_string(channel->metadata_context, method)),
- host ? grpc_mdelem_from_metadata_strings(
- channel->metadata_context,
- GRPC_MDSTR_REF(channel->authority_string),
- grpc_mdstr_from_string(channel->metadata_context, host))
+ grpc_mdelem_from_metadata_strings(GRPC_MDSTR_PATH,
+ grpc_mdstr_from_string(method)),
+ host ? grpc_mdelem_from_metadata_strings(GRPC_MDSTR_AUTHORITY,
+ grpc_mdstr_from_string(host))
: NULL,
deadline);
}
@@ -245,15 +216,11 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
"grpc_channel_register_call(channel=%p, method=%s, host=%s, reserved=%p)",
4, (channel, method, host, reserved));
GPR_ASSERT(!reserved);
- rc->path = grpc_mdelem_from_metadata_strings(
- channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
- grpc_mdstr_from_string(channel->metadata_context, method));
- rc->authority =
- host ? grpc_mdelem_from_metadata_strings(
- channel->metadata_context,
- GRPC_MDSTR_REF(channel->authority_string),
- grpc_mdstr_from_string(channel->metadata_context, host))
- : NULL;
+ rc->path = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_PATH,
+ grpc_mdstr_from_string(method));
+ rc->authority = host ? grpc_mdelem_from_metadata_strings(
+ GRPC_MDSTR_AUTHORITY, grpc_mdstr_from_string(host))
+ : NULL;
gpr_mu_lock(&channel->registered_call_mu);
rc->next = channel->registered_calls;
channel->registered_calls = rc;
@@ -293,17 +260,7 @@ void grpc_channel_internal_ref(grpc_channel *c) {
}
static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) {
- size_t i;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel));
- for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
- GRPC_MDELEM_UNREF(channel->grpc_status_elem[i]);
- }
- GRPC_MDSTR_UNREF(channel->grpc_status_string);
- GRPC_MDSTR_UNREF(channel->grpc_compression_algorithm_string);
- GRPC_MDSTR_UNREF(channel->grpc_encodings_accepted_by_peer_string);
- GRPC_MDSTR_UNREF(channel->grpc_message_string);
- GRPC_MDSTR_UNREF(channel->path_string);
- GRPC_MDSTR_UNREF(channel->authority_string);
while (channel->registered_calls) {
registered_call *rc = channel->registered_calls;
channel->registered_calls = rc->next;
@@ -316,7 +273,6 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) {
if (channel->default_authority != NULL) {
GRPC_MDELEM_UNREF(channel->default_authority);
}
- grpc_mdctx_unref(channel->metadata_context);
gpr_mu_destroy(&channel->registered_call_mu);
gpr_free(channel->target);
gpr_free(channel);
@@ -355,38 +311,19 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) {
return CHANNEL_STACK_FROM_CHANNEL(channel);
}
-grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel) {
- return channel->metadata_context;
-}
-
-grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
- return channel->grpc_status_string;
-}
-
-grpc_mdstr *grpc_channel_get_compression_algorithm_string(
- grpc_channel *channel) {
- return channel->grpc_compression_algorithm_string;
-}
-
-grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string(
- grpc_channel *channel) {
- return channel->grpc_encodings_accepted_by_peer_string;
-}
-
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
- if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
- return GRPC_MDELEM_REF(channel->grpc_status_elem[i]);
- } else {
- char tmp[GPR_LTOA_MIN_BUFSIZE];
- gpr_ltoa(i, tmp);
- return grpc_mdelem_from_metadata_strings(
- channel->metadata_context, GRPC_MDSTR_REF(channel->grpc_status_string),
- grpc_mdstr_from_string(channel->metadata_context, tmp));
+ char tmp[GPR_LTOA_MIN_BUFSIZE];
+ switch (i) {
+ case 0:
+ return GRPC_MDELEM_GRPC_STATUS_0;
+ case 1:
+ return GRPC_MDELEM_GRPC_STATUS_1;
+ case 2:
+ return GRPC_MDELEM_GRPC_STATUS_2;
}
-}
-
-grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
- return channel->grpc_message_string;
+ gpr_ltoa(i, tmp);
+ return grpc_mdelem_from_metadata_strings(GRPC_MDSTR_GRPC_STATUS,
+ grpc_mdstr_from_string(tmp));
}
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) {
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index e5030d52d2..7dea609ebc 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -40,26 +40,17 @@
grpc_channel *grpc_channel_create_from_filters(
grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_filter **filters, size_t count,
- const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client);
+ const grpc_channel_args *args, int is_client);
/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
-/** Get a (borrowed) pointer to the channel wide metadata context */
-grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
-
/** Get a grpc_mdelem of grpc-status: X where X is the numeric value of
status_code.
The returned elem is owned by the caller. */
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code);
-grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
-grpc_mdstr *grpc_channel_get_compression_algorithm_string(
- grpc_channel *channel);
-grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string(
- grpc_channel *channel);
-grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 51d9130b63..aceb932742 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -37,6 +37,8 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/slice.h>
+#include <grpc/support/slice_buffer.h>
#include "src/core/census/grpc_filter.h"
#include "src/core/channel/channel_args.h"
@@ -56,11 +58,11 @@ typedef struct {
grpc_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
+ grpc_closure initial_string_sent;
+ gpr_slice_buffer initial_string_buffer;
grpc_endpoint *tcp;
- grpc_mdctx *mdctx;
-
grpc_closure connected;
} connector;
@@ -72,18 +74,33 @@ static void connector_ref(grpc_connector *con) {
static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
- grpc_mdctx_unref(c->mdctx);
+ /* c->initial_string_buffer does not need to be destroyed */
gpr_free(c);
}
}
+static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
+ int success) {
+ connector_unref(exec_ctx, arg);
+}
+
static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->tcp;
if (tcp != NULL) {
- c->result->transport = grpc_create_chttp2_transport(
- exec_ctx, c->args.channel_args, tcp, c->mdctx, 1);
+ if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
+ grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
+ c);
+ gpr_slice_buffer_init(&c->initial_string_buffer);
+ gpr_slice_buffer_add(&c->initial_string_buffer,
+ c->args.initial_connect_string);
+ connector_ref(arg);
+ grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
+ &c->initial_string_sent);
+ }
+ c->result->transport =
+ grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, tcp, 1);
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
0);
GPR_ASSERT(c->result->transport);
@@ -123,7 +140,6 @@ static const grpc_connector_vtable connector_vtable = {
typedef struct {
grpc_subchannel_factory base;
gpr_refcount refs;
- grpc_mdctx *mdctx;
grpc_channel_args *merge_args;
grpc_channel *master;
} subchannel_factory;
@@ -139,7 +155,6 @@ static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
if (gpr_unref(&f->refs)) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory");
grpc_channel_args_destroy(f->merge_args);
- grpc_mdctx_unref(f->mdctx);
gpr_free(f);
}
}
@@ -154,10 +169,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
grpc_subchannel *s;
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
- c->mdctx = f->mdctx;
- grpc_mdctx_ref(c->mdctx);
gpr_ref_init(&c->refs, 1);
- args->mdctx = f->mdctx;
args->args = final_args;
args->master = f->master;
s = grpc_subchannel_create(&c->base, args);
@@ -182,7 +194,6 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_filter *filters[MAX_FILTERS];
grpc_resolver *resolver;
subchannel_factory *f;
- grpc_mdctx *mdctx = grpc_mdctx_create();
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
size_t n = 0;
GRPC_API_TRACE(
@@ -196,14 +207,12 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
- channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, n,
- args, mdctx, 1);
+ channel =
+ grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
gpr_ref_init(&f->refs, 1);
- grpc_mdctx_ref(mdctx);
- f->mdctx = mdctx;
f->merge_args = grpc_channel_args_copy(args);
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index f8cba01cad..04d68620f1 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -93,6 +93,7 @@ void grpc_init(void) {
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
gpr_time_init();
+ grpc_mdctx_global_init();
grpc_lb_policy_registry_init(grpc_pick_first_lb_factory_create());
grpc_register_lb_policy(grpc_pick_first_lb_factory_create());
grpc_register_lb_policy(grpc_round_robin_lb_factory_create());
@@ -147,6 +148,7 @@ void grpc_shutdown(void) {
g_all_of_the_plugins[i].destroy();
}
}
+ grpc_mdctx_global_shutdown();
}
gpr_mu_unlock(&g_init_mu);
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index fc458a603d..4a55544ac1 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -49,7 +49,6 @@ typedef struct {
} call_data;
typedef struct {
- grpc_mdctx *mdctx;
grpc_channel *master;
grpc_status_code error_code;
const char *error_message;
@@ -60,9 +59,9 @@ static void fill_metadata(grpc_call_element *elem, grpc_metadata_batch *mdb) {
channel_data *chand = elem->channel_data;
char tmp[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(chand->error_code, tmp);
- calld->status.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp);
- calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message",
- chand->error_message);
+ calld->status.md = grpc_mdelem_from_strings("grpc-status", tmp);
+ calld->details.md =
+ grpc_mdelem_from_strings("grpc-message", chand->error_message);
calld->status.prev = calld->details.next = NULL;
calld->status.next = &calld->details;
calld->details.prev = &calld->status;
@@ -115,7 +114,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
GPR_ASSERT(args->is_first);
GPR_ASSERT(args->is_last);
- chand->mdctx = args->metadata_context;
chand->master = args->master;
}
@@ -139,8 +137,8 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
channel_data *chand;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
static const grpc_channel_filter *filters[] = {&lame_filter};
- channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, 1,
- NULL, grpc_mdctx_create(), 1);
+ channel =
+ grpc_channel_create_from_filters(&exec_ctx, target, filters, 1, NULL, 1);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
GRPC_API_TRACE(
"grpc_lame_client_channel_create(target=%s, error_code=%d, "
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index e07f04e8fe..c9a54d9237 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -37,6 +37,8 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/slice.h>
+#include <grpc/support/slice_buffer.h>
#include "src/core/census/grpc_filter.h"
#include "src/core/channel/channel_args.h"
@@ -61,14 +63,14 @@ typedef struct {
grpc_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
+ grpc_closure initial_string_sent;
+ gpr_slice_buffer initial_string_buffer;
gpr_mu mu;
grpc_endpoint *connecting_endpoint;
grpc_endpoint *newly_connecting_endpoint;
grpc_closure connected_closure;
-
- grpc_mdctx *mdctx;
} connector;
static void connector_ref(grpc_connector *con) {
@@ -79,7 +81,7 @@ static void connector_ref(grpc_connector *con) {
static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
- grpc_mdctx_unref(c->mdctx);
+ /* c->initial_string_buffer does not need to be destroyed */
gpr_free(c);
}
}
@@ -102,7 +104,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
c->connecting_endpoint = NULL;
gpr_mu_unlock(&c->mu);
c->result->transport = grpc_create_chttp2_transport(
- exec_ctx, c->args.channel_args, secure_endpoint, c->mdctx, 1);
+ exec_ctx, c->args.channel_args, secure_endpoint, 1);
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
0);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
@@ -115,6 +117,14 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
notify->cb(exec_ctx, notify->cb_arg, 1);
}
+static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
+ int success) {
+ connector *c = arg;
+ grpc_security_connector_do_handshake(exec_ctx, &c->security_connector->base,
+ c->connecting_endpoint,
+ on_secure_handshake_done, c);
+}
+
static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
connector *c = arg;
grpc_closure *notify;
@@ -124,8 +134,19 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
GPR_ASSERT(c->connecting_endpoint == NULL);
c->connecting_endpoint = tcp;
gpr_mu_unlock(&c->mu);
- grpc_security_connector_do_handshake(exec_ctx, &c->security_connector->base,
- tcp, on_secure_handshake_done, c);
+ if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
+ grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
+ c);
+ gpr_slice_buffer_init(&c->initial_string_buffer);
+ gpr_slice_buffer_add(&c->initial_string_buffer,
+ c->args.initial_connect_string);
+ grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
+ &c->initial_string_sent);
+ } else {
+ grpc_security_connector_do_handshake(exec_ctx,
+ &c->security_connector->base, tcp,
+ on_secure_handshake_done, c);
+ }
} else {
memset(c->result, 0, sizeof(*c->result));
notify = c->notify;
@@ -171,7 +192,6 @@ static const grpc_connector_vtable connector_vtable = {
typedef struct {
grpc_subchannel_factory base;
gpr_refcount refs;
- grpc_mdctx *mdctx;
grpc_channel_args *merge_args;
grpc_channel_security_connector *security_connector;
grpc_channel *master;
@@ -190,7 +210,6 @@ static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
"subchannel_factory");
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory");
grpc_channel_args_destroy(f->merge_args);
- grpc_mdctx_unref(f->mdctx);
gpr_free(f);
}
}
@@ -206,13 +225,10 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector;
- c->mdctx = f->mdctx;
gpr_mu_init(&c->mu);
- grpc_mdctx_ref(c->mdctx);
gpr_ref_init(&c->refs, 1);
args->args = final_args;
args->master = f->master;
- args->mdctx = f->mdctx;
s = grpc_subchannel_create(&c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
grpc_channel_args_destroy(final_args);
@@ -236,7 +252,6 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
grpc_channel_args *args_copy;
grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *security_connector;
- grpc_mdctx *mdctx;
grpc_resolver *resolver;
subchannel_factory *f;
#define MAX_FILTERS 3
@@ -266,7 +281,6 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
target, GRPC_STATUS_INVALID_ARGUMENT,
"Failed to create security connector.");
}
- mdctx = grpc_mdctx_create();
connector_arg = grpc_security_connector_to_arg(&security_connector->base);
args_copy = grpc_channel_args_copy_and_add(
@@ -280,13 +294,11 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, n,
- args_copy, mdctx, 1);
+ args_copy, 1);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
gpr_ref_init(&f->refs, 1);
- grpc_mdctx_ref(mdctx);
- f->mdctx = mdctx;
GRPC_SECURITY_CONNECTOR_REF(&security_connector->base, "subchannel_factory");
f->security_connector = security_connector;
f->merge_args = grpc_channel_args_copy(args_copy);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index e9f380083f..cdbd542d9a 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -54,6 +54,7 @@
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/init.h"
#include "src/core/transport/metadata.h"
+#include "src/core/transport/static_metadata.h"
typedef struct listener {
void *arg;
@@ -108,8 +109,6 @@ struct channel_data {
grpc_server *server;
grpc_connectivity_state connectivity_state;
grpc_channel *channel;
- grpc_mdstr *path_key;
- grpc_mdstr *authority_key;
/* linked list of all channels on a server */
channel_data *next;
channel_data *prev;
@@ -558,12 +557,11 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
- channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- if (md->key == chand->path_key) {
+ if (md->key == GRPC_MDSTR_PATH) {
calld->path = GRPC_MDSTR_REF(md->value);
return NULL;
- } else if (md->key == chand->authority_key) {
+ } else if (md->key == GRPC_MDSTR_AUTHORITY) {
calld->host = GRPC_MDSTR_REF(md->value);
return NULL;
}
@@ -718,9 +716,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(!args->is_last);
chand->server = NULL;
chand->channel = NULL;
- chand->path_key = grpc_mdstr_from_string(args->metadata_context, ":path");
- chand->authority_key =
- grpc_mdstr_from_string(args->metadata_context, ":authority");
chand->next = chand->prev = chand;
chand->registered_methods = NULL;
chand->connectivity_state = GRPC_CHANNEL_IDLE;
@@ -750,8 +745,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
chand->next = chand->prev = chand;
maybe_finish_shutdown(exec_ctx, chand->server);
gpr_mu_unlock(&chand->server->mu_global);
- GRPC_MDSTR_UNREF(chand->path_key);
- GRPC_MDSTR_UNREF(chand->authority_key);
server_unref(exec_ctx, chand->server);
}
}
@@ -894,7 +887,7 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport,
grpc_channel_filter const **extra_filters,
- size_t num_extra_filters, grpc_mdctx *mdctx,
+ size_t num_extra_filters,
const grpc_channel_args *args) {
size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
grpc_channel_filter const **filters =
@@ -929,7 +922,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
}
channel = grpc_channel_create_from_filters(exec_ctx, NULL, filters,
- num_filters, args, mdctx, 0);
+ num_filters, args, 0);
chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)->channel_data;
chand->server = s;
@@ -948,8 +941,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
chand->registered_methods = gpr_malloc(alloc);
memset(chand->registered_methods, 0, alloc);
for (rm = s->registered_methods; rm; rm = rm->next) {
- host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
- method = grpc_mdstr_from_string(mdctx, rm->method);
+ host = rm->host ? grpc_mdstr_from_string(rm->host) : NULL;
+ method = grpc_mdstr_from_string(rm->method);
hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
for (probes = 0; chand->registered_methods[(hash + probes) % slots]
.server_registered_method != NULL;
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 4c46d07679..a957fdb360 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -57,7 +57,7 @@ void grpc_server_add_listener(
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *server,
grpc_transport *transport,
grpc_channel_filter const **extra_filters,
- size_t num_extra_filters, grpc_mdctx *mdctx,
+ size_t num_extra_filters,
const grpc_channel_args *args);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 580b91573c..990bc4aa23 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -44,11 +44,11 @@
#include <grpc/support/useful.h>
static void setup_transport(grpc_exec_ctx *exec_ctx, void *server,
- grpc_transport *transport, grpc_mdctx *mdctx) {
+ grpc_transport *transport) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
grpc_server_setup_transport(exec_ctx, server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx,
+ GPR_ARRAY_SIZE(extra_filters),
grpc_server_get_channel_args(server));
}
@@ -61,10 +61,9 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
* (as in server_secure_chttp2.c) needs to add synchronization to avoid this
* case.
*/
- grpc_mdctx *mdctx = grpc_mdctx_create();
grpc_transport *transport = grpc_create_chttp2_transport(
- exec_ctx, grpc_server_get_channel_args(server), tcp, mdctx, 0);
- setup_transport(exec_ctx, server, transport, mdctx);
+ exec_ctx, grpc_server_get_channel_args(server), tcp, 0);
+ setup_transport(exec_ctx, server, transport);
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
}
@@ -107,9 +106,11 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
}
for (i = 0; i < resolved->naddrs; i++) {
- port_temp = grpc_tcp_server_add_port(
+ grpc_tcp_listener *listener;
+ listener = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr,
resolved->addrs[i].len);
+ port_temp = grpc_tcp_listener_get_port(listener);
if (port_temp >= 0) {
if (port_num == -1) {
port_num = port_temp;