aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c36
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c11
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c75
-rw-r--r--src/core/lib/surface/call.h10
-rw-r--r--src/core/lib/surface/call_log_batch.c2
-rw-r--r--src/core/lib/transport/metadata_batch.c2
-rw-r--r--src/core/lib/transport/metadata_batch.h1
-rw-r--r--src/core/lib/transport/static_metadata.c25
-rw-r--r--src/core/lib/transport/static_metadata.h2
-rw-r--r--src/core/tsi/test_creds/BUILD14
10 files changed, 130 insertions, 48 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
index 56ed4371a9..3ff081a514 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
@@ -32,6 +32,7 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
@@ -125,7 +126,6 @@ static const grpc_resolver_vtable fake_resolver_vtable = {
struct grpc_fake_resolver_response_generator {
fake_resolver* resolver; // Set by the fake_resolver constructor to itself.
- grpc_channel_args* next_response;
gpr_refcount refcount;
};
@@ -151,19 +151,26 @@ void grpc_fake_resolver_response_generator_unref(
}
}
-static void set_response_cb(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- grpc_fake_resolver_response_generator* generator =
- (grpc_fake_resolver_response_generator*)arg;
+typedef struct set_response_closure_arg {
+ grpc_closure set_response_closure;
+ grpc_fake_resolver_response_generator* generator;
+ grpc_channel_args* next_response;
+} set_response_closure_arg;
+
+static void set_response_closure_fn(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ set_response_closure_arg* closure_arg = arg;
+ grpc_fake_resolver_response_generator* generator = closure_arg->generator;
fake_resolver* r = generator->resolver;
if (r->next_results != NULL) {
grpc_channel_args_destroy(exec_ctx, r->next_results);
}
- r->next_results = generator->next_response;
+ r->next_results = closure_arg->next_response;
if (r->results_upon_error != NULL) {
grpc_channel_args_destroy(exec_ctx, r->results_upon_error);
}
- r->results_upon_error = grpc_channel_args_copy(generator->next_response);
+ r->results_upon_error = grpc_channel_args_copy(closure_arg->next_response);
+ gpr_free(closure_arg);
fake_resolver_maybe_finish_next_locked(exec_ctx, r);
}
@@ -171,12 +178,15 @@ void grpc_fake_resolver_response_generator_set_response(
grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator,
grpc_channel_args* next_response) {
GPR_ASSERT(generator->resolver != NULL);
- generator->next_response = grpc_channel_args_copy(next_response);
- GRPC_CLOSURE_SCHED(
- exec_ctx, GRPC_CLOSURE_CREATE(set_response_cb, generator,
- grpc_combiner_scheduler(
- generator->resolver->base.combiner)),
- GRPC_ERROR_NONE);
+ set_response_closure_arg* closure_arg = gpr_zalloc(sizeof(*closure_arg));
+ closure_arg->generator = generator;
+ closure_arg->next_response = grpc_channel_args_copy(next_response);
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
+ set_response_closure_fn, closure_arg,
+ grpc_combiner_scheduler(
+ generator->resolver->base.combiner)),
+ GRPC_ERROR_NONE);
}
static void* response_generator_arg_copy(void* p) {
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index c16ffaa5ef..5e9d97d485 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -156,17 +156,8 @@ static uint32_t target_write_size(grpc_chttp2_transport *t) {
}
// Returns true if initial_metadata contains only default headers.
-//
-// TODO(roth): The fact that we hard-code these particular headers here
-// is fairly ugly. Need some better way to know which headers are
-// default, maybe via a bit in the static metadata table?
static bool is_default_initial_metadata(grpc_metadata_batch *initial_metadata) {
- int num_default_fields =
- (initial_metadata->idx.named.status != NULL) +
- (initial_metadata->idx.named.content_type != NULL) +
- (initial_metadata->idx.named.grpc_encoding != NULL) +
- (initial_metadata->idx.named.grpc_accept_encoding != NULL);
- return (size_t)num_default_fields == initial_metadata->list.count;
+ return initial_metadata->list.default_count == initial_metadata->list.count;
}
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index abb558982b..765c13c65e 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -187,9 +187,34 @@ struct stream_obj {
/* Mutex to protect storage */
gpr_mu mu;
+
+ /* Refcount object of the stream */
+ grpc_stream_refcount *refcount;
};
typedef struct stream_obj stream_obj;
+#ifndef NDEBUG
+#define GRPC_CRONET_STREAM_REF(stream, reason) \
+ grpc_cronet_stream_ref((stream), (reason))
+#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
+ grpc_cronet_stream_unref((exec_ctx), (stream), (reason))
+void grpc_cronet_stream_ref(stream_obj *s, const char *reason) {
+ grpc_stream_ref(s->refcount, reason);
+}
+void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s,
+ const char *reason) {
+ grpc_stream_unref(exec_ctx, s->refcount, reason);
+}
+#else
+#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
+#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
+ grpc_cronet_stream_unref((exec_ctx), (stream))
+void grpc_cronet_stream_ref(stream_obj *s) { grpc_stream_ref(s->refcount); }
+void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s) {
+ grpc_stream_unref(exec_ctx, s->refcount);
+}
+#endif
+
static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas);
@@ -346,13 +371,12 @@ static void remove_from_storage(struct stream_obj *s,
This can get executed from the Cronet network thread via cronet callback
or on the application supplied thread via the perform_stream_op function.
*/
-static void execute_from_storage(stream_obj *s) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) {
gpr_mu_lock(&s->mu);
for (struct op_and_state *curr = s->storage.head; curr != NULL;) {
CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
GPR_ASSERT(curr->done == 0);
- enum e_op_result result = execute_stream_op(&exec_ctx, curr);
+ enum e_op_result result = execute_stream_op(exec_ctx, curr);
CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
op_result_string(result));
/* if this op is done, then remove it and free memory */
@@ -369,7 +393,6 @@ static void execute_from_storage(stream_obj *s) {
}
}
gpr_mu_unlock(&s->mu);
- grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -377,6 +400,8 @@ static void execute_from_storage(stream_obj *s) {
*/
static void on_failed(bidirectional_stream *stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs);
@@ -392,7 +417,9 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -400,6 +427,8 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
*/
static void on_canceled(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs);
@@ -415,7 +444,9 @@ static void on_canceled(bidirectional_stream *stream) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -423,6 +454,8 @@ static void on_canceled(bidirectional_stream *stream) {
*/
static void on_succeeded(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs);
@@ -430,7 +463,9 @@ static void on_succeeded(bidirectional_stream *stream) {
s->cbs = NULL;
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -438,6 +473,7 @@ static void on_succeeded(bidirectional_stream *stream) {
*/
static void on_stream_ready(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
gpr_mu_lock(&s->mu);
@@ -457,7 +493,8 @@ static void on_stream_ready(bidirectional_stream *stream) {
}
}
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -513,14 +550,15 @@ static void on_response_headers_received(
s->state.pending_read_from_cronet = true;
}
gpr_mu_unlock(&s->mu);
+ execute_from_storage(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
- execute_from_storage(s);
}
/*
Cronet callback
*/
static void on_write_completed(bidirectional_stream *stream, const char *data) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
gpr_mu_lock(&s->mu);
@@ -530,7 +568,8 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) {
}
s->state.state_callback_received[OP_SEND_MESSAGE] = true;
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -538,6 +577,7 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) {
*/
static void on_read_completed(bidirectional_stream *stream, char *data,
int count) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count);
@@ -563,14 +603,15 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
}
} else {
null_and_maybe_free_read_buffer(s);
s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
}
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -625,12 +666,11 @@ static void on_response_trailers_received(
s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
gpr_mu_unlock(&s->mu);
- grpc_exec_ctx_finish(&exec_ctx);
} else {
gpr_mu_unlock(&s->mu);
- grpc_exec_ctx_finish(&exec_ctx);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
}
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -1313,6 +1353,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data, gpr_arena *arena) {
stream_obj *s = (stream_obj *)gs;
+
+ s->refcount = refcount;
+ GRPC_CRONET_STREAM_REF(s, "cronet transport");
memset(&s->storage, 0, sizeof(s->storage));
s->storage.head = NULL;
memset(&s->state, 0, sizeof(s->state));
@@ -1370,7 +1413,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
stream_obj *s = (stream_obj *)gs;
add_to_storage(s, op);
- execute_from_storage(s);
+ execute_from_storage(exec_ctx, s);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index 185bfccb77..d537637cbb 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -19,6 +19,10 @@
#ifndef GRPC_CORE_LIB_SURFACE_CALL_H
#define GRPC_CORE_LIB_SURFACE_CALL_H
+#ifdef __cplusplus
+extern "C" {
+#endif
+
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/surface/api_trace.h"
@@ -26,10 +30,6 @@
#include <grpc/grpc.h>
#include <grpc/impl/codegen/compression_types.h>
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx,
grpc_call *call, int success,
void *user_data);
@@ -89,7 +89,7 @@ grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx,
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
-void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
+void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity,
grpc_call *call, const grpc_op *ops, size_t nops,
void *tag);
diff --git a/src/core/lib/surface/call_log_batch.c b/src/core/lib/surface/call_log_batch.c
index 4443aba58a..4a1c265817 100644
--- a/src/core/lib/surface/call_log_batch.c
+++ b/src/core/lib/surface/call_log_batch.c
@@ -103,7 +103,7 @@ char *grpc_op_string(const grpc_op *op) {
return out;
}
-void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
+void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity,
grpc_call *call, const grpc_op *ops, size_t nops,
void *tag) {
char *tmp;
diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c
index 8f24b8527c..a077052561 100644
--- a/src/core/lib/transport/metadata_batch.c
+++ b/src/core/lib/transport/metadata_batch.c
@@ -105,6 +105,7 @@ static grpc_error *maybe_link_callout(grpc_metadata_batch *batch,
return GRPC_ERROR_NONE;
}
if (batch->idx.array[idx] == NULL) {
+ if (grpc_static_callout_is_default[idx]) ++batch->list.default_count;
batch->idx.array[idx] = storage;
return GRPC_ERROR_NONE;
}
@@ -120,6 +121,7 @@ static void maybe_unlink_callout(grpc_metadata_batch *batch,
if (idx == GRPC_BATCH_CALLOUTS_COUNT) {
return;
}
+ if (grpc_static_callout_is_default[idx]) --batch->list.default_count;
GPR_ASSERT(batch->idx.array[idx] != NULL);
batch->idx.array[idx] = NULL;
}
diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h
index 1b11a3e252..57d298c75c 100644
--- a/src/core/lib/transport/metadata_batch.h
+++ b/src/core/lib/transport/metadata_batch.h
@@ -41,6 +41,7 @@ typedef struct grpc_linked_mdelem {
typedef struct grpc_mdelem_list {
size_t count;
+ size_t default_count; // Number of default keys.
grpc_linked_mdelem *head;
grpc_linked_mdelem *tail;
} grpc_mdelem_list;
diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c
index 28f05d5c44..b20d94aeac 100644
--- a/src/core/lib/transport/static_metadata.c
+++ b/src/core/lib/transport/static_metadata.c
@@ -823,6 +823,31 @@ grpc_mdelem_data grpc_static_mdelem_table[GRPC_STATIC_MDELEM_COUNT] = {
{.refcount = &grpc_static_metadata_refcounts[97],
.data.refcounted = {g_bytes + 1040, 13}}},
};
+bool grpc_static_callout_is_default[GRPC_BATCH_CALLOUTS_COUNT] = {
+ true, // :path
+ true, // :method
+ true, // :status
+ true, // :authority
+ true, // :scheme
+ true, // te
+ true, // grpc-message
+ true, // grpc-status
+ true, // grpc-payload-bin
+ true, // grpc-encoding
+ true, // grpc-accept-encoding
+ true, // grpc-server-stats-bin
+ true, // grpc-tags-bin
+ true, // grpc-trace-bin
+ true, // content-type
+ true, // content-encoding
+ true, // accept-encoding
+ true, // grpc-internal-encoding-request
+ true, // grpc-internal-stream-encoding-request
+ true, // user-agent
+ true, // host
+ true, // lb-token
+};
+
const uint8_t grpc_static_accept_encoding_metadata[8] = {0, 76, 77, 78,
79, 80, 81, 82};
diff --git a/src/core/lib/transport/static_metadata.h b/src/core/lib/transport/static_metadata.h
index 93ab90dff8..f03a9d23b1 100644
--- a/src/core/lib/transport/static_metadata.h
+++ b/src/core/lib/transport/static_metadata.h
@@ -571,6 +571,8 @@ typedef union {
GRPC_BATCH_CALLOUTS_COUNT) \
: GRPC_BATCH_CALLOUTS_COUNT)
+extern bool grpc_static_callout_is_default[GRPC_BATCH_CALLOUTS_COUNT];
+
extern const uint8_t grpc_static_accept_encoding_metadata[8];
#define GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(algs) \
(GRPC_MAKE_MDELEM( \
diff --git a/src/core/tsi/test_creds/BUILD b/src/core/tsi/test_creds/BUILD
index 4b0786d7b8..732f6d91b2 100644
--- a/src/core/tsi/test_creds/BUILD
+++ b/src/core/tsi/test_creds/BUILD
@@ -15,7 +15,15 @@
licenses(["notice"]) # Apache v2
exports_files([
- "ca.pem",
- "server1.key",
- "server1.pem",
+ "ca.pem",
+ "server1.key",
+ "server1.pem",
+ "server0.key",
+ "server0.pem",
+ "client.key",
+ "client.pem",
+ "badserver.key",
+ "badserver.pem",
+ "badclient.key",
+ "badclient.pem",
])