diff options
Diffstat (limited to 'src')
77 files changed, 1121 insertions, 337 deletions
diff --git a/src/core/ext/README.md b/src/core/ext/README.md new file mode 100644 index 0000000000..0812b20823 --- /dev/null +++ b/src/core/ext/README.md @@ -0,0 +1,5 @@ +Optional plugins for gRPC Core: Modules in this directory extend gRPC Core in +useful ways. All optional code belongs here. + +NOTE: The movement of code between lib and ext is an ongoing effort, so this +directory currently contains too much of the core library. diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c index 8f0e12296d..3b5d6dab2b 100644 --- a/src/core/ext/census/tracing.c +++ b/src/core/ext/census/tracing.c @@ -31,19 +31,15 @@ * */ -//#include "src/core/ext/census/tracing.h" - #include <grpc/census.h> /* TODO(aveitch): These are all placeholder implementations. */ -// int census_trace_mask(const census_context *context) { -// return CENSUS_TRACE_MASK_NONE; -// } - -// void census_set_trace_mask(int trace_mask) {} +int census_trace_mask(const census_context *context) { + return CENSUS_TRACE_MASK_NONE; +} -// void census_trace_print(census_context *context, uint32_t type, -// const char *buffer, size_t n) {} +void census_set_trace_mask(int trace_mask) {} -// void SetTracerParams(const Params& params); +void census_trace_print(census_context *context, uint32_t type, + const char *buffer, size_t n) {} diff --git a/src/core/ext/client_channel/channel_connectivity.c b/src/core/ext/client_channel/channel_connectivity.c index b10f444b63..dd70bc2c6c 100644 --- a/src/core/ext/client_channel/channel_connectivity.c +++ b/src/core/ext/client_channel/channel_connectivity.c @@ -76,6 +76,7 @@ typedef struct { gpr_mu mu; callback_phase phase; grpc_closure on_complete; + grpc_closure on_timeout; grpc_timer alarm; grpc_connectivity_state state; grpc_completion_queue *cq; @@ -200,6 +201,8 @@ void grpc_channel_watch_connectivity_state( gpr_mu_init(&w->mu); grpc_closure_init(&w->on_complete, watch_complete, w, grpc_schedule_on_exec_ctx); + grpc_closure_init(&w->on_timeout, timeout_complete, w, + grpc_schedule_on_exec_ctx); w->phase = WAITING; w->state = last_observed_state; w->cq = cq; @@ -208,7 +211,7 @@ void grpc_channel_watch_connectivity_state( grpc_timer_init(&exec_ctx, &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); + &w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); if (client_channel_elem->filter == &grpc_client_channel_filter) { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index 988b7a1d5c..d50bba60f6 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -38,6 +38,7 @@ #include <grpc/support/alloc.h> #include "src/core/ext/client_channel/client_channel.h" +#include "src/core/ext/client_channel/http_connect_handshaker.h" #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/ext/client_channel/subchannel_index.h" @@ -84,6 +85,7 @@ void grpc_client_channel_init(void) { set_default_host_if_unset, NULL); grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter, (void *)&grpc_client_channel_filter); + grpc_http_connect_register_handshaker_factory(); } void grpc_client_channel_shutdown(void) { diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index 27b117af84..fba32561ac 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -44,6 +44,7 @@ #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/ext/client_channel/uri_parser.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/slice/slice_internal.h" @@ -54,6 +55,8 @@ typedef struct http_connect_handshaker { grpc_handshaker base; char* proxy_server; + grpc_http_header* headers; + size_t num_headers; gpr_refcount refcount; gpr_mu mu; @@ -89,6 +92,11 @@ static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx, gpr_free(handshaker->read_buffer_to_destroy); } gpr_free(handshaker->proxy_server); + for (size_t i = 0; i < handshaker->num_headers; ++i) { + gpr_free(handshaker->headers[i].key); + gpr_free(handshaker->headers[i].value); + } + gpr_free(handshaker->headers); grpc_slice_buffer_destroy_internal(exec_ctx, &handshaker->write_buffer); grpc_http_parser_destroy(&handshaker->http_parser); grpc_http_response_destroy(&handshaker->http_response); @@ -289,6 +297,8 @@ static void http_connect_handshaker_do_handshake( request.host = server_name; request.http.method = "CONNECT"; request.http.path = server_name; + request.http.hdrs = handshaker->headers; + request.http.hdr_count = handshaker->num_headers; request.handshaker = &grpc_httpcli_plaintext; grpc_slice request_slice = grpc_httpcli_format_connect_request(&request); grpc_slice_buffer_add(&handshaker->write_buffer, request_slice); @@ -306,7 +316,9 @@ static const grpc_handshaker_vtable http_connect_handshaker_vtable = { http_connect_handshaker_destroy, http_connect_handshaker_shutdown, http_connect_handshaker_do_handshake}; -grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) { +grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, + grpc_http_header* headers, + size_t num_headers) { GPR_ASSERT(proxy_server != NULL); http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker)); memset(handshaker, 0, sizeof(*handshaker)); @@ -314,6 +326,14 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) { gpr_mu_init(&handshaker->mu); gpr_ref_init(&handshaker->refcount, 1); handshaker->proxy_server = gpr_strdup(proxy_server); + if (num_headers > 0) { + handshaker->headers = gpr_malloc(sizeof(grpc_http_header) * num_headers); + for (size_t i = 0; i < num_headers; ++i) { + handshaker->headers[i].key = gpr_strdup(headers[i].key); + handshaker->headers[i].value = gpr_strdup(headers[i].value); + } + handshaker->num_headers = num_headers; + } grpc_slice_buffer_init(&handshaker->write_buffer); grpc_closure_init(&handshaker->request_done_closure, on_write_done, handshaker, grpc_schedule_on_exec_ctx); @@ -347,3 +367,33 @@ done: grpc_uri_destroy(uri); return proxy_name; } + +// +// handshaker factory +// + +static void handshaker_factory_add_handshakers( + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* factory, + const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { + char* proxy_name = grpc_get_http_proxy_server(); + if (proxy_name != NULL) { + grpc_handshake_manager_add( + handshake_mgr, + grpc_http_connect_handshaker_create(proxy_name, NULL, 0)); + gpr_free(proxy_name); + } +} + +static void handshaker_factory_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker_factory* factory) {} + +static const grpc_handshaker_factory_vtable handshaker_factory_vtable = { + handshaker_factory_add_handshakers, handshaker_factory_destroy}; + +static grpc_handshaker_factory handshaker_factory = { + &handshaker_factory_vtable}; + +void grpc_http_connect_register_handshaker_factory() { + grpc_handshaker_factory_register(true /* at_start */, HANDSHAKER_CLIENT, + &handshaker_factory); +} diff --git a/src/core/ext/client_channel/http_connect_handshaker.h b/src/core/ext/client_channel/http_connect_handshaker.h index ea293852e6..c2e68de716 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.h +++ b/src/core/ext/client_channel/http_connect_handshaker.h @@ -35,12 +35,18 @@ #define GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H #include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/http/parser.h" -/// Does NOT take ownership of \a proxy_server. -grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server); +/// Creates a new HTTP CONNECT handshaker. +grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, + grpc_http_header* headers, + size_t num_headers); /// Returns the name of the proxy to use, or NULL if no proxy is configured. /// Caller takes ownership of result. char* grpc_get_http_proxy_server(); +/// Registers handshaker factory. +void grpc_http_connect_register_handshaker_factory(); + #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H */ diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index fad5c69c83..1bac82b451 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -109,6 +109,9 @@ struct grpc_subchannel { /** callback for connection finishing */ grpc_closure connected; + /** callback for our alarm */ + grpc_closure on_alarm; + /** pollset_set tracking who's interested in a connection being setup */ grpc_pollset_set *pollset_set; @@ -483,7 +486,8 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", time_til_next.tv_sec, time_til_next.tv_nsec); } - grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); + grpc_closure_init(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); + grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, &c->on_alarm, now); } } diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 2d48a3a9e7..97f98df03a 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -327,6 +327,9 @@ typedef struct glb_lb_policy { /* A response from the LB server has been received. Process it */ grpc_closure lb_on_response_received; + /* LB call retry timer callback. */ + grpc_closure lb_on_call_retry; + grpc_call *lb_call; /* streaming call to the LB server, */ grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */ @@ -1364,8 +1367,10 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); + grpc_closure_init(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer, + glb_policy, grpc_schedule_on_exec_ctx); grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, - lb_call_on_retry_timer, glb_policy, now); + &glb_policy->lb_on_call_retry, now); } gpr_mu_unlock(&glb_policy->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, diff --git a/src/core/ext/resolver/README.md b/src/core/ext/resolver/README.md new file mode 100644 index 0000000000..b0e234e96a --- /dev/null +++ b/src/core/ext/resolver/README.md @@ -0,0 +1,4 @@ +# Resolver + +Implementations of various name resolution schemes. +See the [naming spec](/doc/naming.md). diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 124b16bbc3..655d9dc586 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -81,6 +81,7 @@ typedef struct { /** retry timer */ bool have_retry_timer; grpc_timer retry_timer; + grpc_closure on_retry; /** retry backoff state */ gpr_backoff backoff_state; @@ -188,7 +189,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); gpr_timespec timeout = gpr_time_sub(next_try, now); const char *msg = grpc_error_string(error); - gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); + gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", msg); grpc_error_free_string(msg); GPR_ASSERT(!r->have_retry_timer); r->have_retry_timer = true; @@ -199,8 +200,9 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - grpc_timer_init(exec_ctx, &r->retry_timer, next_try, dns_on_retry_timer, r, - now); + grpc_closure_init(&r->on_retry, dns_on_retry_timer, r, + grpc_schedule_on_exec_ctx); + grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); } if (r->resolved_result != NULL) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); diff --git a/src/core/ext/transport/README.md b/src/core/ext/transport/README.md new file mode 100644 index 0000000000..2290568784 --- /dev/null +++ b/src/core/ext/transport/README.md @@ -0,0 +1 @@ +Transports for gRPC diff --git a/src/core/ext/transport/chttp2/README.md b/src/core/ext/transport/chttp2/README.md new file mode 100644 index 0000000000..8880a47460 --- /dev/null +++ b/src/core/ext/transport/chttp2/README.md @@ -0,0 +1 @@ +CHTTP2 - gRPC's implementation of a HTTP2 based transport diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index 2385f91dbd..2c5dfaea60 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -46,6 +46,7 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/slice/slice_internal.h" @@ -58,9 +59,6 @@ typedef struct { bool shutdown; bool connecting; - grpc_chttp2_add_handshakers_func add_handshakers; - void *add_handshakers_user_data; - grpc_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; @@ -151,16 +149,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, static void start_handshake_locked(grpc_exec_ctx *exec_ctx, chttp2_connector *c) { c->handshake_mgr = grpc_handshake_manager_create(); - char *proxy_name = grpc_get_http_proxy_server(); - if (proxy_name != NULL) { - grpc_handshake_manager_add(c->handshake_mgr, - grpc_http_connect_handshaker_create(proxy_name)); - gpr_free(proxy_name); - } - if (c->add_handshakers != NULL) { - c->add_handshakers(exec_ctx, c->add_handshakers_user_data, + grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, c->args.channel_args, c->handshake_mgr); - } grpc_handshake_manager_do_handshake( exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, c->args.deadline, NULL /* acceptor */, on_handshake_done, c); @@ -250,15 +240,11 @@ static const grpc_connector_vtable chttp2_connector_vtable = { chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown, chttp2_connector_connect}; -grpc_connector *grpc_chttp2_connector_create( - grpc_exec_ctx *exec_ctx, grpc_chttp2_add_handshakers_func add_handshakers, - void *add_handshakers_user_data) { +grpc_connector *grpc_chttp2_connector_create() { chttp2_connector *c = gpr_malloc(sizeof(*c)); memset(c, 0, sizeof(*c)); c->base.vtable = &chttp2_connector_vtable; gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); - c->add_handshakers = add_handshakers; - c->add_handshakers_user_data = add_handshakers_user_data; return &c->base; } diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h index 58eba22417..f5d1025432 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.h +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -35,17 +35,7 @@ #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H #include "src/core/ext/client_channel/connector.h" -#include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/iomgr/exec_ctx.h" -typedef void (*grpc_chttp2_add_handshakers_func)( - grpc_exec_ctx* exec_ctx, void* user_data, - grpc_handshake_manager* handshake_mgr); - -/// If \a add_handshakers is non-NULL, it will be called with -/// \a add_handshakers_user_data to add handshakers. -grpc_connector* grpc_chttp2_connector_create( - grpc_exec_ctx* exec_ctx, grpc_chttp2_add_handshakers_func add_handshakers, - void* add_handshakers_user_data); +grpc_connector* grpc_chttp2_connector_create(); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 1d3592ef06..c9f4021216 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -53,8 +53,7 @@ static void client_channel_factory_unref( static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { - grpc_connector *connector = grpc_chttp2_connector_create( - exec_ctx, NULL /* add_handshakers */, NULL /* user_data */); + grpc_connector *connector = grpc_chttp2_connector_create(); grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); grpc_connector_unref(exec_ctx, connector); return s; @@ -96,17 +95,16 @@ grpc_channel *grpc_insecure_channel_create(const char *target, "grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3, (target, args, reserved)); GPR_ASSERT(reserved == NULL); - grpc_client_channel_factory *factory = - (grpc_client_channel_factory *)&client_channel_factory; // Add channel arg containing the client channel factory. - grpc_arg arg = grpc_client_channel_factory_create_channel_arg(factory); + grpc_arg arg = + grpc_client_channel_factory_create_channel_arg(&client_channel_factory); grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); // Create channel. grpc_channel *channel = client_channel_factory_create_channel( - &exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args); + &exec_ctx, &client_channel_factory, target, + GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args); // Clean up. grpc_channel_args_destroy(&exec_ctx, new_args); - grpc_client_channel_factory_unref(&exec_ctx, factory); grpc_exec_ctx_finish(&exec_ctx); return channel != NULL ? channel : grpc_lame_client_channel_create( target, GRPC_STATUS_INTERNAL, diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 54663ef6a4..f979d9bad5 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -46,40 +46,16 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" -typedef struct { - grpc_client_channel_factory base; - gpr_refcount refs; - grpc_channel_security_connector *security_connector; -} client_channel_factory; - static void client_channel_factory_ref( - grpc_client_channel_factory *cc_factory) { - client_channel_factory *f = (client_channel_factory *)cc_factory; - gpr_ref(&f->refs); -} + grpc_client_channel_factory *cc_factory) {} static void client_channel_factory_unref( - grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) { - client_channel_factory *f = (client_channel_factory *)cc_factory; - if (gpr_unref(&f->refs)) { - GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, &f->security_connector->base, - "client_channel_factory"); - gpr_free(f); - } -} - -static void add_handshakers(grpc_exec_ctx *exec_ctx, void *security_connector, - grpc_handshake_manager *handshake_mgr) { - grpc_channel_security_connector_add_handshakers(exec_ctx, security_connector, - handshake_mgr); -} + grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {} static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { - client_channel_factory *f = (client_channel_factory *)cc_factory; - grpc_connector *connector = grpc_chttp2_connector_create( - exec_ctx, add_handshakers, f->security_connector); + grpc_connector *connector = grpc_chttp2_connector_create(); grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); grpc_connector_unref(exec_ctx, connector); return s; @@ -106,6 +82,9 @@ static const grpc_client_channel_factory_vtable client_channel_factory_vtable = client_channel_factory_create_subchannel, client_channel_factory_create_channel}; +static grpc_client_channel_factory client_channel_factory = { + &client_channel_factory_vtable}; + /* Create a secure client channel: Asynchronously: - resolve target - connect to it (trying alternatives as presented) @@ -138,33 +117,26 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, return grpc_lame_client_channel_create( target, GRPC_STATUS_INTERNAL, "Failed to create security connector."); } - // Create client channel factory. - client_channel_factory *f = gpr_malloc(sizeof(*f)); - memset(f, 0, sizeof(*f)); - f->base.vtable = &client_channel_factory_vtable; - gpr_ref_init(&f->refs, 1); - GRPC_SECURITY_CONNECTOR_REF(&security_connector->base, - "grpc_secure_channel_create"); - f->security_connector = security_connector; // Add channel args containing the client channel factory and security // connector. - grpc_arg new_args[2]; - new_args[0] = grpc_client_channel_factory_create_channel_arg(&f->base); - new_args[1] = grpc_security_connector_to_arg(&security_connector->base); - grpc_channel_args *args_copy = grpc_channel_args_copy_and_add( + grpc_arg args_to_add[2]; + args_to_add[0] = + grpc_client_channel_factory_create_channel_arg(&client_channel_factory); + args_to_add[1] = grpc_security_connector_to_arg(&security_connector->base); + grpc_channel_args *new_args = grpc_channel_args_copy_and_add( new_args_from_connector != NULL ? new_args_from_connector : args, - new_args, GPR_ARRAY_SIZE(new_args)); + args_to_add, GPR_ARRAY_SIZE(args_to_add)); if (new_args_from_connector != NULL) { grpc_channel_args_destroy(&exec_ctx, new_args_from_connector); } // Create channel. grpc_channel *channel = client_channel_factory_create_channel( - &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, args_copy); + &exec_ctx, &client_channel_factory, target, + GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args); // Clean up. - GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &f->security_connector->base, + GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &security_connector->base, "secure_client_channel_factory_create_channel"); - grpc_channel_args_destroy(&exec_ctx, args_copy); - grpc_client_channel_factory_unref(&exec_ctx, &f->base); + grpc_channel_args_destroy(&exec_ctx, new_args); grpc_exec_ctx_finish(&exec_ctx); return channel; /* may be NULL */ } diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index 86f5a198c2..574d1a7710 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -46,6 +46,7 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -54,24 +55,6 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -void grpc_chttp2_server_handshaker_factory_add_handshakers( - grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory, - grpc_handshake_manager *handshake_mgr) { - if (handshaker_factory != NULL) { - handshaker_factory->vtable->add_handshakers(exec_ctx, handshaker_factory, - handshake_mgr); - } -} - -void grpc_chttp2_server_handshaker_factory_destroy( - grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory) { - if (handshaker_factory != NULL) { - handshaker_factory->vtable->destroy(exec_ctx, handshaker_factory); - } -} - typedef struct pending_handshake_manager_node { grpc_handshake_manager *handshake_mgr; struct pending_handshake_manager_node *next; @@ -81,7 +64,6 @@ typedef struct { grpc_server *server; grpc_tcp_server *tcp_server; grpc_channel_args *args; - grpc_chttp2_server_handshaker_factory *handshaker_factory; gpr_mu mu; bool shutdown; grpc_closure tcp_server_shutdown_complete; @@ -198,8 +180,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, connection_state->accepting_pollset = accepting_pollset; connection_state->acceptor = acceptor; connection_state->handshake_mgr = handshake_mgr; - grpc_chttp2_server_handshaker_factory_add_handshakers( - exec_ctx, state->handshaker_factory, connection_state->handshake_mgr); + grpc_handshakers_add(exec_ctx, HANDSHAKER_SERVER, state->args, + connection_state->handshake_mgr); // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. const gpr_timespec deadline = gpr_time_add( @@ -233,8 +215,6 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, // Flush queued work before destroying handshaker factory, since that // may do a synchronous unref. grpc_exec_ctx_flush(exec_ctx); - grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, - state->handshaker_factory); if (destroy_done != NULL) { destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error)); grpc_exec_ctx_flush(exec_ctx); @@ -259,10 +239,10 @@ static void server_destroy_listener(grpc_exec_ctx *exec_ctx, grpc_tcp_server_unref(exec_ctx, tcp_server); } -grpc_error *grpc_chttp2_server_add_port( - grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr, - grpc_channel_args *args, - grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num) { +grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx, + grpc_server *server, const char *addr, + grpc_channel_args *args, + int *port_num) { grpc_resolved_addresses *resolved = NULL; grpc_tcp_server *tcp_server = NULL; size_t i; @@ -293,7 +273,6 @@ grpc_error *grpc_chttp2_server_add_port( state->server = server; state->tcp_server = tcp_server; state->args = args; - state->handshaker_factory = handshaker_factory; state->shutdown = true; gpr_mu_init(&state->mu); @@ -348,7 +327,6 @@ error: grpc_tcp_server_unref(exec_ctx, tcp_server); } else { grpc_channel_args_destroy(exec_ctx, args); - grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, handshaker_factory); gpr_free(state); } *port_num = 0; diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h index aa364b565d..2581ebaae9 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.h +++ b/src/core/ext/transport/chttp2/server/chttp2_server.h @@ -36,43 +36,12 @@ #include <grpc/impl/codegen/grpc_types.h> -#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/exec_ctx.h" -/// A server handshaker factory is used to create handshakers for server -/// connections. -typedef struct grpc_chttp2_server_handshaker_factory - grpc_chttp2_server_handshaker_factory; - -typedef struct { - void (*add_handshakers)( - grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory, - grpc_handshake_manager *handshake_mgr); - void (*destroy)(grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory); -} grpc_chttp2_server_handshaker_factory_vtable; - -struct grpc_chttp2_server_handshaker_factory { - const grpc_chttp2_server_handshaker_factory_vtable *vtable; -}; - -void grpc_chttp2_server_handshaker_factory_add_handshakers( - grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory, - grpc_handshake_manager *handshake_mgr); - -void grpc_chttp2_server_handshaker_factory_destroy( - grpc_exec_ctx *exec_ctx, - grpc_chttp2_server_handshaker_factory *handshaker_factory); - /// Adds a port to \a server. Sets \a port_num to the port number. -/// If \a handshaker_factory is not NULL, it will be used to create -/// handshakers for the port. -/// Takes ownership of \a args and \a handshaker_factory. -grpc_error *grpc_chttp2_server_add_port( - grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr, - grpc_channel_args *args, - grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num); +/// Takes ownership of \a args. +grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx, + grpc_server *server, const char *addr, + grpc_channel_args *args, int *port_num); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */ diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 7e286d4e46..bf5026bea6 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -47,8 +47,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { (server, addr)); grpc_error *err = grpc_chttp2_server_add_port( &exec_ctx, server, addr, - grpc_channel_args_copy(grpc_server_get_channel_args(server)), - NULL /* handshaker_factory */, &port_num); + grpc_channel_args_copy(grpc_server_get_channel_args(server)), &port_num); if (err != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(err); gpr_log(GPR_ERROR, "%s", msg); diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index b5e4996b16..395c79a71d 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -49,34 +49,6 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct { - grpc_chttp2_server_handshaker_factory base; - grpc_server_security_connector *security_connector; -} server_security_handshaker_factory; - -static void server_security_handshaker_factory_add_handshakers( - grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf, - grpc_handshake_manager *handshake_mgr) { - server_security_handshaker_factory *handshaker_factory = - (server_security_handshaker_factory *)hf; - grpc_server_security_connector_add_handshakers( - exec_ctx, handshaker_factory->security_connector, handshake_mgr); -} - -static void server_security_handshaker_factory_destroy( - grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf) { - server_security_handshaker_factory *handshaker_factory = - (server_security_handshaker_factory *)hf; - GRPC_SECURITY_CONNECTOR_UNREF( - exec_ctx, &handshaker_factory->security_connector->base, "server"); - gpr_free(hf); -} - -static const grpc_chttp2_server_handshaker_factory_vtable - server_security_handshaker_factory_vtable = { - server_security_handshaker_factory_add_handshakers, - server_security_handshaker_factory_destroy}; - int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_server_credentials *creds) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -105,20 +77,19 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, gpr_free(msg); goto done; } - // Create handshaker factory. - server_security_handshaker_factory *handshaker_factory = - gpr_malloc(sizeof(*handshaker_factory)); - memset(handshaker_factory, 0, sizeof(*handshaker_factory)); - handshaker_factory->base.vtable = &server_security_handshaker_factory_vtable; - handshaker_factory->security_connector = sc; // Create channel args. - grpc_arg channel_arg = grpc_server_credentials_to_arg(creds); - grpc_channel_args *args = grpc_channel_args_copy_and_add( - grpc_server_get_channel_args(server), &channel_arg, 1); + grpc_arg args_to_add[2]; + args_to_add[0] = grpc_server_credentials_to_arg(creds); + args_to_add[1] = grpc_security_connector_to_arg(&sc->base); + grpc_channel_args *args = + grpc_channel_args_copy_and_add(grpc_server_get_channel_args(server), + args_to_add, GPR_ARRAY_SIZE(args_to_add)); // Add server port. - err = grpc_chttp2_server_add_port(&exec_ctx, server, addr, args, - &handshaker_factory->base, &port_num); + err = grpc_chttp2_server_add_port(&exec_ctx, server, addr, args, &port_num); done: + if (sc != NULL) { + GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &sc->base, "server"); + } grpc_exec_ctx_finish(&exec_ctx); if (err != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(err); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 827c99c1d1..339d2138c6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -47,6 +47,7 @@ #include "src/core/ext/transport/chttp2/transport/http2_errors.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/status_conversion.h" +#include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/workqueue.h" @@ -1689,8 +1690,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (optional_message != NULL) { size_t msg_len = strlen(optional_message); - GPR_ASSERT(msg_len < 127); - message_pfx = grpc_slice_malloc(15); + GPR_ASSERT(msg_len <= UINT32_MAX); + uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 0); + message_pfx = grpc_slice_malloc(14 + msg_len_len); p = GRPC_SLICE_START_PTR(message_pfx); *p++ = 0x40; *p++ = 12; /* len(grpc-message) */ @@ -1706,7 +1708,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, *p++ = 'a'; *p++ = 'g'; *p++ = 'e'; - *p++ = (uint8_t)msg_len; + GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 0, 0, p, + (uint32_t)msg_len_len); + p += msg_len_len; GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx)); len += (uint32_t)GRPC_SLICE_LENGTH(message_pfx); len += (uint32_t)msg_len; diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c index b4c5ed769b..20043f5fbf 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c @@ -109,7 +109,7 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx, (((uint32_t)p->reason_bytes[2]) << 8) | (((uint32_t)p->reason_bytes[3])); grpc_error *error = GRPC_ERROR_NONE; - if (reason != GRPC_CHTTP2_NO_ERROR) { + if (reason != GRPC_CHTTP2_NO_ERROR || s->header_frames_received < 2) { error = grpc_error_set_int(GRPC_ERROR_CREATE("RST_STREAM"), GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)reason); grpc_status_code status_code = grpc_chttp2_http2_error_to_grpc_status( diff --git a/src/core/lib/README.md b/src/core/lib/README.md new file mode 100644 index 0000000000..69b6bce2d9 --- /dev/null +++ b/src/core/lib/README.md @@ -0,0 +1,6 @@ +Required elements of gRPC Core: Each module in this directory is required to +build gRPC. If it's possible to envisage a configuration where code is not +required, then that code belongs in ext/ instead. + +NOTE: The movement of code between lib and ext is an ongoing effort, so this +directory currently contains too much of the core library. diff --git a/src/core/lib/channel/README.md b/src/core/lib/channel/README.md new file mode 100644 index 0000000000..2dfcfe6e66 --- /dev/null +++ b/src/core/lib/channel/README.md @@ -0,0 +1,4 @@ +# Channel + +Provides channel/call stack implementation, and implementation of common filters +for that implementation. diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 8dd6d099e1..a45a4d4b82 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -83,8 +83,11 @@ static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, // Take a reference to the call stack, to be owned by the timer. GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); deadline_state->timer_pending = true; - grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback, - elem, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_closure_init(&deadline_state->timer_callback, timer_callback, elem, + grpc_schedule_on_exec_ctx); + grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, + &deadline_state->timer_callback, + gpr_now(GPR_CLOCK_MONOTONIC)); } } static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index 716a852565..bd2b84f79e 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -46,6 +46,7 @@ typedef struct grpc_deadline_state { bool timer_pending; // The deadline timer. grpc_timer timer; + grpc_closure timer_callback; // Closure to invoke when the call is complete. // We use this to cancel the timer. grpc_closure on_complete; diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index ff827527b3..c052ca5385 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -86,6 +86,7 @@ struct grpc_handshake_manager { grpc_tcp_server_acceptor* acceptor; // Deadline timer across all handshakers. grpc_timer deadline_timer; + grpc_closure on_timeout; // The final callback and user_data to invoke after the last handshaker. grpc_closure on_handshake_done; void* user_data; @@ -224,9 +225,11 @@ void grpc_handshake_manager_do_handshake( grpc_schedule_on_exec_ctx); // Start deadline timer, which owns a ref. gpr_ref(&mgr->refs); + grpc_closure_init(&mgr->on_timeout, on_timeout, mgr, + grpc_schedule_on_exec_ctx); grpc_timer_init(exec_ctx, &mgr->deadline_timer, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - on_timeout, mgr, gpr_now(GPR_CLOCK_MONOTONIC)); + &mgr->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); // Start first handshaker, which also owns a ref. gpr_ref(&mgr->refs); bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE); diff --git a/src/core/lib/channel/handshaker_factory.c b/src/core/lib/channel/handshaker_factory.c new file mode 100644 index 0000000000..3c30a4e1d2 --- /dev/null +++ b/src/core/lib/channel/handshaker_factory.c @@ -0,0 +1,54 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/handshaker_factory.h" + +#include <grpc/support/log.h> + +void grpc_handshaker_factory_add_handshakers( + grpc_exec_ctx *exec_ctx, grpc_handshaker_factory *handshaker_factory, + const grpc_channel_args *args, grpc_handshake_manager *handshake_mgr) { + if (handshaker_factory != NULL) { + GPR_ASSERT(handshaker_factory->vtable != NULL); + handshaker_factory->vtable->add_handshakers(exec_ctx, handshaker_factory, + args, handshake_mgr); + } +} + +void grpc_handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, grpc_handshaker_factory *handshaker_factory) { + if (handshaker_factory != NULL) { + GPR_ASSERT(handshaker_factory->vtable != NULL); + handshaker_factory->vtable->destroy(exec_ctx, handshaker_factory); + } +} diff --git a/src/core/lib/channel/handshaker_factory.h b/src/core/lib/channel/handshaker_factory.h new file mode 100644 index 0000000000..1984546104 --- /dev/null +++ b/src/core/lib/channel/handshaker_factory.h @@ -0,0 +1,66 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_HANDSHAKER_FACTORY_H +#define GRPC_CORE_LIB_CHANNEL_HANDSHAKER_FACTORY_H + +#include <grpc/impl/codegen/grpc_types.h> + +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +// A handshaker factory is used to create handshakers. + +typedef struct grpc_handshaker_factory grpc_handshaker_factory; + +typedef struct { + void (*add_handshakers)(grpc_exec_ctx *exec_ctx, + grpc_handshaker_factory *handshaker_factory, + const grpc_channel_args *args, + grpc_handshake_manager *handshake_mgr); + void (*destroy)(grpc_exec_ctx *exec_ctx, + grpc_handshaker_factory *handshaker_factory); +} grpc_handshaker_factory_vtable; + +struct grpc_handshaker_factory { + const grpc_handshaker_factory_vtable *vtable; +}; + +void grpc_handshaker_factory_add_handshakers( + grpc_exec_ctx *exec_ctx, grpc_handshaker_factory *handshaker_factory, + const grpc_channel_args *args, grpc_handshake_manager *handshake_mgr); + +void grpc_handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, grpc_handshaker_factory *handshaker_factory); + +#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_FACTORY_H */ diff --git a/src/core/lib/channel/handshaker_registry.c b/src/core/lib/channel/handshaker_registry.c new file mode 100644 index 0000000000..2e5f04064c --- /dev/null +++ b/src/core/lib/channel/handshaker_registry.c @@ -0,0 +1,113 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/handshaker_registry.h" + +#include <string.h> + +#include <grpc/support/alloc.h> + +// +// grpc_handshaker_factory_list +// + +typedef struct { + grpc_handshaker_factory** list; + size_t num_factories; +} grpc_handshaker_factory_list; + +static void grpc_handshaker_factory_list_register( + grpc_handshaker_factory_list* list, bool at_start, + grpc_handshaker_factory* factory) { + list->list = gpr_realloc( + list->list, (list->num_factories + 1) * sizeof(grpc_handshaker_factory*)); + if (at_start) { + memmove(list->list + 1, list->list, + sizeof(grpc_handshaker_factory*) * list->num_factories); + list->list[0] = factory; + } else { + list->list[list->num_factories] = factory; + } + ++list->num_factories; +} + +static void grpc_handshaker_factory_list_add_handshakers( + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory_list* list, + const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { + for (size_t i = 0; i < list->num_factories; ++i) { + grpc_handshaker_factory_add_handshakers(exec_ctx, list->list[i], args, + handshake_mgr); + } +} + +static void grpc_handshaker_factory_list_destroy( + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory_list* list) { + for (size_t i = 0; i < list->num_factories; ++i) { + grpc_handshaker_factory_destroy(exec_ctx, list->list[i]); + } + gpr_free(list->list); +} + +// +// plugin +// + +static grpc_handshaker_factory_list + g_handshaker_factory_lists[NUM_HANDSHAKER_TYPES]; + +void grpc_handshaker_factory_registry_init() { + memset(g_handshaker_factory_lists, 0, sizeof(g_handshaker_factory_lists)); +} + +void grpc_handshaker_factory_registry_shutdown(grpc_exec_ctx* exec_ctx) { + for (size_t i = 0; i < NUM_HANDSHAKER_TYPES; ++i) { + grpc_handshaker_factory_list_destroy(exec_ctx, + &g_handshaker_factory_lists[i]); + } +} + +void grpc_handshaker_factory_register(bool at_start, + grpc_handshaker_type handshaker_type, + grpc_handshaker_factory* factory) { + grpc_handshaker_factory_list_register( + &g_handshaker_factory_lists[handshaker_type], at_start, factory); +} + +void grpc_handshakers_add(grpc_exec_ctx* exec_ctx, + grpc_handshaker_type handshaker_type, + const grpc_channel_args* args, + grpc_handshake_manager* handshake_mgr) { + grpc_handshaker_factory_list_add_handshakers( + exec_ctx, &g_handshaker_factory_lists[handshaker_type], args, + handshake_mgr); +} diff --git a/src/core/lib/channel/handshaker_registry.h b/src/core/lib/channel/handshaker_registry.h new file mode 100644 index 0000000000..53c1b173af --- /dev/null +++ b/src/core/lib/channel/handshaker_registry.h @@ -0,0 +1,63 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_HANDSHAKER_REGISTRY_H +#define GRPC_CORE_LIB_CHANNEL_HANDSHAKER_REGISTRY_H + +#include <grpc/impl/codegen/grpc_types.h> + +#include "src/core/lib/channel/handshaker_factory.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +typedef enum { + HANDSHAKER_CLIENT = 0, + HANDSHAKER_SERVER, + NUM_HANDSHAKER_TYPES, // Must be last. +} grpc_handshaker_type; + +void grpc_handshaker_factory_registry_init(); +void grpc_handshaker_factory_registry_shutdown(grpc_exec_ctx* exec_ctx); + +/// Registers a new handshaker factory. Takes ownership. +/// If \a at_start is true, the new handshaker will be at the beginning of +/// the list. Otherwise, it will be added to the end. +void grpc_handshaker_factory_register(bool at_start, + grpc_handshaker_type handshaker_type, + grpc_handshaker_factory* factory); + +void grpc_handshakers_add(grpc_exec_ctx* exec_ctx, + grpc_handshaker_type handshaker_type, + const grpc_channel_args* args, + grpc_handshake_manager* handshake_mgr); + +#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_REGISTRY_H */ diff --git a/src/core/lib/iomgr/README.md b/src/core/lib/iomgr/README.md new file mode 100644 index 0000000000..9b22b76ceb --- /dev/null +++ b/src/core/lib/iomgr/README.md @@ -0,0 +1,6 @@ +# iomgr + +Platform abstractions for I/O (mostly network). + +Provides abstractions over TCP/UDP I/O, file loading, polling, and concurrency +management for various operating systems. diff --git a/src/core/lib/iomgr/iocp_windows.c b/src/core/lib/iomgr/iocp_windows.c index 60ebe43676..f0f4a6ff39 100644 --- a/src/core/lib/iomgr/iocp_windows.c +++ b/src/core/lib/iomgr/iocp_windows.c @@ -80,7 +80,6 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, LPOVERLAPPED overlapped; grpc_winsocket *socket; grpc_winsocket_callback_info *info; - grpc_closure *closure = NULL; success = GetQueuedCompletionStatus( g_iocp, &bytes, &completion_key, &overlapped, deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index c8237dc38f..9a77c92016 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -65,6 +65,7 @@ typedef struct { grpc_fd *fd; gpr_timespec deadline; grpc_timer alarm; + grpc_closure on_alarm; int refs; grpc_closure write_closure; grpc_pollset_set *interested_parties; @@ -352,9 +353,10 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, } gpr_mu_lock(&ac->mu); + grpc_closure_init(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx); grpc_timer_init(exec_ctx, &ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); + &ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index ed0de50fc1..5225a5402b 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -49,6 +49,7 @@ typedef struct grpc_uv_tcp_connect { uv_connect_t connect_req; grpc_timer alarm; + grpc_closure on_alarm; uv_tcp_t *tcp_handle; grpc_closure *closure; grpc_endpoint **endpoint; @@ -148,9 +149,11 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, uv_tcp_connect(&connect->connect_req, connect->tcp_handle, (const struct sockaddr *)resolved_addr->addr, uv_tc_on_connect); + grpc_closure_init(&connect->on_alarm, uv_tc_on_alarm, connect, + grpc_schedule_on_exec_ctx); grpc_timer_init(exec_ctx, &connect->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - uv_tc_on_alarm, connect, gpr_now(GPR_CLOCK_MONOTONIC)); + &connect->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); } // overridden by api_fuzzer.c diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index 275258ebb5..1e84ec3a1e 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -58,6 +58,7 @@ typedef struct { grpc_winsocket *socket; gpr_timespec deadline; grpc_timer alarm; + grpc_closure on_alarm; char *addr_name; int refs; grpc_closure on_connect; @@ -229,7 +230,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, ac->resource_quota = resource_quota; grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac, + grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx); + grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_socket_notify_on_write(exec_ctx, socket, &ac->on_connect); return; diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index 97d7827461..dafe851ce8 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -184,7 +184,6 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, } static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { - int immediately_done = 0; grpc_tcp_listener *sp; gpr_mu_lock(&s->mu); @@ -240,7 +239,7 @@ static grpc_error *prepare_socket(SOCKET sock, error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname"); goto failure; } - sockname_temp.len = sockname_temp_len; + sockname_temp.len = (size_t)sockname_temp_len; *port = grpc_sockaddr_get_port(&sockname_temp); return GRPC_ERROR_NONE; @@ -248,7 +247,7 @@ static grpc_error *prepare_socket(SOCKET sock, failure: GPR_ASSERT(error != GRPC_ERROR_NONE); char *tgtaddr = grpc_sockaddr_to_uri(addr); - grpc_error *final_error = grpc_error_set_int( + grpc_error_set_int( grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING( "Failed to prepare server socket", &error, 1), GRPC_ERROR_STR_TARGET_ADDRESS, tgtaddr), @@ -261,7 +260,6 @@ failure: static void decrement_active_ports_and_notify_locked(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *sp) { - int notify = 0; sp->shutting_down = 0; GPR_ASSERT(sp->server->active_ports > 0); if (0 == --sp->server->active_ports) { @@ -375,7 +373,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { int peer_name_len = (int)peer_name.len; err = getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name_len); - peer_name.len = peer_name_len; + peer_name.len = (size_t)peer_name_len; if (!err) { peer_name_string = grpc_sockaddr_to_uri(&peer_name); } else { @@ -498,7 +496,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, if (0 == getsockname(sp->socket->socket, (struct sockaddr *)sockname_temp.addr, &sockname_temp_len)) { - sockname_temp.len = sockname_temp_len; + sockname_temp.len = (size_t)sockname_temp_len; *port = grpc_sockaddr_get_port(&sockname_temp); if (*port > 0) { allocated_addr = gpr_malloc(sizeof(grpc_resolved_address)); diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index 20fe98c4a7..d84a278b18 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -49,15 +49,15 @@ typedef struct grpc_timer grpc_timer; -/* Initialize *timer. When expired or canceled, timer_cb will be called with - *timer_cb_arg and error set to indicate if it expired (GRPC_ERROR_NONE) or - was canceled (GRPC_ERROR_CANCELLED). timer_cb is guaranteed to be called - exactly once, and application code should check the error to determine - how it was invoked. The application callback is also responsible for - maintaining information about when to free up any user-level state. */ +/* Initialize *timer. When expired or canceled, closure will be called with + error set to indicate if it expired (GRPC_ERROR_NONE) or was canceled + (GRPC_ERROR_CANCELLED). timer_cb is guaranteed to be called exactly once, and + application code should check the error to determine how it was invoked. The + application callback is also responsible for maintaining information about + when to free up any user-level state. */ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_iomgr_cb_func timer_cb, - void *timer_cb_arg, gpr_timespec now); + gpr_timespec deadline, grpc_closure *closure, + gpr_timespec now); /* Note that there is no timer destroy function. This is because the timer is a one-time occurrence with a guarantee that the callback will diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index ecd3b284dc..40c8351472 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -178,28 +178,27 @@ static void note_deadline_change(shard_type *shard) { } void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_iomgr_cb_func timer_cb, - void *timer_cb_arg, gpr_timespec now) { + gpr_timespec deadline, grpc_closure *closure, + gpr_timespec now) { int is_first_timer = 0; shard_type *shard = &g_shards[shard_idx(timer)]; GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); - grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg, - grpc_schedule_on_exec_ctx); + timer->closure = closure; timer->deadline = deadline; timer->triggered = 0; if (!g_initialized) { timer->triggered = 1; grpc_closure_sched( - exec_ctx, &timer->closure, + exec_ctx, timer->closure, GRPC_ERROR_CREATE("Attempt to create timer before initialization")); return; } if (gpr_time_cmp(deadline, now) <= 0) { timer->triggered = 1; - grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); return; } @@ -251,7 +250,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); if (!timer->triggered) { - grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED); + grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); timer->triggered = 1; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); @@ -317,7 +316,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { - grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error)); + grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_REF(error)); n++; } *new_min_deadline = compute_min_deadline(shard); diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h index e4494adb5f..9d901c7e68 100644 --- a/src/core/lib/iomgr/timer_generic.h +++ b/src/core/lib/iomgr/timer_generic.h @@ -43,7 +43,7 @@ struct grpc_timer { int triggered; struct grpc_timer *next; struct grpc_timer *prev; - grpc_closure closure; + grpc_closure *closure; }; #endif /* GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H */ diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 00b835ffb8..fa2cdee964 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -55,21 +55,20 @@ void run_expired_timer(uv_timer_t *handle) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_ASSERT(!timer->triggered); timer->triggered = 1; - grpc_closure_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE); + grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE); stop_uv_timer(handle); grpc_exec_ctx_finish(&exec_ctx); } void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_iomgr_cb_func timer_cb, - void *timer_cb_arg, gpr_timespec now) { + gpr_timespec deadline, grpc_closure *closure, + gpr_timespec now) { uint64_t timeout; uv_timer_t *uv_timer; - grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg, - grpc_schedule_on_exec_ctx); + timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { timer->triggered = 1; - grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); return; } timer->triggered = 0; @@ -84,7 +83,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { if (!timer->triggered) { timer->triggered = 1; - grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED); + grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); stop_uv_timer((uv_timer_t *)timer->uv_timer); } } diff --git a/src/core/lib/iomgr/timer_uv.h b/src/core/lib/iomgr/timer_uv.h index 3de383ebd5..13cf8bd4fa 100644 --- a/src/core/lib/iomgr/timer_uv.h +++ b/src/core/lib/iomgr/timer_uv.h @@ -37,7 +37,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" struct grpc_timer { - grpc_closure closure; + grpc_closure *closure; /* This is actually a uv_timer_t*, but we want to keep platform-specific types out of headers */ void *uv_timer; diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index e886cc59a0..5e75856c7a 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -42,6 +42,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/transport/secure_endpoint.h" #include "src/core/lib/security/transport/tsi_error.h" @@ -438,6 +439,45 @@ static grpc_handshaker *fail_handshaker_create() { } // +// handshaker factories +// + +static void client_handshaker_factory_add_handshakers( + grpc_exec_ctx *exec_ctx, grpc_handshaker_factory *handshaker_factory, + const grpc_channel_args *args, grpc_handshake_manager *handshake_mgr) { + grpc_channel_security_connector *security_connector = + (grpc_channel_security_connector *)grpc_find_security_connector_in_args( + args); + grpc_channel_security_connector_add_handshakers(exec_ctx, security_connector, + handshake_mgr); +} + +static void server_handshaker_factory_add_handshakers( + grpc_exec_ctx *exec_ctx, grpc_handshaker_factory *hf, + const grpc_channel_args *args, grpc_handshake_manager *handshake_mgr) { + grpc_server_security_connector *security_connector = + (grpc_server_security_connector *)grpc_find_security_connector_in_args( + args); + grpc_server_security_connector_add_handshakers(exec_ctx, security_connector, + handshake_mgr); +} + +static void handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, grpc_handshaker_factory *handshaker_factory) {} + +static const grpc_handshaker_factory_vtable client_handshaker_factory_vtable = { + client_handshaker_factory_add_handshakers, handshaker_factory_destroy}; + +static grpc_handshaker_factory client_handshaker_factory = { + &client_handshaker_factory_vtable}; + +static const grpc_handshaker_factory_vtable server_handshaker_factory_vtable = { + server_handshaker_factory_add_handshakers, handshaker_factory_destroy}; + +static grpc_handshaker_factory server_handshaker_factory = { + &server_handshaker_factory_vtable}; + +// // exported functions // @@ -452,3 +492,10 @@ grpc_handshaker *grpc_security_handshaker_create( return security_handshaker_create(exec_ctx, handshaker, connector); } } + +void grpc_security_register_handshaker_factories() { + grpc_handshaker_factory_register(false /* at_start */, HANDSHAKER_CLIENT, + &client_handshaker_factory); + grpc_handshaker_factory_register(false /* at_start */, HANDSHAKER_SERVER, + &server_handshaker_factory); +} diff --git a/src/core/lib/security/transport/security_handshaker.h b/src/core/lib/security/transport/security_handshaker.h index 5ddbf4b451..0b9eda178f 100644 --- a/src/core/lib/security/transport/security_handshaker.h +++ b/src/core/lib/security/transport/security_handshaker.h @@ -43,4 +43,7 @@ grpc_handshaker *grpc_security_handshaker_create( grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker, grpc_security_connector *connector); +/// Registers security handshaker factories. +void grpc_security_register_handshaker_factories(); + #endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H */ diff --git a/src/core/lib/surface/README.md b/src/core/lib/surface/README.md new file mode 100644 index 0000000000..74cbd71131 --- /dev/null +++ b/src/core/lib/surface/README.md @@ -0,0 +1,4 @@ +# Surface + +Surface provides the bulk of the gRPC Core public API, and translates it into +calls against core components. diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index aa9d60ee6a..e71c0ebfc5 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -38,6 +38,7 @@ struct grpc_alarm { grpc_timer alarm; + grpc_closure on_alarm; grpc_cq_completion completion; /** completion queue where events about this alarm will be posted */ grpc_completion_queue *cq; @@ -64,9 +65,11 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, alarm->tag = tag; grpc_cq_begin_op(cq, tag); + grpc_closure_init(&alarm->on_alarm, alarm_cb, alarm, + grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - alarm_cb, alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + &alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_exec_ctx_finish(&exec_ctx); return alarm; } diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index e20e602547..f61bf1582e 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -44,6 +44,7 @@ #include "src/core/lib/channel/compress_filter.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/deadline_filter.h" +#include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/channel/message_size_filter.h" @@ -204,6 +205,8 @@ void grpc_init(void) { grpc_executor_init(); gpr_timers_global_init(); grpc_cq_global_init(); + grpc_handshaker_factory_registry_init(); + grpc_security_init(); for (i = 0; i < g_number_of_plugins; i++) { if (g_all_of_the_plugins[i].init != NULL) { g_all_of_the_plugins[i].init(); @@ -238,6 +241,7 @@ void grpc_shutdown(void) { } } grpc_mdctx_global_shutdown(&exec_ctx); + grpc_handshaker_factory_registry_shutdown(&exec_ctx); } gpr_mu_unlock(&g_init_mu); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/surface/init.h b/src/core/lib/surface/init.h index b1bf57c10d..3d1c528be0 100644 --- a/src/core/lib/surface/init.h +++ b/src/core/lib/surface/init.h @@ -36,6 +36,7 @@ void grpc_register_security_filters(void); void grpc_security_pre_init(void); +void grpc_security_init(void); int grpc_is_initialized(void); #endif /* GRPC_CORE_LIB_SURFACE_INIT_H */ diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c index 520a8aa84f..a44407d3bb 100644 --- a/src/core/lib/surface/init_secure.c +++ b/src/core/lib/surface/init_secure.c @@ -41,6 +41,7 @@ #include "src/core/lib/security/transport/auth_filters.h" #include "src/core/lib/security/transport/secure_endpoint.h" #include "src/core/lib/security/transport/security_connector.h" +#include "src/core/lib/security/transport/security_handshaker.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/tsi/transport_security_interface.h" @@ -87,3 +88,5 @@ void grpc_register_security_filters(void) { grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, maybe_prepend_server_auth_filter, NULL); } + +void grpc_security_init() { grpc_security_register_handshaker_factories(); } diff --git a/src/core/lib/surface/init_unsecure.c b/src/core/lib/surface/init_unsecure.c index f952739e0a..dbc9223ae0 100644 --- a/src/core/lib/surface/init_unsecure.c +++ b/src/core/lib/surface/init_unsecure.c @@ -36,3 +36,5 @@ void grpc_security_pre_init(void) {} void grpc_register_security_filters(void) {} + +void grpc_security_init(void) {} diff --git a/src/core/lib/transport/README.md b/src/core/lib/transport/README.md new file mode 100644 index 0000000000..e7e135edeb --- /dev/null +++ b/src/core/lib/transport/README.md @@ -0,0 +1,7 @@ +# Transport + +Common implementation details for gRPC Transports. + +Transports multiplex messages across some single connection. In ext/ there are +implementations atop [a custom http2 implementation](/src/core/ext/transport/chttp2/README.md) +and atop [cronet](/src/core/ext/transport/cronet/README.md). diff --git a/src/core/lib/tsi/README.md b/src/core/lib/tsi/README.md new file mode 100644 index 0000000000..3ca3c1ef38 --- /dev/null +++ b/src/core/lib/tsi/README.md @@ -0,0 +1,2 @@ +# Transport Security Interface +An abstraction library over crypto and auth modules (typically OpenSSL) diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index 46ddecfb1f..75cfb41342 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -312,6 +312,9 @@ function customMetadata(client, done) { } }; var streaming_arg = { + response_parameters: [ + {size: 314159} + ], payload: { body: zeroBuffer(271828) } diff --git a/src/php/bin/generate_proto_php.sh b/src/php/bin/generate_proto_php.sh index c558bc5769..416fa9df97 100755 --- a/src/php/bin/generate_proto_php.sh +++ b/src/php/bin/generate_proto_php.sh @@ -39,10 +39,13 @@ protoc --proto_path=src/proto/math \ # replace the Empty message with EmptyMessage # because Empty is a PHP reserved word -sed -i 's/message Empty/message EmptyMessage/g' \ - src/proto/grpc/testing/empty.proto -sed -i 's/grpc\.testing\.Empty/grpc\.testing\.EmptyMessage/g' \ - src/proto/grpc/testing/test.proto +output_file=$(mktemp) +sed 's/message Empty/message EmptyMessage/g' \ + src/proto/grpc/testing/empty.proto > $output_file +mv $output_file ./src/proto/grpc/testing/empty.proto +sed 's/grpc\.testing\.Empty/grpc\.testing\.EmptyMessage/g' \ + src/proto/grpc/testing/test.proto > $output_file +mv $output_file ./src/proto/grpc/testing/test.proto protoc --proto_path=. \ --php_out=src/php/tests/interop \ @@ -53,7 +56,10 @@ protoc --proto_path=. \ src/proto/grpc/testing/test.proto # change it back -sed -i 's/message EmptyMessage/message Empty/g' \ - src/proto/grpc/testing/empty.proto -sed -i 's/grpc\.testing\.EmptyMessage/grpc\.testing\.Empty/g' \ - src/proto/grpc/testing/test.proto +sed 's/message EmptyMessage/message Empty/g' \ + src/proto/grpc/testing/empty.proto > $output_file +mv $output_file ./src/proto/grpc/testing/empty.proto +sed 's/grpc\.testing\.EmptyMessage/grpc\.testing\.Empty/g' \ + src/proto/grpc/testing/test.proto > $output_file +mv $output_file ./src/proto/grpc/testing/test.proto + diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 3a49ea8708..64b1137c2a 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -478,6 +478,7 @@ PHP_METHOD(Call, startBatch) { true); add_property_zval(result, "status", recv_status); PHP_GRPC_DELREF(recv_status); + PHP_GRPC_FREE_STD_ZVAL(recv_status); break; case GRPC_OP_RECV_CLOSE_ON_SERVER: add_property_bool(result, "cancelled", cancelled); @@ -501,6 +502,7 @@ cleanup: } if (ops[i].op == GRPC_OP_RECV_MESSAGE) { grpc_byte_buffer_destroy(message); + PHP_GRPC_FREE_STD_ZVAL(message_str); } } RETURN_DESTROY_ZVAL(result); diff --git a/src/php/ext/grpc/call_credentials.c b/src/php/ext/grpc/call_credentials.c index 3aafc3a19b..043817facd 100644 --- a/src/php/ext/grpc/call_credentials.c +++ b/src/php/ext/grpc/call_credentials.c @@ -111,9 +111,7 @@ PHP_METHOD(CallCredentials, createComposite) { grpc_call_credentials *creds = grpc_composite_call_credentials_create(cred1->wrapped, cred2->wrapped, NULL); - zval *creds_object; - PHP_GRPC_MAKE_STD_ZVAL(creds_object); - creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC); + zval *creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -155,9 +153,7 @@ PHP_METHOD(CallCredentials, createFromPlugin) { grpc_call_credentials *creds = grpc_metadata_credentials_create_from_plugin(plugin, NULL); - zval *creds_object; - PHP_GRPC_MAKE_STD_ZVAL(creds_object); - creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC); + zval *creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -211,6 +207,8 @@ void plugin_destroy_state(void *ptr) { plugin_state *state = (plugin_state *)ptr; efree(state->fci); efree(state->fci_cache); + PHP_GRPC_FREE_STD_ZVAL(state->fci->params); + PHP_GRPC_FREE_STD_ZVAL(state->fci->retval); efree(state); } diff --git a/src/php/ext/grpc/channel_credentials.c b/src/php/ext/grpc/channel_credentials.c index a32c4a4ea2..36a8223b88 100644 --- a/src/php/ext/grpc/channel_credentials.c +++ b/src/php/ext/grpc/channel_credentials.c @@ -121,9 +121,7 @@ PHP_METHOD(ChannelCredentials, setDefaultRootsPem) { */ PHP_METHOD(ChannelCredentials, createDefault) { grpc_channel_credentials *creds = grpc_google_default_credentials_create(); - zval *creds_object; - PHP_GRPC_MAKE_STD_ZVAL(creds_object); - creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); + zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -160,9 +158,7 @@ PHP_METHOD(ChannelCredentials, createSsl) { grpc_channel_credentials *creds = grpc_ssl_credentials_create( pem_root_certs, pem_key_cert_pair.private_key == NULL ? NULL : &pem_key_cert_pair, NULL); - zval *creds_object; - PHP_GRPC_MAKE_STD_ZVAL(creds_object); - creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); + zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } @@ -191,9 +187,7 @@ PHP_METHOD(ChannelCredentials, createComposite) { grpc_channel_credentials *creds = grpc_composite_channel_credentials_create(cred1->wrapped, cred2->wrapped, NULL); - zval *creds_object; - PHP_GRPC_MAKE_STD_ZVAL(creds_object); - creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); + zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } diff --git a/src/php/ext/grpc/php7_wrapper.h b/src/php/ext/grpc/php7_wrapper.h index 1d7824113f..72e982d6fc 100644 --- a/src/php/ext/grpc/php7_wrapper.h +++ b/src/php/ext/grpc/php7_wrapper.h @@ -50,8 +50,13 @@ #define PHP_GRPC_RETURN_STRING(val, dup) RETURN_STRING(val, dup) #define PHP_GRPC_MAKE_STD_ZVAL(pzv) MAKE_STD_ZVAL(pzv) +#define PHP_GRPC_FREE_STD_ZVAL(pzv) #define PHP_GRPC_DELREF(zv) Z_DELREF_P(zv) +#define RETURN_DESTROY_ZVAL(val) \ + RETURN_ZVAL(val, false /* Don't execute copy constructor */, \ + true /* Dealloc original before returning */) + #define PHP_GRPC_WRAP_OBJECT_START(name) \ typedef struct name { \ zend_object std; @@ -144,8 +149,15 @@ static inline int php_grpc_zend_hash_find(HashTable *ht, char *key, int len, #define PHP_GRPC_RETURN_STRING(val, dup) RETURN_STRING(val) #define PHP_GRPC_MAKE_STD_ZVAL(pzv) \ pzv = (zval *)emalloc(sizeof(zval)); +#define PHP_GRPC_FREE_STD_ZVAL(pzv) efree(pzv); #define PHP_GRPC_DELREF(zv) +#define RETURN_DESTROY_ZVAL(val) \ + RETVAL_ZVAL(val, false /* Don't execute copy constructor */, \ + true /* Dealloc original before returning */); \ + efree(val); \ + return + #define PHP_GRPC_WRAP_OBJECT_START(name) \ typedef struct name { #define PHP_GRPC_WRAP_OBJECT_END(name) \ diff --git a/src/php/ext/grpc/php_grpc.h b/src/php/ext/grpc/php_grpc.h index e57a06545e..13083b0bc7 100644 --- a/src/php/ext/grpc/php_grpc.h +++ b/src/php/ext/grpc/php_grpc.h @@ -61,10 +61,6 @@ extern zend_module_entry grpc_module_entry; #include "grpc/grpc.h" -#define RETURN_DESTROY_ZVAL(val) \ - RETURN_ZVAL(val, false /* Don't execute copy constructor */, \ - true /* Dealloc original before returning */) - /* These are all function declarations */ /* Code that runs at module initialization */ PHP_MINIT_FUNCTION(grpc); diff --git a/src/php/ext/grpc/server_credentials.c b/src/php/ext/grpc/server_credentials.c index 1610c8ebb0..3e39fee246 100644 --- a/src/php/ext/grpc/server_credentials.c +++ b/src/php/ext/grpc/server_credentials.c @@ -113,9 +113,7 @@ PHP_METHOD(ServerCredentials, createSsl) { grpc_server_credentials *creds = grpc_ssl_server_credentials_create_ex( pem_root_certs, &pem_key_cert_pair, 1, GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE, NULL); - zval *creds_object; - PHP_GRPC_MAKE_STD_ZVAL(creds_object); - creds_object = grpc_php_wrap_server_credentials(creds TSRMLS_CC); + zval *creds_object = grpc_php_wrap_server_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); } diff --git a/src/php/ext/grpc/timeval.c b/src/php/ext/grpc/timeval.c index 945231b47f..7ada915aad 100644 --- a/src/php/ext/grpc/timeval.c +++ b/src/php/ext/grpc/timeval.c @@ -115,11 +115,9 @@ PHP_METHOD(Timeval, add) { } wrapped_grpc_timeval *self = Z_WRAPPED_GRPC_TIMEVAL_P(getThis()); wrapped_grpc_timeval *other = Z_WRAPPED_GRPC_TIMEVAL_P(other_obj); - zval *sum; - PHP_GRPC_MAKE_STD_ZVAL(sum); - sum = - grpc_php_wrap_timeval(gpr_time_add(self->wrapped, other->wrapped) - TSRMLS_CC); + zval *sum = + grpc_php_wrap_timeval(gpr_time_add(self->wrapped, other->wrapped) + TSRMLS_CC); RETURN_DESTROY_ZVAL(sum); } @@ -141,11 +139,9 @@ PHP_METHOD(Timeval, subtract) { } wrapped_grpc_timeval *self = Z_WRAPPED_GRPC_TIMEVAL_P(getThis()); wrapped_grpc_timeval *other = Z_WRAPPED_GRPC_TIMEVAL_P(other_obj); - zval *diff; - PHP_GRPC_MAKE_STD_ZVAL(diff); - diff = - grpc_php_wrap_timeval(gpr_time_sub(self->wrapped, other->wrapped) - TSRMLS_CC); + zval *diff = + grpc_php_wrap_timeval(gpr_time_sub(self->wrapped, other->wrapped) + TSRMLS_CC); RETURN_DESTROY_ZVAL(diff); } @@ -206,9 +202,7 @@ PHP_METHOD(Timeval, similar) { * @return Timeval The current time */ PHP_METHOD(Timeval, now) { - zval *now; - PHP_GRPC_MAKE_STD_ZVAL(now); - now = grpc_php_wrap_timeval(gpr_now(GPR_CLOCK_REALTIME) TSRMLS_CC); + zval *now = grpc_php_wrap_timeval(gpr_now(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(now); } @@ -217,13 +211,9 @@ PHP_METHOD(Timeval, now) { * @return Timeval Zero length time interval */ PHP_METHOD(Timeval, zero) { - zval *grpc_php_timeval_zero; - PHP_GRPC_MAKE_STD_ZVAL(grpc_php_timeval_zero); - grpc_php_timeval_zero = - grpc_php_wrap_timeval(gpr_time_0(GPR_CLOCK_REALTIME) TSRMLS_CC); - RETURN_ZVAL(grpc_php_timeval_zero, - false, /* Copy original before returning? */ - true /* Destroy original before returning */); + zval *grpc_php_timeval_zero = + grpc_php_wrap_timeval(gpr_time_0(GPR_CLOCK_REALTIME) TSRMLS_CC); + RETURN_DESTROY_ZVAL(grpc_php_timeval_zero); } /** @@ -231,10 +221,8 @@ PHP_METHOD(Timeval, zero) { * @return Timeval Infinite future time value */ PHP_METHOD(Timeval, infFuture) { - zval *grpc_php_timeval_inf_future; - PHP_GRPC_MAKE_STD_ZVAL(grpc_php_timeval_inf_future); - grpc_php_timeval_inf_future = - grpc_php_wrap_timeval(gpr_inf_future(GPR_CLOCK_REALTIME) TSRMLS_CC); + zval *grpc_php_timeval_inf_future = + grpc_php_wrap_timeval(gpr_inf_future(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_future); } @@ -243,10 +231,8 @@ PHP_METHOD(Timeval, infFuture) { * @return Timeval Infinite past time value */ PHP_METHOD(Timeval, infPast) { - zval *grpc_php_timeval_inf_past; - PHP_GRPC_MAKE_STD_ZVAL(grpc_php_timeval_inf_past); - grpc_php_timeval_inf_past = - grpc_php_wrap_timeval(gpr_inf_past(GPR_CLOCK_REALTIME) TSRMLS_CC); + zval *grpc_php_timeval_inf_past = + grpc_php_wrap_timeval(gpr_inf_past(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_past); } diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index aec60af094..a9e77b9396 100644 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -271,7 +271,7 @@ class BaseStub * @return ClientStreamingSurfaceActiveCall The active call object */ public function _clientStreamRequest($method, - callable $deserialize, + $deserialize, array $metadata = [], array $options = []) { @@ -307,7 +307,7 @@ class BaseStub */ public function _serverStreamRequest($method, $argument, - callable $deserialize, + $deserialize, array $metadata = [], array $options = []) { @@ -340,7 +340,7 @@ class BaseStub * @return BidiStreamingSurfaceActiveCall The active call object */ public function _bidiRequest($method, - callable $deserialize, + $deserialize, array $metadata = [], array $options = []) { diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php index d201915d66..2acf5612c7 100755 --- a/src/php/tests/interop/interop_client.php +++ b/src/php/tests/interop/interop_client.php @@ -451,11 +451,22 @@ function customMetadata($stub) $streaming_request = new grpc\testing\StreamingOutputCallRequest(); $streaming_request->setPayload($payload); + $response_parameters = new grpc\testing\ResponseParameters(); + $response_parameters->setSize($response_len); + $streaming_request->getResponseParameters()[] = $response_parameters; $streaming_call->write($streaming_request); $streaming_call->writesDone(); + $result = $streaming_call->read(); hardAssertIfStatusOk($streaming_call->getStatus()); + $streaming_initial_metadata = $streaming_call->getMetadata(); + hardAssert(array_key_exists($ECHO_INITIAL_KEY, $streaming_initial_metadata), + 'Initial metadata does not contain expected key'); + hardAssert( + $streaming_initial_metadata[$ECHO_INITIAL_KEY][0] === $ECHO_INITIAL_VALUE, + 'Incorrect initial metadata value'); + $streaming_trailing_metadata = $streaming_call->getTrailingMetadata(); hardAssert(array_key_exists($ECHO_TRAILING_KEY, $streaming_trailing_metadata), diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index be387cf786..529313cfb1 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -78,6 +78,14 @@ message SecurityParams { string server_host_override = 2; } +message ChannelArg { + string name = 1; + oneof value { + string str_value = 2; + int32 int_value = 3; + } +} + message ClientConfig { // List of targets to connect to. At least one target needs to be specified. repeated string server_targets = 1; @@ -103,6 +111,8 @@ message ClientConfig { // If we use an OTHER_CLIENT client_type, this string gives more detail string other_client_api = 15; + + repeated ChannelArg channel_args = 16; } message ClientStatus { ClientStats stats = 1; } diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 41e9163cd6..e8c6a99cb1 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -32,6 +32,7 @@ import sys import threading import time +import logging import grpc from grpc import _common @@ -197,7 +198,16 @@ def _consume_request_iterator( event_handler = _event_handler(state, call, None) def consume_request_iterator(): - for request in request_iterator: + while True: + try: + request = next(request_iterator) + except StopIteration: + break + except Exception as e: + logging.exception("Exception iterating requests!") + call.cancel() + _abort(state, grpc.StatusCode.UNKNOWN, "Exception iterating requests!") + return serialized_request = _common.serialize(request, request_serializer) with state.condition: if state.code is None and not state.cancelled: diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index d43f93b94f..e27e9e181d 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -82,6 +82,8 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/connected_channel.c', 'src/core/lib/channel/deadline_filter.c', 'src/core/lib/channel/handshaker.c', + 'src/core/lib/channel/handshaker_factory.c', + 'src/core/lib/channel/handshaker_registry.c', 'src/core/lib/channel/http_client_filter.c', 'src/core/lib/channel/http_server_filter.c', 'src/core/lib/channel/message_size_filter.c', diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py index 83b46c914e..650e4756e7 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_client.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py @@ -68,12 +68,8 @@ class BenchmarkClient: else: channel = grpc.insecure_channel(server) - connected_event = threading.Event() - def wait_for_ready(connectivity): - if connectivity == grpc.ChannelConnectivity.READY: - connected_event.set() - channel.subscribe(wait_for_ready, try_to_connect=True) - connected_event.wait() + # waits for the channel to be ready before we start sending messages + grpc.channel_ready_future(channel).result() if config.payload_config.WhichOneof('payload') == 'simple_params': self._generic = False diff --git a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py index c7bfeaeb95..43d6c971b5 100644 --- a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py +++ b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py @@ -75,7 +75,7 @@ class ReflectionServicerTest(unittest.TestCase): file_by_filename='i-donut-exist' ), ) - responses = tuple(self._stub.ServerReflectionInfo(requests)) + responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( valid_host='', @@ -93,7 +93,7 @@ class ReflectionServicerTest(unittest.TestCase): ) ), ) - self.assertEqual(expected_responses, responses) + self.assertSequenceEqual(expected_responses, responses) def testFileBySymbol(self): requests = ( @@ -104,7 +104,7 @@ class ReflectionServicerTest(unittest.TestCase): file_containing_symbol='i.donut.exist.co.uk.org.net.me.name.foo' ), ) - responses = tuple(self._stub.ServerReflectionInfo(requests)) + responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( valid_host='', @@ -122,7 +122,7 @@ class ReflectionServicerTest(unittest.TestCase): ) ), ) - self.assertEqual(expected_responses, responses) + self.assertSequenceEqual(expected_responses, responses) @unittest.skip('TODO(atash): implement file-containing-extension reflection ' '(see https://github.com/google/protobuf/issues/2248)') @@ -141,7 +141,7 @@ class ReflectionServicerTest(unittest.TestCase): ), ), ) - responses = tuple(self._stub.ServerReflectionInfo(requests)) + responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( valid_host='', @@ -159,7 +159,7 @@ class ReflectionServicerTest(unittest.TestCase): ) ), ) - self.assertEqual(expected_responses, responses) + self.assertSequenceEqual(expected_responses, responses) def testListServices(self): requests = ( @@ -167,7 +167,7 @@ class ReflectionServicerTest(unittest.TestCase): list_services='', ), ) - responses = tuple(self._stub.ServerReflectionInfo(requests)) + responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( valid_host='', @@ -179,7 +179,7 @@ class ReflectionServicerTest(unittest.TestCase): ) ), ) - self.assertEqual(expected_responses, responses) + self.assertSequenceEqual(expected_responses, responses) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py index 390ea13021..b8116729b5 100644 --- a/src/python/grpcio_tests/tests/stress/client.py +++ b/src/python/grpcio_tests/tests/stress/client.py @@ -110,10 +110,13 @@ def _get_channel(target, args): channel_credentials = grpc.ssl_channel_credentials( root_certificates=root_certificates) options = (('grpc.ssl_target_name_override', args.server_host_override,),) - return grpc.secure_channel( - target, channel_credentials, options=options) + channel = grpc.secure_channel(target, channel_credentials, options=options) else: - return grpc.insecure_channel(target) + channel = grpc.insecure_channel(target) + + # waits for the channel to be ready before we start sending messages + grpc.channel_ready_future(channel).result() + return channel def run_test(args): test_cases = _parse_weighted_test_cases(args.test_cases) diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 0109ee2173..70d965d3ca 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -28,6 +28,7 @@ "unit._empty_message_test.EmptyMessageTest", "unit._exit_test.ExitTest", "unit._invalid_metadata_test.InvalidMetadataTest", + "unit._invocation_defects_test.InvocationDefectsTest", "unit._metadata_code_details_test.MetadataCodeDetailsTest", "unit._metadata_test.MetadataTest", "unit._rpc_test.RPCTest", diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index 83b9109466..4d3f02e917 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -125,7 +125,7 @@ class CompressionTest(unittest.TestCase): compressed_channel = grpc.insecure_channel('localhost:%d' % self._port, options=[('grpc.default_compression_algorithm', 1)]) multi_callable = compressed_channel.stream_stream(_STREAM_STREAM) - call = multi_callable([request] * test_constants.STREAM_LENGTH) + call = multi_callable(iter([request] * test_constants.STREAM_LENGTH)) for response in call: self.assertEqual(request, response) diff --git a/src/python/grpcio_tests/tests/unit/_empty_message_test.py b/src/python/grpcio_tests/tests/unit/_empty_message_test.py index 131f6e9452..69f4689279 100644 --- a/src/python/grpcio_tests/tests/unit/_empty_message_test.py +++ b/src/python/grpcio_tests/tests/unit/_empty_message_test.py @@ -123,12 +123,12 @@ class EmptyMessageTest(unittest.TestCase): def testStreamUnary(self): response = self._channel.stream_unary(_STREAM_UNARY)( - [_REQUEST] * test_constants.STREAM_LENGTH) + iter([_REQUEST] * test_constants.STREAM_LENGTH)) self.assertEqual(_RESPONSE, response) def testStreamStream(self): response_iterator = self._channel.stream_stream(_STREAM_STREAM)( - [_REQUEST] * test_constants.STREAM_LENGTH) + iter([_REQUEST] * test_constants.STREAM_LENGTH)) self.assertSequenceEqual( [_RESPONSE] * test_constants.STREAM_LENGTH, list(response_iterator)) diff --git a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py index b33802bf57..777527137f 100644 --- a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py @@ -240,7 +240,7 @@ if __name__ == '__main__': multi_callable = channel.stream_unary(method) future = multi_callable.future(infinite_request_iterator()) result, call = multi_callable.with_call( - [REQUEST] * test_constants.STREAM_LENGTH) + iter([REQUEST] * test_constants.STREAM_LENGTH)) elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL or args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL): multi_callable = channel.stream_stream(method) diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py new file mode 100644 index 0000000000..4312679bb9 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -0,0 +1,247 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import itertools +import threading +import unittest +from concurrent import futures + +import grpc +from grpc.framework.foundation import logging_pool + +from tests.unit.framework.common import test_constants +from tests.unit.framework.common import test_control + +_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2 +_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:] +_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3 +_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3] + +_UNARY_UNARY = '/test/UnaryUnary' +_UNARY_STREAM = '/test/UnaryStream' +_STREAM_UNARY = '/test/StreamUnary' +_STREAM_STREAM = '/test/StreamStream' + + +class _Callback(object): + def __init__(self): + self._condition = threading.Condition() + self._value = None + self._called = False + + def __call__(self, value): + with self._condition: + self._value = value + self._called = True + self._condition.notify_all() + + def value(self): + with self._condition: + while not self._called: + self._condition.wait() + return self._value + + +class _Handler(object): + def __init__(self, control): + self._control = control + + def handle_unary_unary(self, request, servicer_context): + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + return request + + def handle_unary_stream(self, request, servicer_context): + for _ in range(test_constants.STREAM_LENGTH): + self._control.control() + yield request + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + + def handle_stream_unary(self, request_iterator, servicer_context): + if servicer_context is not None: + servicer_context.invocation_metadata() + self._control.control() + response_elements = [] + for request in request_iterator: + self._control.control() + response_elements.append(request) + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + return b''.join(response_elements) + + def handle_stream_stream(self, request_iterator, servicer_context): + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + for request in request_iterator: + self._control.control() + yield request + self._control.control() + + +class _MethodHandler(grpc.RpcMethodHandler): + def __init__( + self, request_streaming, response_streaming, request_deserializer, + response_serializer, unary_unary, unary_stream, stream_unary, + stream_stream): + self.request_streaming = request_streaming + self.response_streaming = response_streaming + self.request_deserializer = request_deserializer + self.response_serializer = response_serializer + self.unary_unary = unary_unary + self.unary_stream = unary_stream + self.stream_unary = stream_unary + self.stream_stream = stream_stream + + +class _GenericHandler(grpc.GenericRpcHandler): + def __init__(self, handler): + self._handler = handler + + def service(self, handler_call_details): + if handler_call_details.method == _UNARY_UNARY: + return _MethodHandler( + False, False, None, None, self._handler.handle_unary_unary, None, + None, None) + elif handler_call_details.method == _UNARY_STREAM: + return _MethodHandler( + False, True, _DESERIALIZE_REQUEST, _SERIALIZE_RESPONSE, None, + self._handler.handle_unary_stream, None, None) + elif handler_call_details.method == _STREAM_UNARY: + return _MethodHandler( + True, False, _DESERIALIZE_REQUEST, _SERIALIZE_RESPONSE, None, None, + self._handler.handle_stream_unary, None) + elif handler_call_details.method == _STREAM_STREAM: + return _MethodHandler( + True, True, None, None, None, None, None, + self._handler.handle_stream_stream) + else: + return None + + +class FailAfterFewIterationsCounter(object): + def __init__(self, high, bytestring): + self._current = 0 + self._high = high + self._bytestring = bytestring + + def __iter__(self): + return self + + def __next__(self): + if self._current >= self._high: + raise Exception("This is a deliberate failure in a unit test.") + else: + self._current += 1 + return self._bytestring + + +def _unary_unary_multi_callable(channel): + return channel.unary_unary(_UNARY_UNARY) + + +def _unary_stream_multi_callable(channel): + return channel.unary_stream( + _UNARY_STREAM, + request_serializer=_SERIALIZE_REQUEST, + response_deserializer=_DESERIALIZE_RESPONSE) + + +def _stream_unary_multi_callable(channel): + return channel.stream_unary( + _STREAM_UNARY, + request_serializer=_SERIALIZE_REQUEST, + response_deserializer=_DESERIALIZE_RESPONSE) + + +def _stream_stream_multi_callable(channel): + return channel.stream_stream(_STREAM_STREAM) + + +class InvocationDefectsTest(unittest.TestCase): + def setUp(self): + self._control = test_control.PauseFailControl() + self._handler = _Handler(self._control) + self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + + self._server = grpc.server(self._server_pool) + port = self._server.add_insecure_port('[::]:0') + self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) + self._server.start() + + self._channel = grpc.insecure_channel('localhost:%d' % port) + + def tearDown(self): + self._server.stop(0) + + def testIterableStreamRequestBlockingUnaryResponse(self): + requests = [b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)] + multi_callable = _stream_unary_multi_callable(self._channel) + + with self.assertRaises(grpc.RpcError): + response = multi_callable( + requests, + metadata=(('test', 'IterableStreamRequestBlockingUnaryResponse'),)) + + def testIterableStreamRequestFutureUnaryResponse(self): + requests = [b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)] + multi_callable = _stream_unary_multi_callable(self._channel) + response_future = multi_callable.future( + requests, + metadata=( + ('test', 'IterableStreamRequestFutureUnaryResponse'),)) + + with self.assertRaises(grpc.RpcError): + response = response_future.result() + + def testIterableStreamRequestStreamResponse(self): + requests = [b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH)] + multi_callable = _stream_stream_multi_callable(self._channel) + response_iterator = multi_callable( + requests, + metadata=(('test', 'IterableStreamRequestStreamResponse'),)) + + with self.assertRaises(grpc.RpcError): + next(response_iterator) + + def testIteratorStreamRequestStreamResponse(self): + requests_iterator = FailAfterFewIterationsCounter( + test_constants.STREAM_LENGTH // 2, b'\x07\x08') + multi_callable = _stream_stream_multi_callable(self._channel) + response_iterator = multi_callable( + requests_iterator, + metadata=(('test', 'IteratorStreamRequestStreamResponse'),)) + + with self.assertRaises(grpc.RpcError): + for _ in range(test_constants.STREAM_LENGTH // 2 + 1): + next(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py index da73476929..caba53ffcc 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py @@ -193,7 +193,7 @@ class MetadataTest(unittest.TestCase): def testStreamUnary(self): multi_callable = self._channel.stream_unary(_STREAM_UNARY) unused_response, call = multi_callable.with_call( - [_REQUEST] * test_constants.STREAM_LENGTH, + iter([_REQUEST] * test_constants.STREAM_LENGTH), metadata=_CLIENT_METADATA) self.assertTrue(test_common.metadata_transmitted( _SERVER_INITIAL_METADATA, call.initial_metadata())) @@ -202,7 +202,7 @@ class MetadataTest(unittest.TestCase): def testStreamStream(self): multi_callable = self._channel.stream_stream(_STREAM_STREAM) - call = multi_callable([_REQUEST] * test_constants.STREAM_LENGTH, + call = multi_callable(iter([_REQUEST] * test_constants.STREAM_LENGTH), metadata=_CLIENT_METADATA) self.assertTrue(test_common.metadata_transmitted( _SERVER_INITIAL_METADATA, call.initial_metadata())) diff --git a/src/ruby/pb/src/proto/grpc/testing/test_services_pb.rb b/src/ruby/pb/src/proto/grpc/testing/test_services_pb.rb index fde328e4c5..7d1072e512 100644 --- a/src/ruby/pb/src/proto/grpc/testing/test_services_pb.rb +++ b/src/ruby/pb/src/proto/grpc/testing/test_services_pb.rb @@ -54,6 +54,10 @@ module Grpc rpc :EmptyCall, Empty, Empty # One request followed by one response. rpc :UnaryCall, SimpleRequest, SimpleResponse + # One request followed by one response. Response has cache control + # headers set such that a caching HTTP proxy (such as GFE) can + # satisfy subsequent requests. + rpc :CacheableUnaryCall, SimpleRequest, SimpleResponse # One request followed by a sequence of responses (streamed download). # The server returns the payload with client desired type and sizes. rpc :StreamingOutputCall, StreamingOutputCallRequest, stream(StreamingOutputCallResponse) @@ -69,6 +73,9 @@ module Grpc # stream of responses are returned to the client when the server starts with # first request. rpc :HalfDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse) + # The test server will not implement this method. It will be used + # to test the behavior when clients call unimplemented methods. + rpc :UnimplementedCall, Empty, Empty end Stub = Service.rpc_stub_class diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb index f101f9d89e..9c4ee9c6f2 100755 --- a/src/ruby/pb/test/client.rb +++ b/src/ruby/pb/test/client.rb @@ -158,14 +158,26 @@ def create_stub(opts) GRPC.logger.info("... connecting securely to #{address}") stub_opts[:channel_args].merge!(compression_channel_args) - Grpc::Testing::TestService::Stub.new(address, creds, **stub_opts) + if opts.test_case == "unimplemented_service" + Grpc::Testing::UnimplementedService::Stub.new(address, creds, **stub_opts) + else + Grpc::Testing::TestService::Stub.new(address, creds, **stub_opts) + end else GRPC.logger.info("... connecting insecurely to #{address}") - Grpc::Testing::TestService::Stub.new( - address, - :this_channel_is_insecure, - channel_args: compression_channel_args - ) + if opts.test_case == "unimplemented_service" + Grpc::Testing::UnimplementedService::Stub.new( + address, + :this_channel_is_insecure, + channel_args: compression_channel_args + ) + else + Grpc::Testing::TestService::Stub.new( + address, + :this_channel_is_insecure, + channel_args: compression_channel_args + ) + end end end @@ -502,6 +514,153 @@ class NamedTests op.wait end + def unimplemented_method + begin + resp = @stub.unimplemented_call(Empty.new) + rescue GRPC::Unimplemented => e + return + rescue Exception => e + fail AssertionError, "Expected BadStatus. Received: #{e.inspect}" + end + fail AssertionError, "GRPC::Unimplemented should have been raised. Was not." + end + + def unimplemented_service + begin + resp = @stub.unimplemented_call(Empty.new) + rescue GRPC::Unimplemented => e + return + rescue Exception => e + fail AssertionError, "Expected BadStatus. Received: #{e.inspect}" + end + fail AssertionError, "GRPC::Unimplemented should have been raised. Was not." + end + + def status_code_and_message + + # Function wide constants. + message = "test status method" + code = GRPC::Core::StatusCodes::UNKNOWN + + # Testing with UnaryCall. + payload = Payload.new(type: :COMPRESSABLE, body: nulls(1)) + echo_status = EchoStatus.new(code: code, message: message) + req = SimpleRequest.new(response_type: :COMPRESSABLE, + response_size: 1, + payload: payload, + response_status: echo_status) + seen_correct_exception = false + begin + resp = @stub.unary_call(req) + rescue GRPC::Unknown => e + if e.details != message + fail AssertionError, + "Expected message #{message}. Received: #{e.details}" + end + seen_correct_exception = true + rescue Exception => e + fail AssertionError, "Expected BadStatus. Received: #{e.inspect}" + end + + if not seen_correct_exception + fail AssertionError, "Did not see expected status from UnaryCall" + end + + # testing with FullDuplex + req_cls, p_cls = StreamingOutputCallRequest, ResponseParameters + duplex_req = req_cls.new(payload: Payload.new(body: nulls(1)), + response_type: :COMPRESSABLE, + response_parameters: [p_cls.new(size: 1)], + response_status: echo_status) + seen_correct_exception = false + begin + resp = @stub.full_duplex_call([duplex_req]) + resp.each { |r| } + rescue GRPC::Unknown => e + if e.details != message + fail AssertionError, + "Expected message #{message}. Received: #{e.details}" + end + seen_correct_exception = true + rescue Exception => e + fail AssertionError, "Expected BadStatus. Received: #{e.inspect}" + end + + if not seen_correct_exception + fail AssertionError, "Did not see expected status from FullDuplexCall" + end + + end + + + def custom_metadata + + # Function wide constants + req_size, wanted_response_size = 271_828, 314_159 + initial_metadata_key = "x-grpc-test-echo-initial" + initial_metadata_value = "test_initial_metadata_value" + trailing_metadata_key = "x-grpc-test-echo-trailing-bin" + trailing_metadata_value = "\x0a\x0b\x0a\x0b\x0a\x0b" + + metadata = { + initial_metadata_key => initial_metadata_value, + trailing_metadata_key => trailing_metadata_value + } + + # Testing with UnaryCall + payload = Payload.new(type: :COMPRESSABLE, body: nulls(req_size)) + req = SimpleRequest.new(response_type: :COMPRESSABLE, + response_size: wanted_response_size, + payload: payload) + + op = @stub.unary_call(req, metadata: metadata, return_op: true) + op.execute + if not op.metadata.has_key?(initial_metadata_key) + fail AssertionError, "Expected initial metadata. None received" + elsif op.metadata[initial_metadata_key] != metadata[initial_metadata_key] + fail AssertionError, + "Expected initial metadata: #{metadata[initial_metadata_key]}. "\ + "Received: #{op.metadata[initial_metadata_key]}" + end + if not op.trailing_metadata.has_key?(trailing_metadata_key) + fail AssertionError, "Expected trailing metadata. None received" + elsif op.trailing_metadata[trailing_metadata_key] != + metadata[trailing_metadata_key] + fail AssertionError, + "Expected trailing metadata: #{metadata[trailing_metadata_key]}. "\ + "Received: #{op.trailing_metadata[trailing_metadata_key]}" + end + + # Testing with FullDuplex + req_cls, p_cls = StreamingOutputCallRequest, ResponseParameters + duplex_req = req_cls.new(payload: Payload.new(body: nulls(req_size)), + response_type: :COMPRESSABLE, + response_parameters: [p_cls.new(size: wanted_response_size)]) + + duplex_op = @stub.full_duplex_call([duplex_req], metadata: metadata, + return_op: true) + resp = duplex_op.execute + resp.each { |r| } # ensures that the server sends trailing data + duplex_op.wait + if not duplex_op.metadata.has_key?(initial_metadata_key) + fail AssertionError, "Expected initial metadata. None received" + elsif duplex_op.metadata[initial_metadata_key] != + metadata[initial_metadata_key] + fail AssertionError, + "Expected initial metadata: #{metadata[initial_metadata_key]}. "\ + "Received: #{duplex_op.metadata[initial_metadata_key]}" + end + if not duplex_op.trailing_metadata[trailing_metadata_key] + fail AssertionError, "Expected trailing metadata. None received" + elsif duplex_op.trailing_metadata[trailing_metadata_key] != + metadata[trailing_metadata_key] + fail AssertionError, + "Expected trailing metadata: #{metadata[trailing_metadata_key]}. "\ + "Received: #{duplex_op.trailing_metadata[trailing_metadata_key]}" + end + + end + def all all_methods = NamedTests.instance_methods(false).map(&:to_s) all_methods.each do |m| |