diff options
author | Craig Tiller <ctiller@google.com> | 2017-09-06 13:28:22 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-09-06 13:28:22 -0700 |
commit | 3749a6d417f9092f0f39a174149087fd256cc82b (patch) | |
tree | 0f955cfc42eb8feab05112e037943065bc6a5edf /src/core | |
parent | 0181dd0bfb1e503c4d52b0deff71c15e3f9e1b3a (diff) | |
parent | 04035de631a6a46c03ebf3e8db2102d5688dc71a (diff) |
Merge github.com:grpc/grpc into stats_histo
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c | 36 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/writing.c | 11 | ||||
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 75 | ||||
-rw-r--r-- | src/core/lib/surface/call.h | 10 | ||||
-rw-r--r-- | src/core/lib/surface/call_log_batch.c | 2 | ||||
-rw-r--r-- | src/core/lib/transport/metadata_batch.c | 2 | ||||
-rw-r--r-- | src/core/lib/transport/metadata_batch.h | 1 | ||||
-rw-r--r-- | src/core/lib/transport/static_metadata.c | 25 | ||||
-rw-r--r-- | src/core/lib/transport/static_metadata.h | 2 | ||||
-rw-r--r-- | src/core/tsi/test_creds/BUILD | 14 |
10 files changed, 130 insertions, 48 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c index 56ed4371a9..3ff081a514 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c @@ -32,6 +32,7 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" @@ -125,7 +126,6 @@ static const grpc_resolver_vtable fake_resolver_vtable = { struct grpc_fake_resolver_response_generator { fake_resolver* resolver; // Set by the fake_resolver constructor to itself. - grpc_channel_args* next_response; gpr_refcount refcount; }; @@ -151,19 +151,26 @@ void grpc_fake_resolver_response_generator_unref( } } -static void set_response_cb(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - grpc_fake_resolver_response_generator* generator = - (grpc_fake_resolver_response_generator*)arg; +typedef struct set_response_closure_arg { + grpc_closure set_response_closure; + grpc_fake_resolver_response_generator* generator; + grpc_channel_args* next_response; +} set_response_closure_arg; + +static void set_response_closure_fn(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + set_response_closure_arg* closure_arg = arg; + grpc_fake_resolver_response_generator* generator = closure_arg->generator; fake_resolver* r = generator->resolver; if (r->next_results != NULL) { grpc_channel_args_destroy(exec_ctx, r->next_results); } - r->next_results = generator->next_response; + r->next_results = closure_arg->next_response; if (r->results_upon_error != NULL) { grpc_channel_args_destroy(exec_ctx, r->results_upon_error); } - r->results_upon_error = grpc_channel_args_copy(generator->next_response); + r->results_upon_error = grpc_channel_args_copy(closure_arg->next_response); + gpr_free(closure_arg); fake_resolver_maybe_finish_next_locked(exec_ctx, r); } @@ -171,12 +178,15 @@ void grpc_fake_resolver_response_generator_set_response( grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator, grpc_channel_args* next_response) { GPR_ASSERT(generator->resolver != NULL); - generator->next_response = grpc_channel_args_copy(next_response); - GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_CREATE(set_response_cb, generator, - grpc_combiner_scheduler( - generator->resolver->base.combiner)), - GRPC_ERROR_NONE); + set_response_closure_arg* closure_arg = gpr_zalloc(sizeof(*closure_arg)); + closure_arg->generator = generator; + closure_arg->next_response = grpc_channel_args_copy(next_response); + GRPC_CLOSURE_SCHED(exec_ctx, + GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, + set_response_closure_fn, closure_arg, + grpc_combiner_scheduler( + generator->resolver->base.combiner)), + GRPC_ERROR_NONE); } static void* response_generator_arg_copy(void* p) { diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index c16ffaa5ef..5e9d97d485 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -156,17 +156,8 @@ static uint32_t target_write_size(grpc_chttp2_transport *t) { } // Returns true if initial_metadata contains only default headers. -// -// TODO(roth): The fact that we hard-code these particular headers here -// is fairly ugly. Need some better way to know which headers are -// default, maybe via a bit in the static metadata table? static bool is_default_initial_metadata(grpc_metadata_batch *initial_metadata) { - int num_default_fields = - (initial_metadata->idx.named.status != NULL) + - (initial_metadata->idx.named.content_type != NULL) + - (initial_metadata->idx.named.grpc_encoding != NULL) + - (initial_metadata->idx.named.grpc_accept_encoding != NULL); - return (size_t)num_default_fields == initial_metadata->list.count; + return initial_metadata->list.default_count == initial_metadata->list.count; } grpc_chttp2_begin_write_result grpc_chttp2_begin_write( diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index abb558982b..765c13c65e 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -187,9 +187,34 @@ struct stream_obj { /* Mutex to protect storage */ gpr_mu mu; + + /* Refcount object of the stream */ + grpc_stream_refcount *refcount; }; typedef struct stream_obj stream_obj; +#ifndef NDEBUG +#define GRPC_CRONET_STREAM_REF(stream, reason) \ + grpc_cronet_stream_ref((stream), (reason)) +#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \ + grpc_cronet_stream_unref((exec_ctx), (stream), (reason)) +void grpc_cronet_stream_ref(stream_obj *s, const char *reason) { + grpc_stream_ref(s->refcount, reason); +} +void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s, + const char *reason) { + grpc_stream_unref(exec_ctx, s->refcount, reason); +} +#else +#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream)) +#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \ + grpc_cronet_stream_unref((exec_ctx), (stream)) +void grpc_cronet_stream_ref(stream_obj *s) { grpc_stream_ref(s->refcount); } +void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s) { + grpc_stream_unref(exec_ctx, s->refcount); +} +#endif + static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, struct op_and_state *oas); @@ -346,13 +371,12 @@ static void remove_from_storage(struct stream_obj *s, This can get executed from the Cronet network thread via cronet callback or on the application supplied thread via the perform_stream_op function. */ -static void execute_from_storage(stream_obj *s) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; +static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) { gpr_mu_lock(&s->mu); for (struct op_and_state *curr = s->storage.head; curr != NULL;) { CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done); GPR_ASSERT(curr->done == 0); - enum e_op_result result = execute_stream_op(&exec_ctx, curr); + enum e_op_result result = execute_stream_op(exec_ctx, curr); CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr, op_result_string(result)); /* if this op is done, then remove it and free memory */ @@ -369,7 +393,6 @@ static void execute_from_storage(stream_obj *s) { } } gpr_mu_unlock(&s->mu); - grpc_exec_ctx_finish(&exec_ctx); } /* @@ -377,6 +400,8 @@ static void execute_from_storage(stream_obj *s) { */ static void on_failed(bidirectional_stream *stream, int net_error) { CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); bidirectional_stream_destroy(s->cbs); @@ -392,7 +417,9 @@ static void on_failed(bidirectional_stream *stream, int net_error) { } null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -400,6 +427,8 @@ static void on_failed(bidirectional_stream *stream, int net_error) { */ static void on_canceled(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); bidirectional_stream_destroy(s->cbs); @@ -415,7 +444,9 @@ static void on_canceled(bidirectional_stream *stream) { } null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -423,6 +454,8 @@ static void on_canceled(bidirectional_stream *stream) { */ static void on_succeeded(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); bidirectional_stream_destroy(s->cbs); @@ -430,7 +463,9 @@ static void on_succeeded(bidirectional_stream *stream) { s->cbs = NULL; null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -438,6 +473,7 @@ static void on_succeeded(bidirectional_stream *stream) { */ static void on_stream_ready(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj *s = (stream_obj *)stream->annotation; grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; gpr_mu_lock(&s->mu); @@ -457,7 +493,8 @@ static void on_stream_ready(bidirectional_stream *stream) { } } gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -513,14 +550,15 @@ static void on_response_headers_received( s->state.pending_read_from_cronet = true; } gpr_mu_unlock(&s->mu); + execute_from_storage(&exec_ctx, s); grpc_exec_ctx_finish(&exec_ctx); - execute_from_storage(s); } /* Cronet callback */ static void on_write_completed(bidirectional_stream *stream, const char *data) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj *s = (stream_obj *)stream->annotation; CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); gpr_mu_lock(&s->mu); @@ -530,7 +568,8 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) { } s->state.state_callback_received[OP_SEND_MESSAGE] = true; gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -538,6 +577,7 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) { */ static void on_read_completed(bidirectional_stream *stream, char *data, int count) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj *s = (stream_obj *)stream->annotation; CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); @@ -563,14 +603,15 @@ static void on_read_completed(bidirectional_stream *stream, char *data, gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); } } else { null_and_maybe_free_read_buffer(s); s->state.rs.read_stream_closed = true; gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); } + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -625,12 +666,11 @@ static void on_response_trailers_received( s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); - grpc_exec_ctx_finish(&exec_ctx); } else { gpr_mu_unlock(&s->mu); - grpc_exec_ctx_finish(&exec_ctx); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); } + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -1313,6 +1353,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *server_data, gpr_arena *arena) { stream_obj *s = (stream_obj *)gs; + + s->refcount = refcount; + GRPC_CRONET_STREAM_REF(s, "cronet transport"); memset(&s->storage, 0, sizeof(s->storage)); s->storage.head = NULL; memset(&s->state, 0, sizeof(s->state)); @@ -1370,7 +1413,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } stream_obj *s = (stream_obj *)gs; add_to_storage(s, op); - execute_from_storage(s); + execute_from_storage(exec_ctx, s); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index 185bfccb77..d537637cbb 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -19,6 +19,10 @@ #ifndef GRPC_CORE_LIB_SURFACE_CALL_H #define GRPC_CORE_LIB_SURFACE_CALL_H +#ifdef __cplusplus +extern "C" { +#endif + #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/surface/api_trace.h" @@ -26,10 +30,6 @@ #include <grpc/grpc.h> #include <grpc/impl/codegen/compression_types.h> -#ifdef __cplusplus -extern "C" { -#endif - typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx, grpc_call *call, int success, void *user_data); @@ -89,7 +89,7 @@ grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx, /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); -void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, +void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity, grpc_call *call, const grpc_op *ops, size_t nops, void *tag); diff --git a/src/core/lib/surface/call_log_batch.c b/src/core/lib/surface/call_log_batch.c index 4443aba58a..4a1c265817 100644 --- a/src/core/lib/surface/call_log_batch.c +++ b/src/core/lib/surface/call_log_batch.c @@ -103,7 +103,7 @@ char *grpc_op_string(const grpc_op *op) { return out; } -void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, +void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity, grpc_call *call, const grpc_op *ops, size_t nops, void *tag) { char *tmp; diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c index 8f24b8527c..a077052561 100644 --- a/src/core/lib/transport/metadata_batch.c +++ b/src/core/lib/transport/metadata_batch.c @@ -105,6 +105,7 @@ static grpc_error *maybe_link_callout(grpc_metadata_batch *batch, return GRPC_ERROR_NONE; } if (batch->idx.array[idx] == NULL) { + if (grpc_static_callout_is_default[idx]) ++batch->list.default_count; batch->idx.array[idx] = storage; return GRPC_ERROR_NONE; } @@ -120,6 +121,7 @@ static void maybe_unlink_callout(grpc_metadata_batch *batch, if (idx == GRPC_BATCH_CALLOUTS_COUNT) { return; } + if (grpc_static_callout_is_default[idx]) --batch->list.default_count; GPR_ASSERT(batch->idx.array[idx] != NULL); batch->idx.array[idx] = NULL; } diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h index 1b11a3e252..57d298c75c 100644 --- a/src/core/lib/transport/metadata_batch.h +++ b/src/core/lib/transport/metadata_batch.h @@ -41,6 +41,7 @@ typedef struct grpc_linked_mdelem { typedef struct grpc_mdelem_list { size_t count; + size_t default_count; // Number of default keys. grpc_linked_mdelem *head; grpc_linked_mdelem *tail; } grpc_mdelem_list; diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c index 28f05d5c44..b20d94aeac 100644 --- a/src/core/lib/transport/static_metadata.c +++ b/src/core/lib/transport/static_metadata.c @@ -823,6 +823,31 @@ grpc_mdelem_data grpc_static_mdelem_table[GRPC_STATIC_MDELEM_COUNT] = { {.refcount = &grpc_static_metadata_refcounts[97], .data.refcounted = {g_bytes + 1040, 13}}}, }; +bool grpc_static_callout_is_default[GRPC_BATCH_CALLOUTS_COUNT] = { + true, // :path + true, // :method + true, // :status + true, // :authority + true, // :scheme + true, // te + true, // grpc-message + true, // grpc-status + true, // grpc-payload-bin + true, // grpc-encoding + true, // grpc-accept-encoding + true, // grpc-server-stats-bin + true, // grpc-tags-bin + true, // grpc-trace-bin + true, // content-type + true, // content-encoding + true, // accept-encoding + true, // grpc-internal-encoding-request + true, // grpc-internal-stream-encoding-request + true, // user-agent + true, // host + true, // lb-token +}; + const uint8_t grpc_static_accept_encoding_metadata[8] = {0, 76, 77, 78, 79, 80, 81, 82}; diff --git a/src/core/lib/transport/static_metadata.h b/src/core/lib/transport/static_metadata.h index 93ab90dff8..f03a9d23b1 100644 --- a/src/core/lib/transport/static_metadata.h +++ b/src/core/lib/transport/static_metadata.h @@ -571,6 +571,8 @@ typedef union { GRPC_BATCH_CALLOUTS_COUNT) \ : GRPC_BATCH_CALLOUTS_COUNT) +extern bool grpc_static_callout_is_default[GRPC_BATCH_CALLOUTS_COUNT]; + extern const uint8_t grpc_static_accept_encoding_metadata[8]; #define GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(algs) \ (GRPC_MAKE_MDELEM( \ diff --git a/src/core/tsi/test_creds/BUILD b/src/core/tsi/test_creds/BUILD index 4b0786d7b8..732f6d91b2 100644 --- a/src/core/tsi/test_creds/BUILD +++ b/src/core/tsi/test_creds/BUILD @@ -15,7 +15,15 @@ licenses(["notice"]) # Apache v2 exports_files([ - "ca.pem", - "server1.key", - "server1.pem", + "ca.pem", + "server1.key", + "server1.pem", + "server0.key", + "server0.pem", + "client.key", + "client.pem", + "badserver.key", + "badserver.pem", + "badclient.key", + "badclient.pem", ]) |