diff options
Diffstat (limited to 'src')
105 files changed, 1308 insertions, 684 deletions
diff --git a/src/core/lib/client_config/lb_policies/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c index 4cbed200df..d8af644870 100644 --- a/src/core/lib/client_config/lb_policies/load_balancer_api.c +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/client_config/lb_policies/load_balancer_api.h" +#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" #include "third_party/nanopb/pb_decode.h" #include "third_party/nanopb/pb_encode.h" diff --git a/src/core/lib/client_config/lb_policies/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h index 83299adfa9..d329a2ffe8 100644 --- a/src/core/lib/client_config/lb_policies/load_balancer_api.h +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h @@ -31,13 +31,13 @@ * */ -#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_LOAD_BALANCER_API_H -#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_LOAD_BALANCER_API_H +#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_LOAD_BALANCER_API_H +#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_LOAD_BALANCER_API_H #include <grpc/support/slice_buffer.h> +#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h" #include "src/core/lib/client_config/lb_policy_factory.h" -#include "src/core/lib/proto/grpc/lb/v0/load_balancer.pb.h" #ifdef __cplusplus extern "C" { @@ -82,4 +82,4 @@ void grpc_grpclb_response_destroy(grpc_grpclb_response *response); } #endif -#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_LOAD_BALANCER_API_H */ +#endif /* GRPC_CORE_EXT_LB_POLICY_GRPCLB_LOAD_BALANCER_API_H */ diff --git a/src/core/lib/proto/grpc/lb/v0/load_balancer.pb.c b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c index 8f82141f96..9719673181 100644 --- a/src/core/lib/proto/grpc/lb/v0/load_balancer.pb.c +++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c @@ -33,7 +33,7 @@ /* Automatically generated nanopb constant definitions */ /* Generated by nanopb-0.3.5-dev */ -#include "src/core/lib/proto/grpc/lb/v0/load_balancer.pb.h" +#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h" #if PB_PROTO_HEADER_VERSION != 30 #error Regenerate this file with the current version of nanopb generator. diff --git a/src/core/lib/proto/grpc/lb/v0/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h index 3599f881bb..3599f881bb 100644 --- a/src/core/lib/proto/grpc/lb/v0/load_balancer.pb.h +++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h diff --git a/src/core/lib/client_config/lb_policies/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 2e399b73f9..95fe372c1b 100644 --- a/src/core/lib/client_config/lb_policies/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -31,12 +31,10 @@ * */ -#include "src/core/lib/client_config/lb_policies/pick_first.h" -#include "src/core/lib/client_config/lb_policy_factory.h" - #include <string.h> #include <grpc/support/alloc.h> +#include "src/core/lib/client_config/lb_policy_registry.h" #include "src/core/lib/transport/connectivity_state.h" typedef struct pending_pick { @@ -78,7 +76,7 @@ typedef struct { #define GET_SELECTED(p) \ ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected)) -void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connected_subchannel *selected = GET_SELECTED(p); size_t i; @@ -95,7 +93,7 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_free(p); } -void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; grpc_connected_subchannel *selected; @@ -162,7 +160,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { &p->connectivity_changed); } -void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; gpr_mu_lock(&p->mu); if (!p->started_picking) { @@ -171,9 +169,10 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } -int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, - grpc_connected_subchannel **target, grpc_closure *on_complete) { +static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, + grpc_connected_subchannel **target, + grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; @@ -356,9 +355,10 @@ static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, return st; } -void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { +static void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; gpr_mu_lock(&p->mu); grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, @@ -366,8 +366,8 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); } -void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { +static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connected_subchannel *selected = GET_SELECTED(p); if (selected) { @@ -391,19 +391,42 @@ static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} -static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, +static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - if (args->num_subchannels == 0) return NULL; + GPR_ASSERT(args->addresses != NULL); + GPR_ASSERT(args->subchannel_factory != NULL); + + if (args->addresses->naddrs == 0) return NULL; + pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); + p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); - p->num_subchannels = args->num_subchannels; - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "pick_first"); - memcpy(p->subchannels, args->subchannels, - sizeof(grpc_subchannel *) * args->num_subchannels); + gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + grpc_subchannel_args sc_args; + size_t subchannel_idx = 0; + for (size_t i = 0; i < args->addresses->naddrs; i++) { + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); + sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + + grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( + exec_ctx, args->subchannel_factory, &sc_args); + + if (subchannel != NULL) { + p->subchannels[subchannel_idx++] = subchannel; + } + } + if (subchannel_idx == 0) { + gpr_free(p->subchannels); + gpr_free(p); + return NULL; + } + p->num_subchannels = subchannel_idx; + + grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); gpr_mu_init(&p->mu); return &p->base; @@ -416,6 +439,14 @@ static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = { static grpc_lb_policy_factory pick_first_lb_policy_factory = { &pick_first_factory_vtable}; -grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() { +static grpc_lb_policy_factory *pick_first_lb_factory_create() { return &pick_first_lb_policy_factory; } + +/* Plugin registration */ + +void grpc_lb_policy_pick_first_init() { + grpc_register_lb_policy(pick_first_lb_factory_create()); +} + +void grpc_lb_policy_pick_first_shutdown() {} diff --git a/src/core/lib/client_config/lb_policies/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index c904c5f921..eb6cccdca9 100644 --- a/src/core/lib/client_config/lb_policies/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -31,11 +31,12 @@ * */ -#include "src/core/lib/client_config/lb_policies/round_robin.h" - #include <string.h> #include <grpc/support/alloc.h> + +#include "src/core/lib/client_config/lb_policy_registry.h" +#include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/connectivity_state.h" typedef struct round_robin_lb_policy round_robin_lb_policy; @@ -199,7 +200,7 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, gpr_free(node); } -void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; size_t i; ready_list *elem; @@ -226,7 +227,7 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_free(p); } -void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; size_t i; @@ -291,7 +292,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { } } -void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; gpr_mu_lock(&p->mu); if (!p->started_picking) { @@ -300,9 +301,10 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } -int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, - grpc_connected_subchannel **target, grpc_closure *on_complete) { +static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, + grpc_connected_subchannel **target, + grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; ready_list *selected; @@ -496,30 +498,47 @@ static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} -static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, +static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - size_t i; + GPR_ASSERT(args->addresses != NULL); + GPR_ASSERT(args->subchannel_factory != NULL); + round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(args->num_subchannels > 0); memset(p, 0, sizeof(*p)); - grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); - p->num_subchannels = args->num_subchannels; - p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels); - memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels); - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "round_robin"); - gpr_mu_init(&p->mu); - for (i = 0; i < args->num_subchannels; i++) { - subchannel_data *sd = gpr_malloc(sizeof(*sd)); - memset(sd, 0, sizeof(*sd)); - p->subchannels[i] = sd; - sd->policy = p; - sd->index = i; - sd->subchannel = args->subchannels[i]; - grpc_closure_init(&sd->connectivity_changed_closure, - rr_connectivity_changed, sd); + p->subchannels = + gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + + grpc_subchannel_args sc_args; + size_t subchannel_idx = 0; + for (size_t i = 0; i < args->addresses->naddrs; i++) { + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); + sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + + grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( + exec_ctx, args->subchannel_factory, &sc_args); + + if (subchannel != NULL) { + subchannel_data *sd = gpr_malloc(sizeof(*sd)); + memset(sd, 0, sizeof(*sd)); + p->subchannels[subchannel_idx] = sd; + sd->policy = p; + sd->index = subchannel_idx; + sd->subchannel = subchannel; + ++subchannel_idx; + grpc_closure_init(&sd->connectivity_changed_closure, + rr_connectivity_changed, sd); + } } + if (subchannel_idx == 0) { + gpr_free(p->subchannels); + gpr_free(p); + return NULL; + } + p->num_subchannels = subchannel_idx; /* The (dummy node) root of the ready list */ p->ready_list.subchannel = NULL; @@ -527,6 +546,10 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, p->ready_list.next = NULL; p->ready_list_last_pick = &p->ready_list; + grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, + "round_robin"); + gpr_mu_init(&p->mu); return &p->base; } @@ -537,6 +560,15 @@ static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { static grpc_lb_policy_factory round_robin_lb_policy_factory = { &round_robin_factory_vtable}; -grpc_lb_policy_factory *grpc_round_robin_lb_factory_create() { +static grpc_lb_policy_factory *round_robin_lb_factory_create() { return &round_robin_lb_policy_factory; } + +/* Plugin registration */ + +void grpc_lb_policy_round_robin_init() { + grpc_register_lb_policy(round_robin_lb_factory_create()); + grpc_register_tracer("round_robin", &grpc_lb_round_robin_trace); +} + +void grpc_lb_policy_round_robin_shutdown() {} diff --git a/src/core/ext/transport/chttp2/client/insecure/README.md b/src/core/ext/transport/chttp2/client/insecure/README.md new file mode 100644 index 0000000000..fa11463388 --- /dev/null +++ b/src/core/ext/transport/chttp2/client/insecure/README.md @@ -0,0 +1 @@ +Plugin for creating insecure channels using chttp2 diff --git a/src/core/lib/surface/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index e8777ce816..cf987a02e0 100644 --- a/src/core/lib/surface/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -40,6 +40,7 @@ #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/census/grpc_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/client_channel.h" @@ -49,7 +50,6 @@ #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" -#include "src/core/lib/transport/chttp2_transport.h" typedef struct { grpc_connector base; diff --git a/src/core/ext/transport/chttp2/client/secure/README.md b/src/core/ext/transport/chttp2/client/secure/README.md new file mode 100644 index 0000000000..405a86e5db --- /dev/null +++ b/src/core/ext/transport/chttp2/client/secure/README.md @@ -0,0 +1 @@ +Plugin for creating secure channels using chttp2 diff --git a/src/core/lib/surface/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index dcb367023e..203475ba52 100644 --- a/src/core/lib/surface/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -40,6 +40,7 @@ #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/client_channel.h" #include "src/core/lib/client_config/resolver_registry.h" @@ -49,7 +50,6 @@ #include "src/core/lib/security/security_context.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" -#include "src/core/lib/transport/chttp2_transport.h" #include "src/core/lib/tsi/transport_security_interface.h" typedef struct { @@ -267,7 +267,7 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, gpr_log(GPR_ERROR, "Cannot set security context in channel args."); grpc_exec_ctx_finish(&exec_ctx); return grpc_lame_client_channel_create( - target, GRPC_STATUS_INVALID_ARGUMENT, + target, GRPC_STATUS_INTERNAL, "Security connector exists in channel args."); } @@ -276,8 +276,7 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, GRPC_SECURITY_OK) { grpc_exec_ctx_finish(&exec_ctx); return grpc_lame_client_channel_create( - target, GRPC_STATUS_INVALID_ARGUMENT, - "Failed to create security connector."); + target, GRPC_STATUS_INTERNAL, "Failed to create security connector."); } connector_arg = grpc_security_connector_to_arg(&security_connector->base); diff --git a/src/core/ext/transport/chttp2/server/insecure/README.md b/src/core/ext/transport/chttp2/server/insecure/README.md new file mode 100644 index 0000000000..fc0bc14ed7 --- /dev/null +++ b/src/core/ext/transport/chttp2/server/insecure/README.md @@ -0,0 +1 @@ +Plugin for creating insecure servers using chttp2 diff --git a/src/core/lib/surface/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index f0c2ee5153..c1ccfbf639 100644 --- a/src/core/lib/surface/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -36,12 +36,12 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -#include "src/core/lib/transport/chttp2_transport.h" static void setup_transport(grpc_exec_ctx *exec_ctx, void *server, grpc_transport *transport) { diff --git a/src/core/ext/transport/chttp2/server/secure/README.md b/src/core/ext/transport/chttp2/server/secure/README.md new file mode 100644 index 0000000000..6bda696a9a --- /dev/null +++ b/src/core/ext/transport/chttp2/server/secure/README.md @@ -0,0 +1 @@ +Plugin for creating secure servers using chttp2 diff --git a/src/core/lib/security/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 7c9dd221ed..80834f4e88 100644 --- a/src/core/lib/security/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -39,6 +39,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/http_server_filter.h" #include "src/core/lib/iomgr/endpoint.h" @@ -50,7 +51,6 @@ #include "src/core/lib/security/security_context.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -#include "src/core/lib/transport/chttp2_transport.h" typedef struct grpc_server_secure_state { grpc_server *server; diff --git a/src/core/ext/transport/chttp2/transport/README.md b/src/core/ext/transport/chttp2/transport/README.md new file mode 100644 index 0000000000..4684e58759 --- /dev/null +++ b/src/core/ext/transport/chttp2/transport/README.md @@ -0,0 +1,4 @@ +chttp2 transport plugin - implements grpc over http2 + +Used by chttp2/{client,server}/{insecure,secure} plugins to implement most of +their functionality diff --git a/src/core/lib/transport/chttp2/alpn.c b/src/core/ext/transport/chttp2/transport/alpn.c index befe319180..c901905d02 100644 --- a/src/core/lib/transport/chttp2/alpn.c +++ b/src/core/ext/transport/chttp2/transport/alpn.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/transport/chttp2/alpn.h" +#include "src/core/ext/transport/chttp2/transport/alpn.h" #include <grpc/support/log.h> #include <grpc/support/useful.h> diff --git a/src/core/lib/transport/chttp2/alpn.h b/src/core/ext/transport/chttp2/transport/alpn.h index a9184e63a4..94843a1456 100644 --- a/src/core/lib/transport/chttp2/alpn.h +++ b/src/core/ext/transport/chttp2/transport/alpn.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_ALPN_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_ALPN_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_ALPN_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_ALPN_H #include <string.h> @@ -46,4 +46,4 @@ size_t grpc_chttp2_num_alpn_versions(void); * grpc_chttp2_num_alpn_versions()) */ const char *grpc_chttp2_get_alpn_version_index(size_t i); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_ALPN_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_ALPN_H */ diff --git a/src/core/lib/transport/chttp2/bin_encoder.c b/src/core/ext/transport/chttp2/transport/bin_encoder.c index 79d0aa3d6f..d39f99c271 100644 --- a/src/core/lib/transport/chttp2/bin_encoder.c +++ b/src/core/ext/transport/chttp2/transport/bin_encoder.c @@ -31,12 +31,12 @@ * */ -#include "src/core/lib/transport/chttp2/bin_encoder.h" +#include "src/core/ext/transport/chttp2/transport/bin_encoder.h" #include <string.h> #include <grpc/support/log.h> -#include "src/core/lib/transport/chttp2/huffsyms.h" +#include "src/core/ext/transport/chttp2/transport/huffsyms.h" static const char alphabet[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; diff --git a/src/core/lib/transport/chttp2/bin_encoder.h b/src/core/ext/transport/chttp2/transport/bin_encoder.h index 1c5cd1e1c6..39dae973c9 100644 --- a/src/core/lib/transport/chttp2/bin_encoder.h +++ b/src/core/ext/transport/chttp2/transport/bin_encoder.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_BIN_ENCODER_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_BIN_ENCODER_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_ENCODER_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_ENCODER_H #include <grpc/support/slice.h> @@ -51,4 +51,4 @@ gpr_slice grpc_chttp2_huffman_compress(gpr_slice input); return y; */ gpr_slice grpc_chttp2_base64_encode_and_huffman_compress(gpr_slice input); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_BIN_ENCODER_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_ENCODER_H */ diff --git a/src/core/lib/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 7fed3d8b47..b835e74c56 100644 --- a/src/core/lib/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include <math.h> #include <stdio.h> @@ -43,12 +43,12 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/http2_errors.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/ext/transport/chttp2/transport/status_conversion.h" +#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" -#include "src/core/lib/transport/chttp2/http2_errors.h" -#include "src/core/lib/transport/chttp2/internal.h" -#include "src/core/lib/transport/chttp2/status_conversion.h" -#include "src/core/lib/transport/chttp2/timeout_encoding.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/transport_impl.h" @@ -758,23 +758,35 @@ static void maybe_start_some_streams( } } +#define CLOSURE_BARRIER_STATS_BIT (1 << 0) +#define CLOSURE_BARRIER_FAILURE_BIT (1 << 1) +#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) + static grpc_closure *add_closure_barrier(grpc_closure *closure) { - closure->final_data += 2; + closure->final_data += CLOSURE_BARRIER_FIRST_REF_BIT; return closure; } void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure, int success) { grpc_closure *closure = *pclosure; if (closure == NULL) { return; } - closure->final_data -= 2; + closure->final_data -= CLOSURE_BARRIER_FIRST_REF_BIT; if (!success) { - closure->final_data |= 1; + closure->final_data |= CLOSURE_BARRIER_FAILURE_BIT; } - if (closure->final_data < 2) { - grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0, NULL); + if (closure->final_data < CLOSURE_BARRIER_FIRST_REF_BIT) { + if (closure->final_data & CLOSURE_BARRIER_STATS_BIT) { + grpc_transport_move_stats(&stream_global->stats, + stream_global->collecting_stats); + stream_global->collecting_stats = NULL; + } + grpc_exec_ctx_enqueue( + exec_ctx, closure, + (closure->final_data & CLOSURE_BARRIER_FAILURE_BIT) == 0, NULL); } *pclosure = NULL; } @@ -807,7 +819,13 @@ static void perform_stream_op_locked( } /* use final_data as a barrier until enqueue time; the inital counter is dropped at the end of this function */ - on_complete->final_data = 2; + on_complete->final_data = CLOSURE_BARRIER_FIRST_REF_BIT; + + if (op->collect_stats != NULL) { + GPR_ASSERT(stream_global->collecting_stats == NULL); + stream_global->collecting_stats = op->collect_stats; + on_complete->final_data |= CLOSURE_BARRIER_STATS_BIT; + } if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(exec_ctx, transport_global, stream_global, @@ -840,7 +858,8 @@ static void perform_stream_op_locked( } } else { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_initial_metadata_finished, 0); + exec_ctx, stream_global, + &stream_global->send_initial_metadata_finished, 0); } } @@ -850,7 +869,7 @@ static void perform_stream_op_locked( stream_global->send_message_finished = add_closure_barrier(on_complete); if (stream_global->write_closed) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_message_finished, 0); + exec_ctx, stream_global, &stream_global->send_message_finished, 0); } else { stream_global->send_message = op->send_message; if (stream_global->id != 0) { @@ -870,7 +889,8 @@ static void perform_stream_op_locked( } if (stream_global->write_closed) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_trailing_metadata_finished, + exec_ctx, stream_global, + &stream_global->send_trailing_metadata_finished, grpc_metadata_batch_is_empty(op->send_trailing_metadata)); } else if (stream_global->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding @@ -909,7 +929,7 @@ static void perform_stream_op_locked( grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } - grpc_chttp2_complete_closure_step(exec_ctx, &on_complete, 1); + grpc_chttp2_complete_closure_step(exec_ctx, stream_global, &on_complete, 1); GPR_TIMER_END("perform_stream_op_locked", 0); } @@ -1080,7 +1100,8 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, &stream_global->received_trailing_metadata, stream_global->recv_trailing_metadata); grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->recv_trailing_metadata_finished, 1); + exec_ctx, stream_global, + &stream_global->recv_trailing_metadata_finished, 1); } } } @@ -1131,7 +1152,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, &transport_global->qbuf, grpc_chttp2_rst_stream_create( stream_global->id, - (uint32_t)grpc_chttp2_grpc_status_to_http2_error(status))); + (uint32_t)grpc_chttp2_grpc_status_to_http2_error(status), + &stream_global->stats.outgoing)); } grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, NULL); @@ -1179,10 +1201,12 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_initial_metadata_finished, 0); + exec_ctx, stream_global, &stream_global->send_initial_metadata_finished, + 0); grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_trailing_metadata_finished, 0); - grpc_chttp2_complete_closure_step(exec_ctx, + exec_ctx, stream_global, &stream_global->send_trailing_metadata_finished, + 0); + grpc_chttp2_complete_closure_step(exec_ctx, stream_global, &stream_global->send_message_finished, 0); } @@ -1319,7 +1343,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_add( &transport_global->qbuf, - grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR)); + grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR, + &stream_global->stats.outgoing)); if (optional_message) { gpr_slice_ref(*optional_message); diff --git a/src/core/lib/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index 5008cab7f8..8ebf9fced6 100644 --- a/src/core/lib/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_TRANSPORT_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_TRANSPORT_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/transport/transport.h" @@ -48,4 +48,4 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_transport *transport, gpr_slice *slices, size_t nslices); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_TRANSPORT_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */ diff --git a/src/core/lib/transport/chttp2/frame.h b/src/core/ext/transport/chttp2/transport/frame.h index 4674bc9703..e1311a1805 100644 --- a/src/core/lib/transport/chttp2/frame.h +++ b/src/core/ext/transport/chttp2/transport/frame.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_H #include <grpc/support/port_platform.h> #include <grpc/support/slice.h> @@ -66,4 +66,4 @@ typedef struct grpc_chttp2_transport_parsing grpc_chttp2_transport_parsing; #define GRPC_CHTTP2_DATA_FLAG_PADDED 8 #define GRPC_CHTTP2_FLAG_HAS_PRIORITY 0x20 -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_H */ diff --git a/src/core/lib/transport/chttp2/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index cf25c3ccc1..baa66e0008 100644 --- a/src/core/lib/transport/chttp2/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -31,15 +31,15 @@ * */ -#include "src/core/lib/transport/chttp2/frame_data.h" +#include "src/core/ext/transport/chttp2/transport/frame_data.h" #include <string.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/lib/support/string.h" -#include "src/core/lib/transport/chttp2/internal.h" #include "src/core/lib/transport/transport.h" grpc_chttp2_parse_error grpc_chttp2_data_parser_init( @@ -113,11 +113,13 @@ grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, uint32_t write_bytes, int is_eof, + grpc_transport_one_way_stats *stats, gpr_slice_buffer *outbuf) { gpr_slice hdr; uint8_t *p; + static const size_t header_size = 9; - hdr = gpr_slice_malloc(9); + hdr = gpr_slice_malloc(header_size); p = GPR_SLICE_START_PTR(hdr); GPR_ASSERT(write_bytes < (1 << 24)); *p++ = (uint8_t)(write_bytes >> 16); @@ -132,6 +134,9 @@ void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, gpr_slice_buffer_add(outbuf, hdr); gpr_slice_buffer_move_first(inbuf, write_bytes, outbuf); + + stats->framing_bytes += header_size; + stats->data_bytes += write_bytes; } grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( @@ -156,6 +161,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( switch (p->state) { fh_0: case GRPC_CHTTP2_DATA_FH_0: + stream_parsing->stats.incoming.framing_bytes++; p->frame_type = *cur; switch (p->frame_type) { case 0: @@ -174,6 +180,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_1: + stream_parsing->stats.incoming.framing_bytes++; p->frame_size = ((uint32_t)*cur) << 24; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_2; @@ -181,6 +188,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_2: + stream_parsing->stats.incoming.framing_bytes++; p->frame_size |= ((uint32_t)*cur) << 16; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_3; @@ -188,6 +196,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_3: + stream_parsing->stats.incoming.framing_bytes++; p->frame_size |= ((uint32_t)*cur) << 8; if (++cur == end) { p->state = GRPC_CHTTP2_DATA_FH_4; @@ -195,6 +204,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_4: + stream_parsing->stats.incoming.framing_bytes++; p->frame_size |= ((uint32_t)*cur); p->state = GRPC_CHTTP2_DATA_FRAME; ++cur; @@ -215,7 +225,9 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); - if ((uint32_t)(end - cur) == p->frame_size) { + uint32_t remaining = (uint32_t)(end - cur); + if (remaining == p->frame_size) { + stream_parsing->stats.incoming.data_bytes += p->frame_size; grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); @@ -224,7 +236,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( p->parsing_frame = NULL; p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; - } else if ((uint32_t)(end - cur) > p->frame_size) { + } else if (remaining > p->frame_size) { + stream_parsing->stats.incoming.data_bytes += p->frame_size; grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), @@ -235,11 +248,12 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( cur += p->frame_size; goto fh_0; /* loop */ } else { + GPR_ASSERT(remaining <= p->frame_size); grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - GPR_ASSERT((size_t)(end - cur) <= p->frame_size); - p->frame_size -= (uint32_t)(end - cur); + p->frame_size -= remaining; + stream_parsing->stats.incoming.data_bytes += remaining; return GRPC_CHTTP2_PARSE_OK; } } diff --git a/src/core/lib/transport/chttp2/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index da404a42c6..8a073a9c11 100644 --- a/src/core/lib/transport/chttp2/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -31,16 +31,17 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_DATA_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_DATA_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H /* Parser for GRPC streams embedded in DATA frames */ #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> +#include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/byte_stream.h" -#include "src/core/lib/transport/chttp2/frame.h" +#include "src/core/lib/transport/transport.h" typedef enum { GRPC_CHTTP2_DATA_FH_0, @@ -96,6 +97,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf, uint32_t write_bytes, int is_eof, + grpc_transport_one_way_stats *stats, gpr_slice_buffer *outbuf); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_DATA_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */ diff --git a/src/core/lib/transport/chttp2/frame_goaway.c b/src/core/ext/transport/chttp2/transport/frame_goaway.c index bb8c28df90..3697fdef41 100644 --- a/src/core/lib/transport/chttp2/frame_goaway.c +++ b/src/core/ext/transport/chttp2/transport/frame_goaway.c @@ -31,8 +31,8 @@ * */ -#include "src/core/lib/transport/chttp2/frame_goaway.h" -#include "src/core/lib/transport/chttp2/internal.h" +#include "src/core/ext/transport/chttp2/transport/frame_goaway.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include <string.h> diff --git a/src/core/lib/transport/chttp2/frame_goaway.h b/src/core/ext/transport/chttp2/transport/frame_goaway.h index f64c44f3d9..e655134434 100644 --- a/src/core/lib/transport/chttp2/frame_goaway.h +++ b/src/core/ext/transport/chttp2/transport/frame_goaway.h @@ -31,14 +31,14 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_GOAWAY_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_GOAWAY_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_GOAWAY_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_GOAWAY_H #include <grpc/support/port_platform.h> #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> +#include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/transport/chttp2/frame.h" typedef enum { GRPC_CHTTP2_GOAWAY_LSI0, @@ -74,4 +74,4 @@ void grpc_chttp2_goaway_append(uint32_t last_stream_id, uint32_t error_code, gpr_slice debug_data, gpr_slice_buffer *slice_buffer); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_GOAWAY_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_GOAWAY_H */ diff --git a/src/core/lib/transport/chttp2/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index 14ca394264..c0192a734d 100644 --- a/src/core/lib/transport/chttp2/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -31,8 +31,8 @@ * */ -#include "src/core/lib/transport/chttp2/frame_ping.h" -#include "src/core/lib/transport/chttp2/internal.h" +#include "src/core/ext/transport/chttp2/transport/frame_ping.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include <string.h> diff --git a/src/core/lib/transport/chttp2/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h index 7640fc4773..1c1d513c99 100644 --- a/src/core/lib/transport/chttp2/frame_ping.h +++ b/src/core/ext/transport/chttp2/transport/frame_ping.h @@ -31,12 +31,12 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_PING_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_PING_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_PING_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_PING_H #include <grpc/support/slice.h> +#include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/transport/chttp2/frame.h" typedef struct { uint8_t byte; @@ -53,4 +53,4 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_PING_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_PING_H */ diff --git a/src/core/lib/transport/chttp2/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c index 060912afc4..d9a04bbf3c 100644 --- a/src/core/lib/transport/chttp2/frame_rst_stream.c +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c @@ -31,15 +31,18 @@ * */ -#include "src/core/lib/transport/chttp2/frame_rst_stream.h" -#include "src/core/lib/transport/chttp2/internal.h" +#include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include <grpc/support/log.h> -#include "src/core/lib/transport/chttp2/frame.h" +#include "src/core/ext/transport/chttp2/transport/frame.h" -gpr_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code) { - gpr_slice slice = gpr_slice_malloc(13); +gpr_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code, + grpc_transport_one_way_stats *stats) { + static const size_t frame_size = 13; + gpr_slice slice = gpr_slice_malloc(frame_size); + stats->framing_bytes += frame_size; uint8_t *p = GPR_SLICE_START_PTR(slice); *p++ = 0; @@ -84,6 +87,7 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( cur++; p->byte++; } + stream_parsing->stats.incoming.framing_bytes += (uint64_t)(end - cur); if (p->byte == 4) { GPR_ASSERT(is_last); diff --git a/src/core/lib/transport/chttp2/frame_rst_stream.h b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h index 93155fde9d..729274e9a2 100644 --- a/src/core/lib/transport/chttp2/frame_rst_stream.h +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h @@ -31,19 +31,21 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_RST_STREAM_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_RST_STREAM_H #include <grpc/support/slice.h> +#include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/transport/chttp2/frame.h" +#include "src/core/lib/transport/transport.h" typedef struct { uint8_t byte; uint8_t reason_bytes[4]; } grpc_chttp2_rst_stream_parser; -gpr_slice grpc_chttp2_rst_stream_create(uint32_t stream_id, uint32_t code); +gpr_slice grpc_chttp2_rst_stream_create(uint32_t stream_id, uint32_t code, + grpc_transport_one_way_stats *stats); grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_rst_stream_parser *parser, uint32_t length, uint8_t flags); @@ -52,4 +54,4 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_RST_STREAM_H */ diff --git a/src/core/lib/transport/chttp2/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index 48429c2a78..799d87b87d 100644 --- a/src/core/lib/transport/chttp2/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -31,18 +31,18 @@ * */ -#include "src/core/lib/transport/chttp2/frame_settings.h" -#include "src/core/lib/transport/chttp2/internal.h" +#include "src/core/ext/transport/chttp2/transport/frame_settings.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include <string.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/transport/frame.h" +#include "src/core/ext/transport/chttp2/transport/http2_errors.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/transport/chttp2/frame.h" -#include "src/core/lib/transport/chttp2/http2_errors.h" -#include "src/core/lib/transport/chttp2_transport.h" #define MAX_MAX_HEADER_LIST_SIZE (1024 * 1024 * 1024) diff --git a/src/core/lib/transport/chttp2/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h index 8b294de021..73524addf0 100644 --- a/src/core/lib/transport/chttp2/frame_settings.h +++ b/src/core/ext/transport/chttp2/transport/frame_settings.h @@ -31,13 +31,13 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_SETTINGS_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_SETTINGS_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_SETTINGS_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_SETTINGS_H #include <grpc/support/port_platform.h> #include <grpc/support/slice.h> +#include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/transport/chttp2/frame.h" typedef enum { GRPC_CHTTP2_SPS_ID0, @@ -100,4 +100,4 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_SETTINGS_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_SETTINGS_H */ diff --git a/src/core/lib/transport/chttp2/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index 2ab5003316..0baf6d2fdd 100644 --- a/src/core/lib/transport/chttp2/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -31,14 +31,16 @@ * */ -#include "src/core/lib/transport/chttp2/frame_window_update.h" -#include "src/core/lib/transport/chttp2/internal.h" +#include "src/core/ext/transport/chttp2/transport/frame_window_update.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include <grpc/support/log.h> -gpr_slice grpc_chttp2_window_update_create(uint32_t id, - uint32_t window_update) { - gpr_slice slice = gpr_slice_malloc(13); +gpr_slice grpc_chttp2_window_update_create( + uint32_t id, uint32_t window_update, grpc_transport_one_way_stats *stats) { + static const size_t frame_size = 13; + gpr_slice slice = gpr_slice_malloc(frame_size); + stats->header_bytes += frame_size; uint8_t *p = GPR_SLICE_START_PTR(slice); GPR_ASSERT(window_update); @@ -87,6 +89,10 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( p->byte++; } + if (stream_parsing != NULL) { + stream_parsing->stats.incoming.framing_bytes += (uint32_t)(end - cur); + } + if (p->byte == 4) { uint32_t received_update = p->amount; if (received_update == 0 || (received_update & 0x80000000u)) { diff --git a/src/core/lib/transport/chttp2/frame_window_update.h b/src/core/ext/transport/chttp2/transport/frame_window_update.h index 4b1aea294d..a1093a6041 100644 --- a/src/core/lib/transport/chttp2/frame_window_update.h +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.h @@ -31,12 +31,13 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_WINDOW_UPDATE_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_WINDOW_UPDATE_H #include <grpc/support/slice.h> +#include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/transport/chttp2/frame.h" +#include "src/core/lib/transport/transport.h" typedef struct { uint8_t byte; @@ -44,7 +45,8 @@ typedef struct { uint32_t amount; } grpc_chttp2_window_update_parser; -gpr_slice grpc_chttp2_window_update_create(uint32_t id, uint32_t window_delta); +gpr_slice grpc_chttp2_window_update_create(uint32_t id, uint32_t window_delta, + grpc_transport_one_way_stats *stats); grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame( grpc_chttp2_window_update_parser *parser, uint32_t length, uint8_t flags); @@ -53,4 +55,4 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_FRAME_WINDOW_UPDATE_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_WINDOW_UPDATE_H */ diff --git a/src/core/lib/transport/chttp2/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 6b45929b04..880305afdd 100644 --- a/src/core/lib/transport/chttp2/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/transport/chttp2/hpack_encoder.h" +#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include <assert.h> #include <string.h> @@ -45,10 +45,10 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> -#include "src/core/lib/transport/chttp2/bin_encoder.h" -#include "src/core/lib/transport/chttp2/hpack_table.h" -#include "src/core/lib/transport/chttp2/timeout_encoding.h" -#include "src/core/lib/transport/chttp2/varint.h" +#include "src/core/ext/transport/chttp2/transport/bin_encoder.h" +#include "src/core/ext/transport/chttp2/transport/hpack_table.h" +#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" +#include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/transport/static_metadata.h" #define HASH_FRAGMENT_1(x) ((x)&255) @@ -74,6 +74,7 @@ typedef struct { /* output stream id */ uint32_t stream_id; gpr_slice_buffer *output; + grpc_transport_one_way_stats *stats; } framer_state; /* fills p (which is expected to be 9 bytes long) with a data frame header */ @@ -102,6 +103,7 @@ static void finish_frame(framer_state *st, int is_header_boundary, st->stream_id, st->output->length - st->output_length_at_start_of_frame, (uint8_t)((is_last_in_stream ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0) | (is_header_boundary ? GRPC_CHTTP2_DATA_FLAG_END_HEADERS : 0))); + st->stats->framing_bytes += 9; st->is_first_frame = 0; } @@ -147,8 +149,10 @@ static void add_header_data(framer_state *st, gpr_slice slice) { remaining = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH + st->output_length_at_start_of_frame - st->output->length; if (len <= remaining) { + st->stats->header_bytes += len; gpr_slice_buffer_add(st->output, slice); } else { + st->stats->header_bytes += remaining; gpr_slice_buffer_add(st->output, gpr_slice_split_head(&slice, remaining)); finish_frame(st, 0, 0); begin_frame(st); @@ -535,6 +539,7 @@ void grpc_chttp2_hpack_compressor_set_max_table_size( void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, uint32_t stream_id, grpc_metadata_batch *metadata, int is_eof, + grpc_transport_one_way_stats *stats, gpr_slice_buffer *outbuf) { framer_state st; grpc_linked_mdelem *l; @@ -546,6 +551,7 @@ void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, st.stream_id = stream_id; st.output = outbuf; st.is_first_frame = 1; + st.stats = stats; /* Encode a metadata batch; store the returned values, representing a metadata element that needs to be unreffed back into the metadata diff --git a/src/core/lib/transport/chttp2/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h index de46a8f146..91c368fbe2 100644 --- a/src/core/lib/transport/chttp2/hpack_encoder.h +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h @@ -31,15 +31,16 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_HPACK_ENCODER_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_HPACK_ENCODER_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_ENCODER_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_ENCODER_H #include <grpc/support/port_platform.h> #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> -#include "src/core/lib/transport/chttp2/frame.h" +#include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/transport.h" #define GRPC_CHTTP2_HPACKC_NUM_FILTERS 256 #define GRPC_CHTTP2_HPACKC_NUM_VALUES 256 @@ -90,6 +91,7 @@ void grpc_chttp2_hpack_compressor_set_max_usable_size( void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, uint32_t id, grpc_metadata_batch *metadata, int is_eof, + grpc_transport_one_way_stats *stats, gpr_slice_buffer *outbuf); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_HPACK_ENCODER_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_ENCODER_H */ diff --git a/src/core/lib/transport/chttp2/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index d41ebab147..259452fa42 100644 --- a/src/core/lib/transport/chttp2/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -31,8 +31,8 @@ * */ -#include "src/core/lib/transport/chttp2/hpack_parser.h" -#include "src/core/lib/transport/chttp2/internal.h" +#include "src/core/ext/transport/chttp2/transport/hpack_parser.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include <assert.h> #include <stddef.h> @@ -48,9 +48,11 @@ #include <grpc/support/port_platform.h> #include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/bin_encoder.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" -#include "src/core/lib/transport/chttp2/bin_encoder.h" + +extern int grpc_http_trace; typedef enum { NOT_BINARY, @@ -723,7 +725,9 @@ static int finish_indexed_field(grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) { grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index); if (md == NULL) { - gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index); + if (grpc_http_trace) { + gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index); + } return 0; } GRPC_MDELEM_REF(md); @@ -919,7 +923,9 @@ static int parse_lithdr_nvridx_v(grpc_chttp2_hpack_parser *p, /* finish parsing a max table size change */ static int finish_max_tbl_size(grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) { - gpr_log(GPR_INFO, "MAX TABLE SIZE: %d", p->index); + if (grpc_http_trace) { + gpr_log(GPR_INFO, "MAX TABLE SIZE: %d", p->index); + } return grpc_chttp2_hptbl_set_current_table_size(&p->table, p->index) && parse_begin(p, cur, end); } @@ -960,7 +966,9 @@ static int parse_error(grpc_chttp2_hpack_parser *p, const uint8_t *cur, static int parse_illegal_op(grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) { GPR_ASSERT(cur != end); - gpr_log(GPR_DEBUG, "Illegal hpack op code %d", *cur); + if (grpc_http_trace) { + gpr_log(GPR_DEBUG, "Illegal hpack op code %d", *cur); + } return parse_error(p, cur, end); } @@ -1069,10 +1077,12 @@ static int parse_value4(grpc_chttp2_hpack_parser *p, const uint8_t *cur, } error: - gpr_log(GPR_ERROR, - "integer overflow in hpack integer decoding: have 0x%08x, " - "got byte 0x%02x on byte 5", - *p->parsing.value, *cur); + if (grpc_http_trace) { + gpr_log(GPR_ERROR, + "integer overflow in hpack integer decoding: have 0x%08x, " + "got byte 0x%02x on byte 5", + *p->parsing.value, *cur); + } return parse_error(p, cur, end); } @@ -1094,10 +1104,12 @@ static int parse_value5up(grpc_chttp2_hpack_parser *p, const uint8_t *cur, return parse_next(p, cur + 1, end); } - gpr_log(GPR_ERROR, - "integer overflow in hpack integer decoding: have 0x%08x, " - "got byte 0x%02x sometime after byte 5", - *p->parsing.value, *cur); + if (grpc_http_trace) { + gpr_log(GPR_ERROR, + "integer overflow in hpack integer decoding: have 0x%08x, " + "got byte 0x%02x sometime after byte 5", + *p->parsing.value, *cur); + } return parse_error(p, cur, end); } @@ -1329,7 +1341,9 @@ static is_binary_header is_binary_literal_header(grpc_chttp2_hpack_parser *p) { static is_binary_header is_binary_indexed_header(grpc_chttp2_hpack_parser *p) { grpc_mdelem *elem = grpc_chttp2_hptbl_lookup(&p->table, p->index); if (!elem) { - gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index); + if (grpc_http_trace) { + gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index); + } return ERROR_HEADER; } return grpc_is_binary_header( @@ -1412,6 +1426,9 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { grpc_chttp2_hpack_parser *parser = hpack_parser; GPR_TIMER_BEGIN("grpc_chttp2_hpack_parser_parse", 0); + if (stream_parsing != NULL) { + stream_parsing->stats.incoming.header_bytes += GPR_SLICE_LENGTH(slice); + } if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice), GPR_SLICE_END_PTR(slice))) { GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0); diff --git a/src/core/lib/transport/chttp2/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h index a534fd5cf4..0aaddc8b9c 100644 --- a/src/core/lib/transport/chttp2/hpack_parser.h +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h @@ -31,15 +31,15 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_HPACK_PARSER_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_HPACK_PARSER_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_PARSER_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_PARSER_H #include <stddef.h> #include <grpc/support/port_platform.h> +#include "src/core/ext/transport/chttp2/transport/frame.h" +#include "src/core/ext/transport/chttp2/transport/hpack_table.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/transport/chttp2/frame.h" -#include "src/core/lib/transport/chttp2/hpack_table.h" #include "src/core/lib/transport/metadata.h" typedef struct grpc_chttp2_hpack_parser grpc_chttp2_hpack_parser; @@ -113,4 +113,4 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_HPACK_PARSER_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_PARSER_H */ diff --git a/src/core/lib/transport/chttp2/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c index f92bc26585..67cd1bb10a 100644 --- a/src/core/lib/transport/chttp2/hpack_table.c +++ b/src/core/ext/transport/chttp2/transport/hpack_table.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/transport/chttp2/hpack_table.h" +#include "src/core/ext/transport/chttp2/transport/hpack_table.h" #include <assert.h> #include <string.h> @@ -41,6 +41,8 @@ #include "src/core/lib/support/murmur_hash.h" +extern int grpc_http_trace; + static struct { const char *key; const char *value; @@ -264,12 +266,16 @@ int grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl, return 1; } if (bytes > tbl->max_bytes) { - gpr_log(GPR_ERROR, - "Attempt to make hpack table %d bytes when max is %d bytes", bytes, - tbl->max_bytes); + if (grpc_http_trace) { + gpr_log(GPR_ERROR, + "Attempt to make hpack table %d bytes when max is %d bytes", + bytes, tbl->max_bytes); + } return 0; } - gpr_log(GPR_DEBUG, "Update hpack parser table size to %d", bytes); + if (grpc_http_trace) { + gpr_log(GPR_DEBUG, "Update hpack parser table size to %d", bytes); + } while (tbl->mem_used > bytes) { evict1(tbl); } @@ -293,10 +299,12 @@ int grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) { GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD; if (tbl->current_table_bytes > tbl->max_bytes) { - gpr_log(GPR_ERROR, - "HPACK max table size reduced to %d but not reflected by hpack " - "stream (still at %d)", - tbl->max_bytes, tbl->current_table_bytes); + if (grpc_http_trace) { + gpr_log(GPR_ERROR, + "HPACK max table size reduced to %d but not reflected by hpack " + "stream (still at %d)", + tbl->max_bytes, tbl->current_table_bytes); + } return 0; } diff --git a/src/core/lib/transport/chttp2/hpack_table.h b/src/core/ext/transport/chttp2/transport/hpack_table.h index 2cbc02dd9c..b3475c8f5c 100644 --- a/src/core/lib/transport/chttp2/hpack_table.h +++ b/src/core/ext/transport/chttp2/transport/hpack_table.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_HPACK_TABLE_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_HPACK_TABLE_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_TABLE_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_TABLE_H #include <grpc/support/port_platform.h> #include <grpc/support/slice.h> @@ -105,4 +105,4 @@ typedef struct { grpc_chttp2_hptbl_find_result grpc_chttp2_hptbl_find( const grpc_chttp2_hptbl *tbl, grpc_mdelem *md); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_HPACK_TABLE_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_TABLE_H */ diff --git a/src/core/lib/transport/chttp2/hpack_tables.txt b/src/core/ext/transport/chttp2/transport/hpack_tables.txt index 08842a0267..08842a0267 100644 --- a/src/core/lib/transport/chttp2/hpack_tables.txt +++ b/src/core/ext/transport/chttp2/transport/hpack_tables.txt diff --git a/src/core/lib/transport/chttp2/http2_errors.h b/src/core/ext/transport/chttp2/transport/http2_errors.h index 0238f9d80b..85542e2337 100644 --- a/src/core/lib/transport/chttp2/http2_errors.h +++ b/src/core/ext/transport/chttp2/transport/http2_errors.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_HTTP2_ERRORS_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_HTTP2_ERRORS_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HTTP2_ERRORS_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HTTP2_ERRORS_H /* error codes for RST_STREAM from http2 draft 14 section 7 */ typedef enum { @@ -53,4 +53,4 @@ typedef enum { GRPC_CHTTP2__ERROR_DO_NOT_USE = -1 } grpc_chttp2_error_code; -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_HTTP2_ERRORS_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HTTP2_ERRORS_H */ diff --git a/src/core/lib/transport/chttp2/huffsyms.c b/src/core/ext/transport/chttp2/transport/huffsyms.c index 27497e6ae0..91f62bf34b 100644 --- a/src/core/lib/transport/chttp2/huffsyms.c +++ b/src/core/ext/transport/chttp2/transport/huffsyms.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/transport/chttp2/huffsyms.h" +#include "src/core/ext/transport/chttp2/transport/huffsyms.h" /* Constants pulled from the HPACK spec, and converted to C using the vim command: diff --git a/src/core/lib/transport/chttp2/huffsyms.h b/src/core/ext/transport/chttp2/transport/huffsyms.h index 1ca77b9207..780baeaf55 100644 --- a/src/core/lib/transport/chttp2/huffsyms.h +++ b/src/core/ext/transport/chttp2/transport/huffsyms.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_HUFFSYMS_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_HUFFSYMS_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HUFFSYMS_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HUFFSYMS_H /* HPACK static huffman table */ @@ -45,4 +45,4 @@ typedef struct { extern const grpc_chttp2_huffsym grpc_chttp2_huffsyms[GRPC_CHTTP2_NUM_HUFFSYMS]; -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_HUFFSYMS_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HUFFSYMS_H */ diff --git a/src/core/lib/transport/chttp2/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index a1a8d37562..ef5fd4fe03 100644 --- a/src/core/lib/transport/chttp2/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -31,11 +31,11 @@ * */ -#include "src/core/lib/transport/chttp2/incoming_metadata.h" +#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" #include <string.h> -#include "src/core/lib/transport/chttp2/internal.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> diff --git a/src/core/lib/transport/chttp2/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index edfa0adf9d..5e1dc72389 100644 --- a/src/core/lib/transport/chttp2/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_INCOMING_METADATA_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_INCOMING_METADATA_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INCOMING_METADATA_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INCOMING_METADATA_H #include "src/core/lib/transport/transport.h" @@ -57,4 +57,4 @@ void grpc_chttp2_incoming_metadata_buffer_add( void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_INCOMING_METADATA_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INCOMING_METADATA_H */ diff --git a/src/core/lib/transport/chttp2/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 346e404204..2c3c7a3820 100644 --- a/src/core/lib/transport/chttp2/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -31,24 +31,24 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_INTERNAL_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_INTERNAL_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H #include <assert.h> #include <stdbool.h> +#include "src/core/ext/transport/chttp2/transport/frame.h" +#include "src/core/ext/transport/chttp2/transport/frame_data.h" +#include "src/core/ext/transport/chttp2/transport/frame_goaway.h" +#include "src/core/ext/transport/chttp2/transport/frame_ping.h" +#include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h" +#include "src/core/ext/transport/chttp2/transport/frame_settings.h" +#include "src/core/ext/transport/chttp2/transport/frame_window_update.h" +#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" +#include "src/core/ext/transport/chttp2/transport/hpack_parser.h" +#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" +#include "src/core/ext/transport/chttp2/transport/stream_map.h" #include "src/core/lib/iomgr/endpoint.h" -#include "src/core/lib/transport/chttp2/frame.h" -#include "src/core/lib/transport/chttp2/frame_data.h" -#include "src/core/lib/transport/chttp2/frame_goaway.h" -#include "src/core/lib/transport/chttp2/frame_ping.h" -#include "src/core/lib/transport/chttp2/frame_rst_stream.h" -#include "src/core/lib/transport/chttp2/frame_settings.h" -#include "src/core/lib/transport/chttp2/frame_window_update.h" -#include "src/core/lib/transport/chttp2/hpack_encoder.h" -#include "src/core/lib/transport/chttp2/hpack_parser.h" -#include "src/core/lib/transport/chttp2/incoming_metadata.h" -#include "src/core/lib/transport/chttp2/stream_map.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/transport_impl.h" @@ -394,6 +394,9 @@ typedef struct { grpc_metadata_batch *recv_trailing_metadata; grpc_closure *recv_trailing_metadata_finished; + grpc_transport_stream_stats *collecting_stats; + grpc_transport_stream_stats stats; + /** when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ @@ -435,6 +438,8 @@ typedef struct { gpr_slice fetching_slice; size_t stream_fetched; grpc_closure finished_fetch; + /** stats gathered during the write */ + grpc_transport_one_way_stats stats; } grpc_chttp2_stream_writing; struct grpc_chttp2_stream_parsing { @@ -460,6 +465,8 @@ struct grpc_chttp2_stream_parsing { int64_t outgoing_window; /** number of bytes received - reset at end of parse thread execution */ int64_t received_bytes; + /** stats gathered during the parse */ + grpc_transport_stream_stats stats; /** incoming metadata */ grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; @@ -635,6 +642,7 @@ void grpc_chttp2_parsing_become_skip_parser( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure, int success); #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" @@ -777,4 +785,4 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_INTERNAL_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/lib/transport/chttp2/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 9ee52f63f2..1e815e096b 100644 --- a/src/core/lib/transport/chttp2/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/transport/chttp2/internal.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include <string.h> @@ -39,10 +39,10 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/ext/transport/chttp2/transport/http2_errors.h" +#include "src/core/ext/transport/chttp2/transport/status_conversion.h" +#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/transport/chttp2/http2_errors.h" -#include "src/core/lib/transport/chttp2/status_conversion.h" -#include "src/core/lib/transport/chttp2/timeout_encoding.h" #include "src/core/lib/transport/static_metadata.h" static int init_frame_parser(grpc_exec_ctx *exec_ctx, @@ -171,6 +171,9 @@ void grpc_chttp2_publish_reads( grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } + /* flush stats to global stream state */ + grpc_transport_move_stats(&stream_parsing->stats, &stream_global->stats); + /* update outgoing flow control window */ was_zero = stream_global->outgoing_window <= 0; GRPC_CHTTP2_FLOW_MOVE_STREAM("parsed", transport_global, stream_global, @@ -544,8 +547,13 @@ static int init_data_frame_parser( grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id); grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK; - if (!stream_parsing || stream_parsing->received_close) + if (stream_parsing == NULL) { + return init_skip_frame_parser(exec_ctx, transport_parsing, 0); + } + stream_parsing->stats.incoming.framing_bytes += 9; + if (stream_parsing->received_close) { return init_skip_frame_parser(exec_ctx, transport_parsing, 0); + } if (err == GRPC_CHTTP2_PARSE_OK) { err = update_incoming_window(exec_ctx, transport_parsing, stream_parsing); } @@ -566,7 +574,8 @@ static int init_data_frame_parser( gpr_slice_buffer_add( &transport_parsing->qbuf, grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id, - GRPC_CHTTP2_PROTOCOL_ERROR)); + GRPC_CHTTP2_PROTOCOL_ERROR, + &stream_parsing->stats.outgoing)); return init_skip_frame_parser(exec_ctx, transport_parsing, 0); case GRPC_CHTTP2_CONNECTION_ERROR: return 0; @@ -717,6 +726,7 @@ static int init_header_frame_parser( transport_parsing->incoming_stream = stream_parsing; } GPR_ASSERT(stream_parsing != NULL && (via_accept == 0 || via_accept == 1)); + stream_parsing->stats.incoming.framing_bytes += 9; if (stream_parsing->received_close) { gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header"); transport_parsing->incoming_stream = NULL; @@ -752,9 +762,14 @@ static int init_window_update_frame_parser( &transport_parsing->simple.window_update, transport_parsing->incoming_frame_size, transport_parsing->incoming_frame_flags); - if (transport_parsing->incoming_stream_id) { - transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( - transport_parsing, transport_parsing->incoming_stream_id); + if (transport_parsing->incoming_stream_id != 0) { + grpc_chttp2_stream_parsing *stream_parsing = + transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( + transport_parsing, transport_parsing->incoming_stream_id); + if (stream_parsing == NULL) { + return init_skip_frame_parser(exec_ctx, transport_parsing, 0); + } + stream_parsing->stats.incoming.framing_bytes += 9; } transport_parsing->parser = grpc_chttp2_window_update_parser_parse; transport_parsing->parser_data = &transport_parsing->simple.window_update; @@ -778,11 +793,13 @@ static int init_rst_stream_parser( &transport_parsing->simple.rst_stream, transport_parsing->incoming_frame_size, transport_parsing->incoming_frame_flags); - transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( - transport_parsing, transport_parsing->incoming_stream_id); + grpc_chttp2_stream_parsing *stream_parsing = + transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( + transport_parsing, transport_parsing->incoming_stream_id); if (!transport_parsing->incoming_stream) { return init_skip_frame_parser(exec_ctx, transport_parsing, 0); } + stream_parsing->stats.incoming.framing_bytes += 9; transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse; transport_parsing->parser_data = &transport_parsing->simple.rst_stream; return ok; @@ -856,7 +873,8 @@ static int parse_frame_slice(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_add( &transport_parsing->qbuf, grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id, - GRPC_CHTTP2_PROTOCOL_ERROR)); + GRPC_CHTTP2_PROTOCOL_ERROR, + &stream_parsing->stats.outgoing)); } return 1; case GRPC_CHTTP2_CONNECTION_ERROR: diff --git a/src/core/lib/transport/chttp2/status_conversion.c b/src/core/ext/transport/chttp2/transport/status_conversion.c index 73dd63e720..5a79579989 100644 --- a/src/core/lib/transport/chttp2/status_conversion.c +++ b/src/core/ext/transport/chttp2/transport/status_conversion.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/transport/chttp2/status_conversion.h" +#include "src/core/ext/transport/chttp2/transport/status_conversion.h" int grpc_chttp2_grpc_status_to_http2_error(grpc_status_code status) { switch (status) { diff --git a/src/core/lib/transport/chttp2/status_conversion.h b/src/core/ext/transport/chttp2/transport/status_conversion.h index 241417d32e..e92a5f6702 100644 --- a/src/core/lib/transport/chttp2/status_conversion.h +++ b/src/core/ext/transport/chttp2/transport/status_conversion.h @@ -31,11 +31,11 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_STATUS_CONVERSION_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_STATUS_CONVERSION_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STATUS_CONVERSION_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STATUS_CONVERSION_H #include <grpc/grpc.h> -#include "src/core/lib/transport/chttp2/http2_errors.h" +#include "src/core/ext/transport/chttp2/transport/http2_errors.h" /* Conversion of grpc status codes to http2 error codes (for RST_STREAM) */ grpc_chttp2_error_code grpc_chttp2_grpc_status_to_http2_error( @@ -47,4 +47,4 @@ grpc_status_code grpc_chttp2_http2_error_to_grpc_status( grpc_status_code grpc_chttp2_http2_status_to_grpc_status(int status); int grpc_chttp2_grpc_status_to_http2_status(grpc_status_code status); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_STATUS_CONVERSION_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STATUS_CONVERSION_H */ diff --git a/src/core/lib/transport/chttp2/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index b51a041dc7..01ed49b1ec 100644 --- a/src/core/lib/transport/chttp2/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/transport/chttp2/internal.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include <grpc/support/log.h> diff --git a/src/core/lib/transport/chttp2/stream_map.c b/src/core/ext/transport/chttp2/transport/stream_map.c index dbbbe783bf..3fb1389650 100644 --- a/src/core/lib/transport/chttp2/stream_map.c +++ b/src/core/ext/transport/chttp2/transport/stream_map.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/transport/chttp2/stream_map.h" +#include "src/core/ext/transport/chttp2/transport/stream_map.h" #include <string.h> diff --git a/src/core/lib/transport/chttp2/stream_map.h b/src/core/ext/transport/chttp2/transport/stream_map.h index 1c56b18e54..4e8586fe46 100644 --- a/src/core/lib/transport/chttp2/stream_map.h +++ b/src/core/ext/transport/chttp2/transport/stream_map.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_STREAM_MAP_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_STREAM_MAP_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STREAM_MAP_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STREAM_MAP_H #include <grpc/support/port_platform.h> @@ -81,4 +81,4 @@ void grpc_chttp2_stream_map_for_each(grpc_chttp2_stream_map *map, void *value), void *user_data); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_STREAM_MAP_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STREAM_MAP_H */ diff --git a/src/core/lib/transport/chttp2/timeout_encoding.c b/src/core/ext/transport/chttp2/transport/timeout_encoding.c index 0edacaafd3..d5b9da9252 100644 --- a/src/core/lib/transport/chttp2/timeout_encoding.c +++ b/src/core/ext/transport/chttp2/transport/timeout_encoding.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/transport/chttp2/timeout_encoding.h" +#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" #include <stdio.h> #include <string.h> diff --git a/src/core/lib/transport/chttp2/timeout_encoding.h b/src/core/ext/transport/chttp2/transport/timeout_encoding.h index 731beb5a37..dc64f9cc3f 100644 --- a/src/core/lib/transport/chttp2/timeout_encoding.h +++ b/src/core/ext/transport/chttp2/transport/timeout_encoding.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_TIMEOUT_ENCODING_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_TIMEOUT_ENCODING_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_TIMEOUT_ENCODING_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_TIMEOUT_ENCODING_H #include <grpc/support/time.h> #include "src/core/lib/support/string.h" @@ -44,4 +44,4 @@ void grpc_chttp2_encode_timeout(gpr_timespec timeout, char *buffer); int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout); -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_TIMEOUT_ENCODING_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_TIMEOUT_ENCODING_H */ diff --git a/src/core/lib/transport/chttp2/varint.c b/src/core/ext/transport/chttp2/transport/varint.c index 6dfef45362..6721d042a2 100644 --- a/src/core/lib/transport/chttp2/varint.c +++ b/src/core/ext/transport/chttp2/transport/varint.c @@ -31,7 +31,7 @@ * */ -#include "src/core/lib/transport/chttp2/varint.h" +#include "src/core/ext/transport/chttp2/transport/varint.h" uint32_t grpc_chttp2_hpack_varint_length(uint32_t tail_value) { if (tail_value < (1 << 7)) { diff --git a/src/core/lib/transport/chttp2/varint.h b/src/core/ext/transport/chttp2/transport/varint.h index e4a0ae3c22..6442ea3c5a 100644 --- a/src/core/lib/transport/chttp2/varint.h +++ b/src/core/ext/transport/chttp2/transport/varint.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_LIB_TRANSPORT_CHTTP2_VARINT_H -#define GRPC_CORE_LIB_TRANSPORT_CHTTP2_VARINT_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_VARINT_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_VARINT_H #include <grpc/support/port_platform.h> @@ -72,4 +72,4 @@ void grpc_chttp2_hpack_write_varint_tail(uint32_t tail_value, uint8_t* target, } \ } while (0) -#endif /* GRPC_CORE_LIB_TRANSPORT_CHTTP2_VARINT_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_VARINT_H */ diff --git a/src/core/lib/transport/chttp2/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index daea331d31..6bb188f1da 100644 --- a/src/core/lib/transport/chttp2/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -31,14 +31,14 @@ * */ -#include "src/core/lib/transport/chttp2/internal.h" +#include "src/core/ext/transport/chttp2/transport/internal.h" #include <limits.h> #include <grpc/support/log.h> +#include "src/core/ext/transport/chttp2/transport/http2_errors.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/transport/chttp2/http2_errors.h" static void finalize_outbuf(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing); @@ -161,8 +161,10 @@ int grpc_chttp2_unlocking_check_writes( transport_global->announce_incoming_window, UINT32_MAX); GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global, announce_incoming_window, announced); - gpr_slice_buffer_add(&transport_writing->outbuf, - grpc_chttp2_window_update_create(0, announced)); + grpc_transport_one_way_stats throwaway_stats; + gpr_slice_buffer_add( + &transport_writing->outbuf, + grpc_chttp2_window_update_create(0, announced, &throwaway_stats)); } GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0); @@ -205,7 +207,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, if (stream_writing->send_initial_metadata != NULL) { grpc_chttp2_encode_header( &transport_writing->hpack_compressor, stream_writing->id, - stream_writing->send_initial_metadata, 0, &transport_writing->outbuf); + stream_writing->send_initial_metadata, 0, &stream_writing->stats, + &transport_writing->outbuf); stream_writing->send_initial_metadata = NULL; stream_writing->sent_initial_metadata = 1; } @@ -216,7 +219,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_add( &transport_writing->outbuf, grpc_chttp2_window_update_create(stream_writing->id, - stream_writing->announce_window)); + stream_writing->announce_window, + &stream_writing->stats)); GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, stream_writing, announce_window, announce); stream_writing->announce_window = 0; @@ -255,7 +259,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, stream_writing->send_trailing_metadata); grpc_chttp2_encode_data( stream_writing->id, &stream_writing->flow_controlled_buffer, - send_bytes, is_last_frame, &transport_writing->outbuf); + send_bytes, is_last_frame, &stream_writing->stats, + &transport_writing->outbuf); GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, stream_writing, outgoing_window, send_bytes); @@ -281,19 +286,20 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, stream_writing->send_trailing_metadata != NULL) { if (grpc_metadata_batch_is_empty( stream_writing->send_trailing_metadata)) { - grpc_chttp2_encode_data(stream_writing->id, - &stream_writing->flow_controlled_buffer, 0, 1, - &transport_writing->outbuf); + grpc_chttp2_encode_data( + stream_writing->id, &stream_writing->flow_controlled_buffer, 0, 1, + &stream_writing->stats, &transport_writing->outbuf); } else { - grpc_chttp2_encode_header(&transport_writing->hpack_compressor, - stream_writing->id, - stream_writing->send_trailing_metadata, 1, - &transport_writing->outbuf); + grpc_chttp2_encode_header( + &transport_writing->hpack_compressor, stream_writing->id, + stream_writing->send_trailing_metadata, 1, &stream_writing->stats, + &transport_writing->outbuf); } if (!transport_writing->is_client && !stream_writing->read_closed) { gpr_slice_buffer_add(&transport_writing->outbuf, grpc_chttp2_rst_stream_create( - stream_writing->id, GRPC_CHTTP2_NO_ERROR)); + stream_writing->id, GRPC_CHTTP2_NO_ERROR, + &stream_writing->stats)); } stream_writing->send_trailing_metadata = NULL; stream_writing->sent_trailing_metadata = 1; @@ -328,17 +334,21 @@ void grpc_chttp2_cleanup_writing( transport_global, transport_writing, &stream_global, &stream_writing)) { if (stream_writing->sent_initial_metadata) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_initial_metadata_finished, 1); + exec_ctx, stream_global, + &stream_global->send_initial_metadata_finished, 1); } + grpc_transport_move_one_way_stats(&stream_writing->stats, + &stream_global->stats.outgoing); if (stream_writing->sent_message) { GPR_ASSERT(stream_writing->send_message == NULL); grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_message_finished, 1); + exec_ctx, stream_global, &stream_global->send_message_finished, 1); stream_writing->sent_message = 0; } if (stream_writing->sent_trailing_metadata) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_trailing_metadata_finished, 1); + exec_ctx, stream_global, + &stream_global->send_trailing_metadata_finished, 1); } if (stream_writing->sent_trailing_metadata) { grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 67d287ec6b..1ea202c543 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -37,23 +37,23 @@ #include <grpc/compression.h> #include <grpc/grpc.h> -/* Copy some arguments */ +/** Copy the arguments in \a src into a new instance */ grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src); -/* Copy some arguments, stably sorting keys */ -grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *a); +/** Copy the arguments in \a src into a new instance, stably sorting keys */ +grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *src); -/** Copy some arguments and add the to_add parameter in the end. - If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */ +/** Copy the arguments in \a src and append \a to_add. If \a to_add is NULL, it + * is equivalent to calling \a grpc_channel_args_copy. */ grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add); -/** Copy args from a then args from b into a new channel args */ +/** Concatenate args from \a a and \a b into a new instance */ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, const grpc_channel_args *b); -/** Destroy arguments created by grpc_channel_args_copy */ +/** Destroy arguments created by \a grpc_channel_args_copy */ void grpc_channel_args_destroy(grpc_channel_args *a); /** Reads census_enabled settings from channel args. Returns 1 if census_enabled diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 7dbac38414..b4e58b7f29 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -112,7 +112,9 @@ static void hc_mutate_op(grpc_call_element *elem, /* Send : prefixed headers, which have to be before any application layer headers. */ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method, - GRPC_MDELEM_METHOD_POST); + op->send_idempotent_request + ? GRPC_MDELEM_METHOD_PUT + : GRPC_MDELEM_METHOD_POST); grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme, channeld->static_scheme); grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers, diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index df99b77ab3..db1a3d5010 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -41,7 +41,7 @@ typedef struct call_data { uint8_t seen_path; - uint8_t seen_post; + uint8_t seen_method; uint8_t sent_status; uint8_t seen_scheme; uint8_t seen_te_trailers; @@ -50,6 +50,7 @@ typedef struct call_data { grpc_linked_mdelem content_type; grpc_metadata_batch *recv_initial_metadata; + bool *recv_idempotent_request; /** Closure to call when finished with the hs_on_recv hook */ grpc_closure *on_done_recv; /** Receive closures are chained: we inject this closure as the on_done_recv @@ -72,11 +73,16 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { /* Check if it is one of the headers we care about. */ if (md == GRPC_MDELEM_TE_TRAILERS || md == GRPC_MDELEM_METHOD_POST || - md == GRPC_MDELEM_SCHEME_HTTP || md == GRPC_MDELEM_SCHEME_HTTPS || + md == GRPC_MDELEM_METHOD_PUT || md == GRPC_MDELEM_SCHEME_HTTP || + md == GRPC_MDELEM_SCHEME_HTTPS || md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) { /* swallow it */ if (md == GRPC_MDELEM_METHOD_POST) { - calld->seen_post = 1; + calld->seen_method = 1; + *calld->recv_idempotent_request = false; + } else if (md == GRPC_MDELEM_METHOD_PUT) { + calld->seen_method = 1; + *calld->recv_idempotent_request = true; } else if (md->key == GRPC_MDSTR_SCHEME) { calld->seen_scheme = 1; } else if (md == GRPC_MDELEM_TE_TRAILERS) { @@ -142,7 +148,7 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { /* Have we seen the required http2 transport headers? (:method, :scheme, content-type, with :path and :authority covered at the channel level right now) */ - if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && + if (calld->seen_method && calld->seen_scheme && calld->seen_te_trailers && calld->seen_path && calld->seen_authority) { /* do nothing */ } else { @@ -152,7 +158,7 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { if (!calld->seen_authority) { gpr_log(GPR_ERROR, "Missing :authority header"); } - if (!calld->seen_post) { + if (!calld->seen_method) { gpr_log(GPR_ERROR, "Missing :method header"); } if (!calld->seen_scheme) { @@ -185,7 +191,9 @@ static void hs_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata) { /* substitute our callback for the higher callback */ + GPR_ASSERT(op->recv_idempotent_request != NULL); calld->recv_initial_metadata = op->recv_initial_metadata; + calld->recv_idempotent_request = op->recv_idempotent_request; calld->on_done_recv = op->recv_initial_metadata_ready; op->recv_initial_metadata_ready = &calld->hs_on_recv; } diff --git a/src/core/lib/client_config/lb_policy_factory.c b/src/core/lib/client_config/lb_policy_factory.c index 2ca6f42f89..ede1d624af 100644 --- a/src/core/lib/client_config/lb_policy_factory.c +++ b/src/core/lib/client_config/lb_policy_factory.c @@ -42,7 +42,8 @@ void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory) { } grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy( - grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) { + grpc_exec_ctx* exec_ctx, grpc_lb_policy_factory* factory, + grpc_lb_policy_args* args) { if (factory == NULL) return NULL; - return factory->vtable->create_lb_policy(factory, args); + return factory->vtable->create_lb_policy(exec_ctx, factory, args); } diff --git a/src/core/lib/client_config/lb_policy_factory.h b/src/core/lib/client_config/lb_policy_factory.h index 36eaf178d9..9a93a8ca3f 100644 --- a/src/core/lib/client_config/lb_policy_factory.h +++ b/src/core/lib/client_config/lb_policy_factory.h @@ -35,7 +35,10 @@ #define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H #include "src/core/lib/client_config/lb_policy.h" -#include "src/core/lib/client_config/subchannel.h" +#include "src/core/lib/client_config/subchannel_factory.h" +#include "src/core/lib/iomgr/resolve_address.h" + +#include "src/core/lib/iomgr/exec_ctx.h" typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; @@ -47,8 +50,8 @@ struct grpc_lb_policy_factory { }; typedef struct grpc_lb_policy_args { - grpc_subchannel **subchannels; - size_t num_subchannels; + grpc_resolved_addresses *addresses; + grpc_subchannel_factory *subchannel_factory; } grpc_lb_policy_args; struct grpc_lb_policy_factory_vtable { @@ -56,7 +59,8 @@ struct grpc_lb_policy_factory_vtable { void (*unref)(grpc_lb_policy_factory *factory); /** Implementation of grpc_lb_policy_factory_create_lb_policy */ - grpc_lb_policy *(*create_lb_policy)(grpc_lb_policy_factory *factory, + grpc_lb_policy *(*create_lb_policy)(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args); /** Name for the LB policy this factory implements */ @@ -68,6 +72,7 @@ void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory); /** Create a lb_policy instance. */ grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( - grpc_lb_policy_factory *factory, grpc_lb_policy_args *args); + grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args); #endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H */ diff --git a/src/core/lib/client_config/lb_policy_registry.c b/src/core/lib/client_config/lb_policy_registry.c index 13acfe78cd..d1dc502b9a 100644 --- a/src/core/lib/client_config/lb_policy_registry.c +++ b/src/core/lib/client_config/lb_policy_registry.c @@ -40,12 +40,7 @@ static grpc_lb_policy_factory *g_all_of_the_lb_policies[MAX_POLICIES]; static int g_number_of_lb_policies = 0; -static grpc_lb_policy_factory *g_default_lb_policy_factory; - -void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory) { - g_number_of_lb_policies = 0; - g_default_lb_policy_factory = default_factory; -} +void grpc_lb_policy_registry_init(void) { g_number_of_lb_policies = 0; } void grpc_lb_policy_registry_shutdown(void) { int i; @@ -79,10 +74,10 @@ static grpc_lb_policy_factory *lookup_factory(const char *name) { return NULL; } -grpc_lb_policy *grpc_lb_policy_create(const char *name, +grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name, grpc_lb_policy_args *args) { grpc_lb_policy_factory *factory = lookup_factory(name); grpc_lb_policy *lb_policy = - grpc_lb_policy_factory_create_lb_policy(factory, args); + grpc_lb_policy_factory_create_lb_policy(exec_ctx, factory, args); return lb_policy; } diff --git a/src/core/lib/client_config/lb_policy_registry.h b/src/core/lib/client_config/lb_policy_registry.h index c251fd9f08..1ecf7fe39f 100644 --- a/src/core/lib/client_config/lb_policy_registry.h +++ b/src/core/lib/client_config/lb_policy_registry.h @@ -35,10 +35,11 @@ #define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H #include "src/core/lib/client_config/lb_policy_factory.h" +#include "src/core/lib/iomgr/exec_ctx.h" /** Initialize the registry and set \a default_factory as the factory to be * returned when no name is provided in a lookup */ -void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory); +void grpc_lb_policy_registry_init(void); void grpc_lb_policy_registry_shutdown(void); /** Register a LB policy factory. */ @@ -48,7 +49,7 @@ void grpc_register_lb_policy(grpc_lb_policy_factory *factory); * * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init * will be returned. */ -grpc_lb_policy *grpc_lb_policy_create(const char *name, +grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name, grpc_lb_policy_args *args); #endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */ diff --git a/src/core/lib/client_config/resolvers/dns_resolver.c b/src/core/lib/client_config/resolvers/dns_resolver.c index ab445730ad..62bccdd045 100644 --- a/src/core/lib/client_config/resolvers/dns_resolver.c +++ b/src/core/lib/client_config/resolvers/dns_resolver.c @@ -162,38 +162,23 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { dns_resolver *r = arg; grpc_client_config *config = NULL; - grpc_subchannel **subchannels; - grpc_subchannel_args args; grpc_lb_policy *lb_policy; - size_t i; gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); r->resolving = 0; if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); - size_t naddrs = 0; - for (i = 0; i < addresses->naddrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)(addresses->addrs[i].addr); - args.addr_len = (size_t)addresses->addrs[i].len; - grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( - exec_ctx, r->subchannel_factory, &args); - if (subchannel != NULL) { - subchannels[naddrs++] = subchannel; - } - } memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = naddrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); + lb_policy_args.addresses = addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); if (lb_policy != NULL) { grpc_client_config_set_lb_policy(config, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); } grpc_resolved_addresses_destroy(addresses); - gpr_free(subchannels); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); diff --git a/src/core/lib/client_config/resolvers/sockaddr_resolver.c b/src/core/lib/client_config/resolvers/sockaddr_resolver.c index 66cddc3ed9..c787bd57d6 100644 --- a/src/core/lib/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/lib/client_config/resolvers/sockaddr_resolver.c @@ -58,11 +58,7 @@ typedef struct { char *lb_policy_name; /** the addresses that we've 'resolved' */ - struct sockaddr_storage *addrs; - /** the corresponding length of the addresses */ - size_t *addrs_len; - /** how many elements in \a addrs */ - size_t num_addrs; + grpc_resolved_addresses *addresses; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -125,28 +121,14 @@ static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r) { - grpc_client_config *cfg; - grpc_lb_policy *lb_policy; - grpc_lb_policy_args lb_policy_args; - grpc_subchannel **subchannels; - grpc_subchannel_args args; - if (r->next_completion != NULL && !r->published) { - size_t i; - cfg = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs); - for (i = 0; i < r->num_addrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)&r->addrs[i]; - args.addr_len = r->addrs_len[i]; - subchannels[i] = grpc_subchannel_factory_create_subchannel( - exec_ctx, r->subchannel_factory, &args); - } + grpc_client_config *cfg = grpc_client_config_create(); + grpc_lb_policy_args lb_policy_args; memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = r->num_addrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - gpr_free(subchannels); + lb_policy_args.addresses = r->addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + grpc_lb_policy *lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(cfg, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = 1; @@ -160,8 +142,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory); - gpr_free(r->addrs); - gpr_free(r->addrs_len); + grpc_resolved_addresses_destroy(r->addresses); gpr_free(r->lb_policy_name); gpr_free(r); } @@ -269,7 +250,6 @@ static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( grpc_resolver_args *args, const char *default_lb_policy_name, int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { - size_t i; int errors_found = 0; /* GPR_FALSE */ sockaddr_resolver *r; gpr_slice path_slice; @@ -309,15 +289,18 @@ static grpc_resolver *sockaddr_create( gpr_slice_buffer_init(&path_parts); gpr_slice_split(path_slice, ",", &path_parts); - r->num_addrs = path_parts.count; - r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs); - r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs); + r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->addresses->naddrs = path_parts.count; + r->addresses->addrs = + gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs); - for (i = 0; i < r->num_addrs; i++) { + for (size_t i = 0; i < r->addresses->naddrs; i++) { grpc_uri ith_uri = *args->uri; char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); ith_uri.path = part_str; - if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) { + if (!parse(&ith_uri, + (struct sockaddr_storage *)(&r->addresses->addrs[i].addr), + &r->addresses->addrs[i].len)) { errors_found = 1; /* GPR_TRUE */ } gpr_free(part_str); @@ -328,8 +311,7 @@ static grpc_resolver *sockaddr_create( gpr_slice_unref(path_slice); if (errors_found) { gpr_free(r->lb_policy_name); - gpr_free(r->addrs); - gpr_free(r->addrs_len); + grpc_resolved_addresses_destroy(r->addresses); gpr_free(r); return NULL; } diff --git a/src/core/lib/client_config/resolvers/zookeeper_resolver.c b/src/core/lib/client_config/resolvers/zookeeper_resolver.c index 3bb0bbdf5c..404dfcd423 100644 --- a/src/core/lib/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/lib/client_config/resolvers/zookeeper_resolver.c @@ -184,28 +184,22 @@ static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { zookeeper_resolver *r = arg; grpc_client_config *config = NULL; - grpc_subchannel **subchannels; - grpc_subchannel_args args; grpc_lb_policy *lb_policy; - size_t i; + if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); - for (i = 0; i < addresses->naddrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)(addresses->addrs[i].addr); - args.addr_len = addresses->addrs[i].len; - subchannels[i] = grpc_subchannel_factory_create_subchannel( - exec_ctx, r->subchannel_factory, &args); + + lb_policy_args.addresses = addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); + + if (lb_policy != NULL) { + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); } - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = addresses->naddrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - grpc_client_config_set_lb_policy(config, lb_policy); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); grpc_resolved_addresses_destroy(addresses); - gpr_free(subchannels); } gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving == 1); diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c index 5d4e304615..2782ad758e 100644 --- a/src/core/lib/http/parser.c +++ b/src/core/lib/http/parser.c @@ -39,6 +39,8 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> +extern int grpc_http_trace; + static char *buf2str(void *buffer, size_t length) { char *out = gpr_malloc(length + 1); memcpy(out, buffer, length); @@ -72,7 +74,7 @@ static int handle_response_line(grpc_http_parser *parser) { return 1; error: - gpr_log(GPR_ERROR, "Failed parsing response line"); + if (grpc_http_trace) gpr_log(GPR_ERROR, "Failed parsing response line"); return 0; } @@ -125,7 +127,7 @@ static int handle_request_line(grpc_http_parser *parser) { return 1; error: - gpr_log(GPR_ERROR, "Failed parsing request line"); + if (grpc_http_trace) gpr_log(GPR_ERROR, "Failed parsing request line"); return 0; } @@ -150,7 +152,8 @@ static int add_header(grpc_http_parser *parser) { GPR_ASSERT(cur != end); if (*cur == ' ' || *cur == '\t') { - gpr_log(GPR_ERROR, "Continued header lines not supported yet"); + if (grpc_http_trace) + gpr_log(GPR_ERROR, "Continued header lines not supported yet"); goto error; } @@ -158,7 +161,7 @@ static int add_header(grpc_http_parser *parser) { cur++; } if (cur == end) { - gpr_log(GPR_ERROR, "Didn't find ':' in header string"); + if (grpc_http_trace) gpr_log(GPR_ERROR, "Didn't find ':' in header string"); goto error; } GPR_ASSERT(cur >= beg); @@ -249,8 +252,9 @@ static int addbyte(grpc_http_parser *parser, uint8_t byte) { case GRPC_HTTP_FIRST_LINE: case GRPC_HTTP_HEADERS: if (parser->cur_line_length >= GRPC_HTTP_PARSER_MAX_HEADER_LENGTH) { - gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded", - GRPC_HTTP_PARSER_MAX_HEADER_LENGTH); + if (grpc_http_trace) + gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded", + GRPC_HTTP_PARSER_MAX_HEADER_LENGTH); return 0; } parser->cur_line[parser->cur_line_length] = byte; diff --git a/src/core/lib/iomgr/pollset_set_windows.c b/src/core/lib/iomgr/pollset_set_windows.c index 0b14e446ae..720fc331ed 100644 --- a/src/core/lib/iomgr/pollset_set_windows.c +++ b/src/core/lib/iomgr/pollset_set_windows.c @@ -37,7 +37,7 @@ #include "src/core/lib/iomgr/pollset_set_windows.h" -grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; } +grpc_pollset_set* grpc_pollset_set_create(void) { return NULL; } void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index 9dbc2784e4..b433aee7fa 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -89,6 +89,28 @@ int grpc_set_socket_no_sigpipe_if_possible(int fd) { #endif } +int grpc_set_socket_ip_pktinfo_if_possible(int fd) { +#ifdef GPR_HAVE_IP_PKTINFO + int get_local_ip = 1; + return 0 == setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, + sizeof(get_local_ip)); +#else + (void)fd; + return 1; +#endif +} + +int grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) { +#ifdef GPR_HAVE_IPV6_RECVPKTINFO + int get_local_ip = 1; + return 0 == setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, + sizeof(get_local_ip)); +#else + (void)fd; + return 1; +#endif +} + /* set a socket to close on exec */ int grpc_set_socket_cloexec(int fd, int close_on_exec) { int oldflags = fcntl(fd, F_GETFD, 0); diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index 063f298d72..f73ad6317d 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -68,6 +68,16 @@ int grpc_ipv6_loopback_available(void); If SO_NO_SIGPIPE is not available, returns 1. */ int grpc_set_socket_no_sigpipe_if_possible(int fd); +/* Tries to set IP_PKTINFO if available on this platform. + Returns 1 on success, 0 on failure. + If IP_PKTINFO is not available, returns 1. */ +int grpc_set_socket_ip_pktinfo_if_possible(int fd); + +/* Tries to set IPV6_RECVPKTINFO if available on this platform. + Returns 1 on success, 0 on failure. + If IPV6_RECVPKTINFO is not available, returns 1. */ +int grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd); + /* An enum to keep track of IPv4/IPv6 socket modes. Currently, this information is only used when a socket is first created, but diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 3d8bcc9c81..9068109c3a 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -208,8 +208,6 @@ static int prepare_socket(int fd, const struct sockaddr *addr, size_t addr_len) { struct sockaddr_storage sockname_temp; socklen_t sockname_len; - int get_local_ip; - int rc; if (fd < 0) { goto error; @@ -220,14 +218,9 @@ static int prepare_socket(int fd, const struct sockaddr *addr, strerror(errno)); } - get_local_ip = 1; - rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, - sizeof(get_local_ip)); - if (rc == 0 && addr->sa_family == AF_INET6) { -#if !defined(__APPLE__) - rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, - sizeof(get_local_ip)); -#endif + if (grpc_set_socket_ip_pktinfo_if_possible(fd) && + addr->sa_family == AF_INET6) { + grpc_set_socket_ipv6_recvpktinfo_if_possible(fd); } GPR_ASSERT(addr_len < ~(socklen_t)0); diff --git a/src/core/lib/json/json_reader.c b/src/core/lib/json/json_reader.c index 0807f029ce..4cff13dff1 100644 --- a/src/core/lib/json/json_reader.c +++ b/src/core/lib/json/json_reader.c @@ -280,13 +280,14 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader *reader) { break; case GRPC_JSON_STATE_OBJECT_KEY_STRING: - GPR_ASSERT(reader->unicode_high_surrogate == 0); + if (reader->unicode_high_surrogate != 0) + return GRPC_JSON_PARSE_ERROR; if (c == '"') { reader->state = GRPC_JSON_STATE_OBJECT_KEY_END; json_reader_set_key(reader); json_reader_string_clear(reader); } else { - if (c <= 0x001f) return GRPC_JSON_PARSE_ERROR; + if (c < 32) return GRPC_JSON_PARSE_ERROR; json_reader_string_add_char(reader, c); } break; @@ -362,6 +363,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader *reader) { reader->in_object = 0; reader->in_array = 1; break; + default: + return GRPC_JSON_PARSE_ERROR; } break; diff --git a/src/core/lib/security/client_auth_filter.c b/src/core/lib/security/client_auth_filter.c index b9e5bf0339..af6073e560 100644 --- a/src/core/lib/security/client_auth_filter.c +++ b/src/core/lib/security/client_auth_filter.c @@ -172,7 +172,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, calld->creds = grpc_composite_call_credentials_create(channel_call_creds, ctx->creds, NULL); if (calld->creds == NULL) { - bubble_up_error(exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, + bubble_up_error(exec_ctx, elem, GRPC_STATUS_INTERNAL, "Incompatible credentials set on channel and call."); return; } @@ -201,7 +201,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, char *error_msg; gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", grpc_mdstr_as_c_string(calld->host)); - bubble_up_error(exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg); + bubble_up_error(exec_ctx, elem, GRPC_STATUS_INTERNAL, error_msg); gpr_free(error_msg); } } diff --git a/src/core/lib/security/security_connector.c b/src/core/lib/security/security_connector.c index 5474bc3a9e..48b23a9dcf 100644 --- a/src/core/lib/security/security_connector.c +++ b/src/core/lib/security/security_connector.c @@ -42,6 +42,7 @@ #include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> +#include "src/core/ext/transport/chttp2/transport/alpn.h" #include "src/core/lib/security/credentials.h" #include "src/core/lib/security/handshake.h" #include "src/core/lib/security/secure_endpoint.h" @@ -49,7 +50,6 @@ #include "src/core/lib/support/env.h" #include "src/core/lib/support/load_file.h" #include "src/core/lib/support/string.h" -#include "src/core/lib/transport/chttp2/alpn.h" #include "src/core/lib/tsi/fake_transport_security.h" #include "src/core/lib/tsi/ssl_transport_security.h" diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index d63a4a7401..77ad410b50 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -174,6 +174,9 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; + /* Call stats: only valid after trailing metadata received */ + grpc_transport_stream_stats stats; + /* Compression algorithm for the call */ grpc_compression_algorithm compression_algorithm; /* Supported encodings (compression algorithms), a bitset */ @@ -909,7 +912,7 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { *(int *)dest = (status != GRPC_STATUS_OK); } -static int are_write_flags_valid(uint32_t flags) { +static bool are_write_flags_valid(uint32_t flags) { /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ const uint32_t allowed_write_positions = (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK); @@ -917,6 +920,15 @@ static int are_write_flags_valid(uint32_t flags) { return !(flags & invalid_positions); } +static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) { + /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ + uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK; + if (!is_client) { + invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; + } + return !(flags & invalid_positions); +} + static batch_control *allocate_batch_control(grpc_call *call) { size_t i; for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) { @@ -1196,7 +1208,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: /* Flag validation: currently allow no flags */ - if (op->flags != 0) { + if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } @@ -1220,6 +1232,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->metadata_batch[0][0].deadline = call->send_deadline; stream_op.send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; + stream_op.send_idempotent_request = + (op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1371,6 +1385,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->recv_final_op = 1; stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + stream_op.collect_stats = &call->stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: /* Flag validation: currently allow no flags */ @@ -1392,6 +1407,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->recv_final_op = 1; stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + stream_op.collect_stats = &call->stats; break; } } diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index b5546fb44a..3ccc21fb31 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -40,6 +40,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/time.h> /* TODO(ctiller): find another way? - better not to include census here */ +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/census/grpc_plugin.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/client_channel.h" @@ -47,8 +48,6 @@ #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/channel/http_server_filter.h" -#include "src/core/lib/client_config/lb_policies/pick_first.h" -#include "src/core/lib/client_config/lb_policies/round_robin.h" #include "src/core/lib/client_config/lb_policy_registry.h" #include "src/core/lib/client_config/resolver_registry.h" #include "src/core/lib/client_config/resolvers/dns_resolver.h" @@ -67,7 +66,6 @@ #include "src/core/lib/surface/lame_client.h" #include "src/core/lib/surface/server.h" #include "src/core/lib/surface/surface_trace.h" -#include "src/core/lib/transport/chttp2_transport.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/transport_impl.h" @@ -75,6 +73,9 @@ #define GRPC_DEFAULT_NAME_PREFIX "dns:///" #endif +/* (generated) built in registry of plugins */ +extern void grpc_register_built_in_plugins(void); + #define MAX_PLUGINS 128 static gpr_once g_basic_init = GPR_ONCE_INIT; @@ -83,6 +84,7 @@ static int g_initializations; static void do_basic_init(void) { gpr_mu_init(&g_init_mu); + grpc_register_built_in_plugins(); /* TODO(ctiller): ideally remove this strict linkage */ grpc_register_plugin(census_grpc_plugin_init, census_grpc_plugin_destroy); g_initializations = 0; @@ -165,9 +167,7 @@ void grpc_init(void) { gpr_time_init(); grpc_mdctx_global_init(); grpc_channel_init_init(); - grpc_lb_policy_registry_init(grpc_pick_first_lb_factory_create()); - grpc_register_lb_policy(grpc_pick_first_lb_factory_create()); - grpc_register_lb_policy(grpc_round_robin_lb_factory_create()); + grpc_lb_policy_registry_init(); grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX); grpc_register_resolver_type(grpc_dns_resolver_factory_create()); grpc_register_resolver_type(grpc_ipv4_resolver_factory_create()); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 080734e9d5..55e5e49e11 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -100,6 +100,7 @@ typedef struct requested_call { typedef struct channel_registered_method { registered_method *server_registered_method; + uint32_t flags; grpc_mdstr *method; grpc_mdstr *host; } channel_registered_method; @@ -152,6 +153,7 @@ struct call_data { grpc_completion_queue *cq_new; grpc_metadata_batch *recv_initial_metadata; + bool recv_idempotent_request; grpc_metadata_array initial_metadata; grpc_closure got_initial_metadata; @@ -171,6 +173,7 @@ struct request_matcher { struct registered_method { char *method; char *host; + uint32_t flags; request_matcher request_matcher; registered_method *next; }; @@ -468,6 +471,9 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (!rm) break; if (rm->host != calld->host) continue; if (rm->method != calld->path) continue; + if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && + !calld->recv_idempotent_request) + continue; finish_start_new_rpc(exec_ctx, server, elem, &rm->server_registered_method->request_matcher); return; @@ -480,6 +486,9 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (!rm) break; if (rm->host != NULL) continue; if (rm->method != calld->path) continue; + if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && + !calld->recv_idempotent_request) + continue; finish_start_new_rpc(exec_ctx, server, elem, &rm->server_registered_method->request_matcher); return; @@ -598,9 +607,11 @@ static void server_mutate_op(grpc_call_element *elem, call_data *calld = elem->call_data; if (op->recv_initial_metadata != NULL) { + GPR_ASSERT(op->recv_idempotent_request == NULL); calld->recv_initial_metadata = op->recv_initial_metadata; calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata; + op->recv_idempotent_request = &calld->recv_idempotent_request; } } @@ -830,10 +841,12 @@ static int streq(const char *a, const char *b) { } void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host) { + const char *host, uint32_t flags) { registered_method *m; - GRPC_API_TRACE("grpc_server_register_method(server=%p, method=%s, host=%s)", - 3, (server, method, host)); + GRPC_API_TRACE( + "grpc_server_register_method(server=%p, method=%s, host=%s, " + "flags=0x%08x)", + 4, (server, method, host, flags)); if (!method) { gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL"); @@ -846,12 +859,18 @@ void *grpc_server_register_method(grpc_server *server, const char *method, return NULL; } } + if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) { + gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x", + flags); + return NULL; + } m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); request_matcher_init(&m->request_matcher, server->max_requested_calls); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; + m->flags = flags; server->registered_methods = m; return m; } @@ -930,6 +949,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, if (probes > max_probes) max_probes = probes; crm = &chand->registered_methods[(hash + probes) % slots]; crm->server_registered_method = rm; + crm->flags = rm->flags; crm->host = host; crm->method = method; } @@ -1247,6 +1267,10 @@ static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server, cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); rc->data.batch.details->deadline = calld->deadline; + rc->data.batch.details->flags = + 0 | (calld->recv_idempotent_request + ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST + : 0); break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c index 7605f09991..451c8d1cd3 100644 --- a/src/core/lib/transport/metadata.c +++ b/src/core/lib/transport/metadata.c @@ -44,11 +44,11 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include "src/core/ext/transport/chttp2/transport/bin_encoder.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/murmur_hash.h" #include "src/core/lib/support/string.h" -#include "src/core/lib/transport/chttp2/bin_encoder.h" #include "src/core/lib/transport/static_metadata.h" /* There are two kinds of mdelem and mdstr instances. diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c index eda277b3dc..bde1fafc88 100644 --- a/src/core/lib/transport/static_metadata.c +++ b/src/core/lib/transport/static_metadata.c @@ -1,5 +1,4 @@ /* - * * Copyright 2015-2016, Google Inc. * All rights reserved. * @@ -28,19 +27,16 @@ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * */ /* * WARNING: Auto-generated code. * * To make changes to this file, change - * tools/codegen/core/gen_static_metadata.py, - * and then re-run it. + * tools/codegen/core/gen_static_metadata.py, and then re-run it. * * See metadata.h for an explanation of the interface here, and metadata.c for - * an - * explanation of what's going on. + * an explanation of what's going on. */ #include "src/core/lib/transport/static_metadata.h" @@ -52,7 +48,7 @@ uintptr_t grpc_static_mdelem_user_data[GRPC_STATIC_MDELEM_COUNT] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 8, 6, 2, 4, 8, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; const uint8_t grpc_static_metadata_elem_indices[GRPC_STATIC_MDELEM_COUNT * 2] = {11, 35, 10, 35, 12, 35, 12, 49, 13, 35, 14, 35, 15, 35, 16, 35, 17, 35, @@ -60,10 +56,10 @@ const uint8_t grpc_static_metadata_elem_indices[GRPC_STATIC_MDELEM_COUNT * 2] = 30, 18, 30, 35, 31, 35, 32, 35, 36, 35, 37, 35, 38, 35, 39, 35, 42, 33, 42, 34, 42, 48, 42, 53, 42, 54, 42, 55, 42, 56, 43, 33, 43, 48, 43, 53, 46, 0, 46, 1, 46, 2, 50, 35, 57, 35, 58, 35, 59, 35, 60, 35, 61, 35, - 62, 35, 63, 35, 64, 35, 65, 35, 66, 40, 66, 68, 67, 78, 67, 79, 69, 35, - 70, 35, 71, 35, 72, 35, 73, 35, 74, 35, 75, 41, 75, 51, 75, 52, 76, 35, - 77, 35, 80, 3, 80, 4, 80, 5, 80, 6, 80, 7, 80, 8, 80, 9, 81, 35, - 82, 83, 84, 35, 85, 35, 86, 35, 87, 35, 88, 35}; + 62, 35, 63, 35, 64, 35, 65, 35, 66, 40, 66, 68, 66, 71, 67, 79, 67, 80, + 69, 35, 70, 35, 72, 35, 73, 35, 74, 35, 75, 35, 76, 41, 76, 51, 76, 52, + 77, 35, 78, 35, 81, 3, 81, 4, 81, 5, 81, 6, 81, 7, 81, 8, 81, 9, + 82, 35, 83, 84, 85, 35, 86, 35, 87, 35, 88, 35, 89, 35}; const char *const grpc_static_metadata_strings[GRPC_STATIC_MDSTR_COUNT] = { "0", @@ -137,6 +133,7 @@ const char *const grpc_static_metadata_strings[GRPC_STATIC_MDSTR_COUNT] = { "POST", "proxy-authenticate", "proxy-authorization", + "PUT", "range", "referer", "refresh", diff --git a/src/core/lib/transport/static_metadata.h b/src/core/lib/transport/static_metadata.h index aff136a6d2..a05553a870 100644 --- a/src/core/lib/transport/static_metadata.h +++ b/src/core/lib/transport/static_metadata.h @@ -1,5 +1,4 @@ /* - * * Copyright 2015-2016, Google Inc. * All rights reserved. * @@ -28,19 +27,16 @@ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * */ /* * WARNING: Auto-generated code. * * To make changes to this file, change - * tools/codegen/core/gen_static_metadata.py, - * and then re-run it. + * tools/codegen/core/gen_static_metadata.py, and then re-run it. * * See metadata.h for an explanation of the interface here, and metadata.c for - * an - * explanation of what's going on. + * an explanation of what's going on. */ #ifndef GRPC_CORE_LIB_TRANSPORT_STATIC_METADATA_H @@ -48,7 +44,7 @@ #include "src/core/lib/transport/metadata.h" -#define GRPC_STATIC_MDSTR_COUNT 89 +#define GRPC_STATIC_MDSTR_COUNT 90 extern grpc_mdstr grpc_static_mdstr_table[GRPC_STATIC_MDSTR_COUNT]; /* "0" */ #define GRPC_MDSTR_0 (&grpc_static_mdstr_table[0]) @@ -193,44 +189,46 @@ extern grpc_mdstr grpc_static_mdstr_table[GRPC_STATIC_MDSTR_COUNT]; #define GRPC_MDSTR_PROXY_AUTHENTICATE (&grpc_static_mdstr_table[69]) /* "proxy-authorization" */ #define GRPC_MDSTR_PROXY_AUTHORIZATION (&grpc_static_mdstr_table[70]) +/* "PUT" */ +#define GRPC_MDSTR_PUT (&grpc_static_mdstr_table[71]) /* "range" */ -#define GRPC_MDSTR_RANGE (&grpc_static_mdstr_table[71]) +#define GRPC_MDSTR_RANGE (&grpc_static_mdstr_table[72]) /* "referer" */ -#define GRPC_MDSTR_REFERER (&grpc_static_mdstr_table[72]) +#define GRPC_MDSTR_REFERER (&grpc_static_mdstr_table[73]) /* "refresh" */ -#define GRPC_MDSTR_REFRESH (&grpc_static_mdstr_table[73]) +#define GRPC_MDSTR_REFRESH (&grpc_static_mdstr_table[74]) /* "retry-after" */ -#define GRPC_MDSTR_RETRY_AFTER (&grpc_static_mdstr_table[74]) +#define GRPC_MDSTR_RETRY_AFTER (&grpc_static_mdstr_table[75]) /* ":scheme" */ -#define GRPC_MDSTR_SCHEME (&grpc_static_mdstr_table[75]) +#define GRPC_MDSTR_SCHEME (&grpc_static_mdstr_table[76]) /* "server" */ -#define GRPC_MDSTR_SERVER (&grpc_static_mdstr_table[76]) +#define GRPC_MDSTR_SERVER (&grpc_static_mdstr_table[77]) /* "set-cookie" */ -#define GRPC_MDSTR_SET_COOKIE (&grpc_static_mdstr_table[77]) +#define GRPC_MDSTR_SET_COOKIE (&grpc_static_mdstr_table[78]) /* "/" */ -#define GRPC_MDSTR_SLASH (&grpc_static_mdstr_table[78]) +#define GRPC_MDSTR_SLASH (&grpc_static_mdstr_table[79]) /* "/index.html" */ -#define GRPC_MDSTR_SLASH_INDEX_DOT_HTML (&grpc_static_mdstr_table[79]) +#define GRPC_MDSTR_SLASH_INDEX_DOT_HTML (&grpc_static_mdstr_table[80]) /* ":status" */ -#define GRPC_MDSTR_STATUS (&grpc_static_mdstr_table[80]) +#define GRPC_MDSTR_STATUS (&grpc_static_mdstr_table[81]) /* "strict-transport-security" */ -#define GRPC_MDSTR_STRICT_TRANSPORT_SECURITY (&grpc_static_mdstr_table[81]) +#define GRPC_MDSTR_STRICT_TRANSPORT_SECURITY (&grpc_static_mdstr_table[82]) /* "te" */ -#define GRPC_MDSTR_TE (&grpc_static_mdstr_table[82]) +#define GRPC_MDSTR_TE (&grpc_static_mdstr_table[83]) /* "trailers" */ -#define GRPC_MDSTR_TRAILERS (&grpc_static_mdstr_table[83]) +#define GRPC_MDSTR_TRAILERS (&grpc_static_mdstr_table[84]) /* "transfer-encoding" */ -#define GRPC_MDSTR_TRANSFER_ENCODING (&grpc_static_mdstr_table[84]) +#define GRPC_MDSTR_TRANSFER_ENCODING (&grpc_static_mdstr_table[85]) /* "user-agent" */ -#define GRPC_MDSTR_USER_AGENT (&grpc_static_mdstr_table[85]) +#define GRPC_MDSTR_USER_AGENT (&grpc_static_mdstr_table[86]) /* "vary" */ -#define GRPC_MDSTR_VARY (&grpc_static_mdstr_table[86]) +#define GRPC_MDSTR_VARY (&grpc_static_mdstr_table[87]) /* "via" */ -#define GRPC_MDSTR_VIA (&grpc_static_mdstr_table[87]) +#define GRPC_MDSTR_VIA (&grpc_static_mdstr_table[88]) /* "www-authenticate" */ -#define GRPC_MDSTR_WWW_AUTHENTICATE (&grpc_static_mdstr_table[88]) +#define GRPC_MDSTR_WWW_AUTHENTICATE (&grpc_static_mdstr_table[89]) -#define GRPC_STATIC_MDELEM_COUNT 78 +#define GRPC_STATIC_MDELEM_COUNT 79 extern grpc_mdelem grpc_static_mdelem_table[GRPC_STATIC_MDELEM_COUNT]; extern uintptr_t grpc_static_mdelem_user_data[GRPC_STATIC_MDELEM_COUNT]; /* "accept-charset": "" */ @@ -343,61 +341,63 @@ extern uintptr_t grpc_static_mdelem_user_data[GRPC_STATIC_MDELEM_COUNT]; #define GRPC_MDELEM_METHOD_GET (&grpc_static_mdelem_table[49]) /* ":method": "POST" */ #define GRPC_MDELEM_METHOD_POST (&grpc_static_mdelem_table[50]) +/* ":method": "PUT" */ +#define GRPC_MDELEM_METHOD_PUT (&grpc_static_mdelem_table[51]) /* ":path": "/" */ -#define GRPC_MDELEM_PATH_SLASH (&grpc_static_mdelem_table[51]) +#define GRPC_MDELEM_PATH_SLASH (&grpc_static_mdelem_table[52]) /* ":path": "/index.html" */ -#define GRPC_MDELEM_PATH_SLASH_INDEX_DOT_HTML (&grpc_static_mdelem_table[52]) +#define GRPC_MDELEM_PATH_SLASH_INDEX_DOT_HTML (&grpc_static_mdelem_table[53]) /* "proxy-authenticate": "" */ -#define GRPC_MDELEM_PROXY_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[53]) +#define GRPC_MDELEM_PROXY_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[54]) /* "proxy-authorization": "" */ -#define GRPC_MDELEM_PROXY_AUTHORIZATION_EMPTY (&grpc_static_mdelem_table[54]) +#define GRPC_MDELEM_PROXY_AUTHORIZATION_EMPTY (&grpc_static_mdelem_table[55]) /* "range": "" */ -#define GRPC_MDELEM_RANGE_EMPTY (&grpc_static_mdelem_table[55]) +#define GRPC_MDELEM_RANGE_EMPTY (&grpc_static_mdelem_table[56]) /* "referer": "" */ -#define GRPC_MDELEM_REFERER_EMPTY (&grpc_static_mdelem_table[56]) +#define GRPC_MDELEM_REFERER_EMPTY (&grpc_static_mdelem_table[57]) /* "refresh": "" */ -#define GRPC_MDELEM_REFRESH_EMPTY (&grpc_static_mdelem_table[57]) +#define GRPC_MDELEM_REFRESH_EMPTY (&grpc_static_mdelem_table[58]) /* "retry-after": "" */ -#define GRPC_MDELEM_RETRY_AFTER_EMPTY (&grpc_static_mdelem_table[58]) +#define GRPC_MDELEM_RETRY_AFTER_EMPTY (&grpc_static_mdelem_table[59]) /* ":scheme": "grpc" */ -#define GRPC_MDELEM_SCHEME_GRPC (&grpc_static_mdelem_table[59]) +#define GRPC_MDELEM_SCHEME_GRPC (&grpc_static_mdelem_table[60]) /* ":scheme": "http" */ -#define GRPC_MDELEM_SCHEME_HTTP (&grpc_static_mdelem_table[60]) +#define GRPC_MDELEM_SCHEME_HTTP (&grpc_static_mdelem_table[61]) /* ":scheme": "https" */ -#define GRPC_MDELEM_SCHEME_HTTPS (&grpc_static_mdelem_table[61]) +#define GRPC_MDELEM_SCHEME_HTTPS (&grpc_static_mdelem_table[62]) /* "server": "" */ -#define GRPC_MDELEM_SERVER_EMPTY (&grpc_static_mdelem_table[62]) +#define GRPC_MDELEM_SERVER_EMPTY (&grpc_static_mdelem_table[63]) /* "set-cookie": "" */ -#define GRPC_MDELEM_SET_COOKIE_EMPTY (&grpc_static_mdelem_table[63]) +#define GRPC_MDELEM_SET_COOKIE_EMPTY (&grpc_static_mdelem_table[64]) /* ":status": "200" */ -#define GRPC_MDELEM_STATUS_200 (&grpc_static_mdelem_table[64]) +#define GRPC_MDELEM_STATUS_200 (&grpc_static_mdelem_table[65]) /* ":status": "204" */ -#define GRPC_MDELEM_STATUS_204 (&grpc_static_mdelem_table[65]) +#define GRPC_MDELEM_STATUS_204 (&grpc_static_mdelem_table[66]) /* ":status": "206" */ -#define GRPC_MDELEM_STATUS_206 (&grpc_static_mdelem_table[66]) +#define GRPC_MDELEM_STATUS_206 (&grpc_static_mdelem_table[67]) /* ":status": "304" */ -#define GRPC_MDELEM_STATUS_304 (&grpc_static_mdelem_table[67]) +#define GRPC_MDELEM_STATUS_304 (&grpc_static_mdelem_table[68]) /* ":status": "400" */ -#define GRPC_MDELEM_STATUS_400 (&grpc_static_mdelem_table[68]) +#define GRPC_MDELEM_STATUS_400 (&grpc_static_mdelem_table[69]) /* ":status": "404" */ -#define GRPC_MDELEM_STATUS_404 (&grpc_static_mdelem_table[69]) +#define GRPC_MDELEM_STATUS_404 (&grpc_static_mdelem_table[70]) /* ":status": "500" */ -#define GRPC_MDELEM_STATUS_500 (&grpc_static_mdelem_table[70]) +#define GRPC_MDELEM_STATUS_500 (&grpc_static_mdelem_table[71]) /* "strict-transport-security": "" */ #define GRPC_MDELEM_STRICT_TRANSPORT_SECURITY_EMPTY \ - (&grpc_static_mdelem_table[71]) + (&grpc_static_mdelem_table[72]) /* "te": "trailers" */ -#define GRPC_MDELEM_TE_TRAILERS (&grpc_static_mdelem_table[72]) +#define GRPC_MDELEM_TE_TRAILERS (&grpc_static_mdelem_table[73]) /* "transfer-encoding": "" */ -#define GRPC_MDELEM_TRANSFER_ENCODING_EMPTY (&grpc_static_mdelem_table[73]) +#define GRPC_MDELEM_TRANSFER_ENCODING_EMPTY (&grpc_static_mdelem_table[74]) /* "user-agent": "" */ -#define GRPC_MDELEM_USER_AGENT_EMPTY (&grpc_static_mdelem_table[74]) +#define GRPC_MDELEM_USER_AGENT_EMPTY (&grpc_static_mdelem_table[75]) /* "vary": "" */ -#define GRPC_MDELEM_VARY_EMPTY (&grpc_static_mdelem_table[75]) +#define GRPC_MDELEM_VARY_EMPTY (&grpc_static_mdelem_table[76]) /* "via": "" */ -#define GRPC_MDELEM_VIA_EMPTY (&grpc_static_mdelem_table[76]) +#define GRPC_MDELEM_VIA_EMPTY (&grpc_static_mdelem_table[77]) /* "www-authenticate": "" */ -#define GRPC_MDELEM_WWW_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[77]) +#define GRPC_MDELEM_WWW_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[78]) extern const uint8_t grpc_static_metadata_elem_indices[GRPC_STATIC_MDELEM_COUNT * 2]; diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 18256aae5e..411e4f140a 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -35,6 +35,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> #include "src/core/lib/transport/transport_impl.h" #ifdef GRPC_STREAM_REFCOUNT_DEBUG @@ -76,6 +77,24 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, grpc_closure_init(&refcount->destroy, cb, cb_arg); } +static void move64(uint64_t *from, uint64_t *to) { + *to += *from; + *from = 0; +} + +void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, + grpc_transport_one_way_stats *to) { + move64(&from->framing_bytes, &to->framing_bytes); + move64(&from->data_bytes, &to->data_bytes); + move64(&from->header_bytes, &to->header_bytes); +} + +void grpc_transport_move_stats(grpc_transport_stream_stats *from, + grpc_transport_stream_stats *to) { + grpc_transport_move_one_way_stats(&from->incoming, &to->incoming); + grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing); +} + size_t grpc_transport_stream_size(grpc_transport *transport) { return transport->vtable->sizeof_stream; } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index e98cfe9515..6dd782c19e 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -78,11 +78,32 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount); grpc_stream_ref_init(rc, ir, cb, cb_arg) #endif +typedef struct { + uint64_t framing_bytes; + uint64_t data_bytes; + uint64_t header_bytes; +} grpc_transport_one_way_stats; + +typedef struct grpc_transport_stream_stats { + grpc_transport_one_way_stats incoming; + grpc_transport_one_way_stats outgoing; +} grpc_transport_stream_stats; + +void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, + grpc_transport_one_way_stats *to); + +void grpc_transport_move_stats(grpc_transport_stream_stats *from, + grpc_transport_stream_stats *to); + /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op { - /** Send initial metadata to the peer, from the provided metadata batch. */ + /** Send initial metadata to the peer, from the provided metadata batch. + idempotent_request MUST be set if this is non-null */ grpc_metadata_batch *send_initial_metadata; + /** Iff send_initial_metadata != NULL, flags if this is an idempotent request + or not */ + bool send_idempotent_request; /** Send trailing metadata to the peer, from the provided metadata batch. */ grpc_metadata_batch *send_trailing_metadata; @@ -92,6 +113,7 @@ typedef struct grpc_transport_stream_op { /** Receive initial metadata from the stream, into provided metadata batch. */ grpc_metadata_batch *recv_initial_metadata; + bool *recv_idempotent_request; /** Should be enqueued when initial metadata is ready to be processed. */ grpc_closure *recv_initial_metadata_ready; @@ -104,6 +126,9 @@ typedef struct grpc_transport_stream_op { */ grpc_metadata_batch *recv_trailing_metadata; + /** Collect any stats into provided buffer, zero internal stat counters */ + grpc_transport_stream_stats *collect_stats; + /** Should be enqueued when all requested operations (excluding recv_message and recv_initial_metadata which have their own closures) in a given batch have been completed. */ diff --git a/src/core/lib/client_config/lb_policies/pick_first.h b/src/core/plugin_registry/grpc_plugin_registry.c index dba86ea7ad..3e3c214c22 100644 --- a/src/core/lib/client_config/lb_policies/pick_first.h +++ b/src/core/plugin_registry/grpc_plugin_registry.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,13 +31,16 @@ * */ -#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_PICK_FIRST_H -#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_PICK_FIRST_H +#include <grpc/grpc.h> -#include "src/core/lib/client_config/lb_policy_factory.h" +extern void grpc_lb_policy_pick_first_init(void); +extern void grpc_lb_policy_pick_first_shutdown(void); +extern void grpc_lb_policy_round_robin_init(void); +extern void grpc_lb_policy_round_robin_shutdown(void); -/** Returns a load balancing factory for the pick first policy, which picks up - * the first subchannel from \a subchannels to succesfully connect */ -grpc_lb_policy_factory *grpc_pick_first_lb_factory_create(); - -#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_PICK_FIRST_H */ +void grpc_register_built_in_plugins(void) { + grpc_register_plugin(grpc_lb_policy_pick_first_init, + grpc_lb_policy_pick_first_shutdown); + grpc_register_plugin(grpc_lb_policy_round_robin_init, + grpc_lb_policy_round_robin_shutdown); +} diff --git a/src/core/lib/client_config/lb_policies/round_robin.h b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c index 52db1caa0c..3e3c214c22 100644 --- a/src/core/lib/client_config/lb_policies/round_robin.h +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,16 +31,16 @@ * */ -#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_ROUND_ROBIN_H -#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_ROUND_ROBIN_H +#include <grpc/grpc.h> -#include "src/core/lib/client_config/lb_policy.h" +extern void grpc_lb_policy_pick_first_init(void); +extern void grpc_lb_policy_pick_first_shutdown(void); +extern void grpc_lb_policy_round_robin_init(void); +extern void grpc_lb_policy_round_robin_shutdown(void); -extern int grpc_lb_round_robin_trace; - -#include "src/core/lib/client_config/lb_policy_factory.h" - -/** Returns a load balancing factory for the round robin policy */ -grpc_lb_policy_factory *grpc_round_robin_lb_factory_create(); - -#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_ROUND_ROBIN_H */ +void grpc_register_built_in_plugins(void) { + grpc_register_plugin(grpc_lb_policy_pick_first_init, + grpc_lb_policy_pick_first_shutdown); + grpc_register_plugin(grpc_lb_policy_round_robin_init, + grpc_lb_policy_round_robin_shutdown); +} diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 7e5f557ffa..5ec05e4dcc 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -264,6 +264,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { void* const tag_; bool in_flight_; const bool has_request_payload_; + uint32_t incoming_flags_; grpc_call* call_; grpc_call_details* call_details_; gpr_timespec deadline_; @@ -334,7 +335,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { } RpcServiceMethod* method = it->get(); void* tag = grpc_server_register_method(server_, method->name(), - host ? host->c_str() : nullptr); + host ? host->c_str() : nullptr, 0); if (tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); diff --git a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs index 47a15224f1..1edeedae2f 100644 --- a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs +++ b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs @@ -1,6 +1,6 @@ #region Copyright notice and license -// Copyright 2015, Google Inc. +// Copyright 2015-2016, Google Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -46,16 +46,13 @@ namespace Grpc.Testing /// </summary> public class BenchmarkServiceImpl : BenchmarkService.IBenchmarkService { - private readonly int responseSize; - - public BenchmarkServiceImpl(int responseSize) + public BenchmarkServiceImpl() { - this.responseSize = responseSize; } public Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context) { - var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) }; + var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) }; return Task.FromResult(response); } @@ -63,7 +60,7 @@ namespace Grpc.Testing { await requestStream.ForEachAsync(async request => { - var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) }; + var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) }; await responseStream.WriteAsync(response); }); } diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index c4016012cb..e6dc2321c4 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -41,6 +41,7 @@ using System.Threading; using System.Threading.Tasks; using Google.Protobuf; using Grpc.Core; +using Grpc.Core.Logging; using Grpc.Core.Utils; using NUnit.Framework; using Grpc.Testing; @@ -50,42 +51,65 @@ namespace Grpc.IntegrationTesting /// <summary> /// Helper methods to start client runners for performance testing. /// </summary> - public static class ClientRunners + public class ClientRunners { + static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientRunners>(); + /// <summary> /// Creates a started client runner. /// </summary> public static IClientRunner CreateStarted(ClientConfig config) { + Logger.Debug("ClientConfig: {0}", config); string target = config.ServerTargets.Single(); - GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop); + GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop, + "Only closed loop scenario supported for C#"); + GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1"); - var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure; - var channel = new Channel(target, credentials); + if (config.OutstandingRpcsPerChannel != 0) + { + Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value"); + } + if (config.AsyncClientThreads != 0) + { + Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value"); + } + if (config.CoreLimit != 0) + { + Logger.Warning("ClientConfig.CoreLimit is not supported for C#. Ignoring the value"); + } + if (config.CoreList.Count > 0) + { + Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value"); + } - switch (config.RpcType) + var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure; + List<ChannelOption> channelOptions = null; + if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "") { - case RpcType.UNARY: - return new SyncUnaryClientRunner(channel, - config.PayloadConfig.SimpleParams.ReqSize, - config.HistogramParams); - - case RpcType.STREAMING: - default: - throw new ArgumentException("Unsupported RpcType."); + channelOptions = new List<ChannelOption> + { + new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride) + }; } + var channel = new Channel(target, credentials, channelOptions); + + return new ClientRunnerImpl(channel, + config.ClientType, + config.RpcType, + config.PayloadConfig, + config.HistogramParams); } } - /// <summary> - /// Client that starts synchronous unary calls in a closed loop. - /// </summary> - public class SyncUnaryClientRunner : IClientRunner + public class ClientRunnerImpl : IClientRunner { const double SecondsToNanos = 1e9; readonly Channel channel; - readonly int payloadSize; + readonly ClientType clientType; + readonly RpcType rpcType; + readonly PayloadConfig payloadConfig; readonly Histogram histogram; readonly BenchmarkService.IBenchmarkServiceClient client; @@ -93,15 +117,19 @@ namespace Grpc.IntegrationTesting readonly CancellationTokenSource stoppedCts; readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); - public SyncUnaryClientRunner(Channel channel, int payloadSize, HistogramParams histogramParams) + public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams) { this.channel = GrpcPreconditions.CheckNotNull(channel); - this.payloadSize = payloadSize; + this.clientType = clientType; + this.rpcType = rpcType; + this.payloadConfig = payloadConfig; this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); this.stoppedCts = new CancellationTokenSource(); this.client = BenchmarkService.NewClient(channel); - this.runnerTask = Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning); + + var threadBody = GetThreadBody(); + this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning); } public ClientStats GetStats(bool reset) @@ -126,12 +154,9 @@ namespace Grpc.IntegrationTesting await channel.ShutdownAsync(); } - private void Run() + private void RunClosedLoopUnary() { - var request = new SimpleRequest - { - Payload = CreateZerosPayload(payloadSize) - }; + var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); while (!stoppedCts.Token.IsCancellationRequested) @@ -145,6 +170,124 @@ namespace Grpc.IntegrationTesting } } + private async Task RunClosedLoopUnaryAsync() + { + var request = CreateSimpleRequest(); + var stopwatch = new Stopwatch(); + + while (!stoppedCts.Token.IsCancellationRequested) + { + stopwatch.Restart(); + await client.UnaryCallAsync(request); + stopwatch.Stop(); + + // spec requires data point in nanoseconds. + histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + } + } + + private async Task RunClosedLoopStreamingAsync() + { + var request = CreateSimpleRequest(); + var stopwatch = new Stopwatch(); + + using (var call = client.StreamingCall()) + { + while (!stoppedCts.Token.IsCancellationRequested) + { + stopwatch.Restart(); + await call.RequestStream.WriteAsync(request); + await call.ResponseStream.MoveNext(); + stopwatch.Stop(); + + // spec requires data point in nanoseconds. + histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + } + + // finish the streaming call + await call.RequestStream.CompleteAsync(); + Assert.IsFalse(await call.ResponseStream.MoveNext()); + } + } + + private async Task RunGenericClosedLoopStreamingAsync() + { + var request = CreateByteBufferRequest(); + var stopwatch = new Stopwatch(); + + var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions()); + + using (var call = Calls.AsyncDuplexStreamingCall(callDetails)) + { + while (!stoppedCts.Token.IsCancellationRequested) + { + stopwatch.Restart(); + await call.RequestStream.WriteAsync(request); + await call.ResponseStream.MoveNext(); + stopwatch.Stop(); + + // spec requires data point in nanoseconds. + histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + } + + // finish the streaming call + await call.RequestStream.CompleteAsync(); + Assert.IsFalse(await call.ResponseStream.MoveNext()); + } + } + + private Action GetThreadBody() + { + if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams) + { + GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API"); + GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls"); + return () => + { + RunGenericClosedLoopStreamingAsync().Wait(); + }; + } + + GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams); + if (clientType == ClientType.SYNC_CLIENT) + { + GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#"); + return RunClosedLoopUnary; + } + else if (clientType == ClientType.ASYNC_CLIENT) + { + switch (rpcType) + { + case RpcType.UNARY: + return () => + { + RunClosedLoopUnaryAsync().Wait(); + }; + case RpcType.STREAMING: + return () => + { + RunClosedLoopStreamingAsync().Wait(); + }; + } + } + throw new ArgumentException("Unsupported configuration."); + } + + private SimpleRequest CreateSimpleRequest() + { + GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams); + return new SimpleRequest + { + Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize), + ResponseSize = payloadConfig.SimpleParams.RespSize + }; + } + + private byte[] CreateByteBufferRequest() + { + return new byte[payloadConfig.BytebufParams.ReqSize]; + } + private static Payload CreateZerosPayload(int size) { return new Payload { Body = ByteString.CopyFrom(new byte[size]) }; diff --git a/src/csharp/Grpc.IntegrationTesting/GenericService.cs b/src/csharp/Grpc.IntegrationTesting/GenericService.cs new file mode 100644 index 0000000000..c6128264ac --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/GenericService.cs @@ -0,0 +1,71 @@ +#region Copyright notice and license + +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using Google.Protobuf; +using Grpc.Core; +using Grpc.Core.Utils; +using NUnit.Framework; +using Grpc.Testing; + +namespace Grpc.IntegrationTesting +{ + /// <summary> + /// Utility methods for defining and calling a service that doesn't use protobufs + /// for serialization/deserialization. + /// </summary> + public static class GenericService + { + readonly static Marshaller<byte[]> ByteArrayMarshaller = new Marshaller<byte[]>((b) => b, (b) => b); + + public readonly static Method<byte[], byte[]> StreamingCallMethod = new Method<byte[], byte[]>( + MethodType.DuplexStreaming, + "grpc.testing.BenchmarkService", + "StreamingCall", + ByteArrayMarshaller, + ByteArrayMarshaller + ); + + public static ServerServiceDefinition BindHandler(DuplexStreamingServerMethod<byte[], byte[]> handler) + { + return ServerServiceDefinition.CreateBuilder(StreamingCallMethod.ServiceName) + .AddMethod(StreamingCallMethod, handler).Build(); + } + } +} diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 372991374e..4c049944ea 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -120,6 +120,7 @@ <Compile Include="WorkerServiceImpl.cs" /> <Compile Include="QpsWorker.cs" /> <Compile Include="WallClockStopwatch.cs" /> + <Compile Include="GenericService.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs index 06d5ee93d8..a8cf75bd81 100644 --- a/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs @@ -55,14 +55,7 @@ namespace Grpc.IntegrationTesting { var serverConfig = new ServerConfig { - ServerType = ServerType.ASYNC_SERVER, - PayloadConfig = new PayloadConfig - { - SimpleParams = new SimpleProtoParams - { - RespSize = 100 - } - } + ServerType = ServerType.ASYNC_SERVER }; serverRunner = ServerRunners.CreateStarted(serverConfig); } @@ -88,7 +81,8 @@ namespace Grpc.IntegrationTesting { SimpleParams = new SimpleProtoParams { - ReqSize = 100 + ReqSize = 100, + RespSize = 100 } }, HistogramParams = new HistogramParams diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs index 4a73645e6c..c326378cfa 100644 --- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs @@ -41,6 +41,7 @@ using System.Threading; using System.Threading.Tasks; using Google.Protobuf; using Grpc.Core; +using Grpc.Core.Logging; using Grpc.Core.Utils; using NUnit.Framework; using Grpc.Testing; @@ -50,27 +51,78 @@ namespace Grpc.IntegrationTesting /// <summary> /// Helper methods to start server runners for performance testing. /// </summary> - public static class ServerRunners + public class ServerRunners { + static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerRunners>(); + /// <summary> /// Creates a started server runner. /// </summary> public static IServerRunner CreateStarted(ServerConfig config) { - GrpcPreconditions.CheckArgument(config.ServerType == ServerType.ASYNC_SERVER); + Logger.Debug("ServerConfig: {0}", config); var credentials = config.SecurityParams != null ? TestCredentials.CreateSslServerCredentials() : ServerCredentials.Insecure; - // TODO: qps_driver needs to setup payload properly... - int responseSize = config.PayloadConfig != null ? config.PayloadConfig.SimpleParams.RespSize : 0; + if (config.AsyncServerThreads != 0) + { + Logger.Warning("ServerConfig.AsyncServerThreads is not supported for C#. Ignoring the value"); + } + if (config.CoreLimit != 0) + { + Logger.Warning("ServerConfig.CoreLimit is not supported for C#. Ignoring the value"); + } + if (config.CoreList.Count > 0) + { + Logger.Warning("ServerConfig.CoreList is not supported for C#. Ignoring the value"); + } + + ServerServiceDefinition service = null; + if (config.ServerType == ServerType.ASYNC_SERVER) + { + GrpcPreconditions.CheckArgument(config.PayloadConfig == null, + "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server."); + service = BenchmarkService.BindService(new BenchmarkServiceImpl()); + } + else if (config.ServerType == ServerType.ASYNC_GENERIC_SERVER) + { + var genericService = new GenericServiceImpl(config.PayloadConfig.BytebufParams.RespSize); + service = GenericService.BindHandler(genericService.StreamingCall); + } + else + { + throw new ArgumentException("Unsupported ServerType"); + } + var server = new Server { - Services = { BenchmarkService.BindService(new BenchmarkServiceImpl(responseSize)) }, + Services = { service }, Ports = { new ServerPort("[::]", config.Port, credentials) } }; server.Start(); return new ServerRunnerImpl(server); } + + private class GenericServiceImpl + { + readonly byte[] response; + + public GenericServiceImpl(int responseSize) + { + this.response = new byte[responseSize]; + } + + /// <summary> + /// Generic streaming call handler. + /// </summary> + public async Task StreamingCall(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext context) + { + await requestStream.ForEachAsync(async request => + { + await responseStream.WriteAsync(response); + }); + } + } } /// <summary> @@ -119,6 +171,5 @@ namespace Grpc.IntegrationTesting { return server.ShutdownAsync(); } - } - + } } diff --git a/src/node/index.js b/src/node/index.js index 1c197729d7..6567d56260 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -87,6 +87,10 @@ var loadObject = exports.loadObject; * Buffers. Defaults to false * - longsAsStrings: deserialize long values as strings instead of objects. * Defaults to true + * - deprecatedArgumentOrder: Use the beta method argument order for client + * methods, with optional arguments after the callback. Defaults to false. + * This option is only a temporary stopgap measure to smooth an API breakage. + * It is deprecated, and new code should not use it. * @param {string|{root: string, file: string}} filename The file to load * @param {string=} format The file format to expect. Must be either 'proto' or * 'json'. Defaults to 'proto' diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index 5602011a8e..ac0eddcf45 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -286,7 +286,7 @@ function cancelAfterFirstResponse(client, done) { function timeoutOnSleepingServer(client, done) { var deadline = new Date(); deadline.setMilliseconds(deadline.getMilliseconds() + 1); - var call = client.fullDuplexCall(null, {deadline: deadline}); + var call = client.fullDuplexCall({deadline: deadline}); call.write({ payload: {body: zeroBuffer(27182)} }); @@ -316,10 +316,10 @@ function customMetadata(client, done) { body: zeroBuffer(271828) } }; - var unary = client.unaryCall(arg, function(err, resp) { + var unary = client.unaryCall(arg, metadata, function(err, resp) { assert.ifError(err); done(); - }, metadata); + }); unary.on('metadata', function(metadata) { assert.deepEqual(metadata.get(ECHO_INITIAL_KEY), ['test_initial_metadata_value']); @@ -455,14 +455,14 @@ function perRpcAuthTest(client, done, extra) { credential = credential.createScoped(scope); } var creds = grpc.credentials.createFromGoogleCredential(credential); - client.unaryCall(arg, function(err, resp) { + client.unaryCall(arg, {credentials: creds}, function(err, resp) { assert.ifError(err); assert.strictEqual(resp.username, SERVICE_ACCOUNT_EMAIL); assert(extra.oauth_scope.indexOf(resp.oauth_scope) > -1); if (done) { done(); } - }, null, {credentials: creds}); + }); }); } diff --git a/src/node/src/client.js b/src/node/src/client.js index 2459e28321..82142379da 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -50,6 +50,7 @@ 'use strict'; var _ = require('lodash'); +var arguejs = require('arguejs'); var grpc = require('./grpc_extension'); @@ -353,21 +354,23 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { * @this {Client} Client object. Must have a channel member. * @param {*} argument The argument to the call. Should be serializable with * serialize - * @param {function(?Error, value=)} callback The callback to for when the - * response is received * @param {Metadata=} metadata Metadata to add to the call * @param {Object=} options Options map + * @param {function(?Error, value=)} callback The callback to for when the + * response is received * @return {EventEmitter} An event emitter for stream related events */ - function makeUnaryRequest(argument, callback, metadata, options) { + function makeUnaryRequest(argument, metadata, options, callback) { /* jshint validthis: true */ + /* While the arguments are listed in the function signature, those variables + * are not used directly. Instead, ArgueJS processes the arguments + * object. This allows for simple handling of optional arguments in the + * middle of the argument list, and also provides type checking. */ + var args = arguejs({argument: null, metadata: [Metadata, new Metadata()], + options: [Object], callback: Function}, arguments); var emitter = new EventEmitter(); - var call = getCall(this.$channel, method, options); - if (metadata === null || metadata === undefined) { - metadata = new Metadata(); - } else { - metadata = metadata.clone(); - } + var call = getCall(this.$channel, method, args.options); + metadata = args.metadata.clone(); emitter.cancel = function cancel() { call.cancel(); }; @@ -375,9 +378,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { return call.getPeer(); }; var client_batch = {}; - var message = serialize(argument); - if (options) { - message.grpcWriteFlags = options.flags; + var message = serialize(args.argument); + if (args.options) { + message.grpcWriteFlags = args.options.flags; } client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata._getCoreRepresentation(); @@ -395,7 +398,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { if (status.code === grpc.status.OK) { if (err) { // Got a batch error, but OK status. Something went wrong - callback(err); + args.callback(err); return; } else { try { @@ -414,9 +417,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { error = new Error(status.details); error.code = status.code; error.metadata = status.metadata; - callback(error); + args.callback(error); } else { - callback(null, deserialized); + args.callback(null, deserialized); } emitter.emit('status', status); emitter.emit('metadata', Metadata._fromCoreRepresentation( @@ -440,21 +443,23 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { * Make a client stream request with this method on the given channel with the * given callback, etc. * @this {Client} Client object. Must have a channel member. - * @param {function(?Error, value=)} callback The callback to for when the - * response is received * @param {Metadata=} metadata Array of metadata key/value pairs to add to the * call * @param {Object=} options Options map + * @param {function(?Error, value=)} callback The callback to for when the + * response is received * @return {EventEmitter} An event emitter for stream related events */ - function makeClientStreamRequest(callback, metadata, options) { + function makeClientStreamRequest(metadata, options, callback) { /* jshint validthis: true */ - var call = getCall(this.$channel, method, options); - if (metadata === null || metadata === undefined) { - metadata = new Metadata(); - } else { - metadata = metadata.clone(); - } + /* While the arguments are listed in the function signature, those variables + * are not used directly. Instead, ArgueJS processes the arguments + * object. This allows for simple handling of optional arguments in the + * middle of the argument list, and also provides type checking. */ + var args = arguejs({metadata: [Metadata, new Metadata()], + options: [Object], callback: Function}, arguments); + var call = getCall(this.$channel, method, args.options); + metadata = args.metadata.clone(); var stream = new ClientWritableStream(call, serialize); var metadata_batch = {}; metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = @@ -481,7 +486,7 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { if (status.code === grpc.status.OK) { if (err) { // Got a batch error, but OK status. Something went wrong - callback(err); + args.callback(err); return; } else { try { @@ -500,9 +505,9 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { error = new Error(response.status.details); error.code = status.code; error.metadata = status.metadata; - callback(error); + args.callback(error); } else { - callback(null, deserialized); + args.callback(null, deserialized); } stream.emit('status', status); }); @@ -533,17 +538,18 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { */ function makeServerStreamRequest(argument, metadata, options) { /* jshint validthis: true */ - var call = getCall(this.$channel, method, options); - if (metadata === null || metadata === undefined) { - metadata = new Metadata(); - } else { - metadata = metadata.clone(); - } + /* While the arguments are listed in the function signature, those variables + * are not used directly. Instead, ArgueJS processes the arguments + * object. */ + var args = arguejs({argument: null, metadata: [Metadata, new Metadata()], + options: [Object]}, arguments); + var call = getCall(this.$channel, method, args.options); + metadata = args.metadata.clone(); var stream = new ClientReadableStream(call, deserialize); var start_batch = {}; - var message = serialize(argument); - if (options) { - message.grpcWriteFlags = options.flags; + var message = serialize(args.argument); + if (args.options) { + message.grpcWriteFlags = args.options.flags; } start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata._getCoreRepresentation(); @@ -595,12 +601,13 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { */ function makeBidiStreamRequest(metadata, options) { /* jshint validthis: true */ - var call = getCall(this.$channel, method, options); - if (metadata === null || metadata === undefined) { - metadata = new Metadata(); - } else { - metadata = metadata.clone(); - } + /* While the arguments are listed in the function signature, those variables + * are not used directly. Instead, ArgueJS processes the arguments + * object. */ + var args = arguejs({metadata: [Metadata, new Metadata()], + options: [Object]}, arguments); + var call = getCall(this.$channel, method, args.options); + metadata = args.metadata.clone(); var stream = new ClientDuplexStream(call, serialize, deserialize); var start_batch = {}; start_batch[grpc.opType.SEND_INITIAL_METADATA] = @@ -643,6 +650,40 @@ var requester_makers = { bidi: makeBidiStreamRequestFunction }; +function getDefaultValues(metadata, options) { + var res = {}; + res.metadata = metadata || new Metadata(); + res.options = options || {}; + return res; +} + +/** + * Map with wrappers for each type of requester function to make it use the old + * argument order with optional arguments after the callback. + */ +var deprecated_request_wrap = { + unary: function(makeUnaryRequest) { + return function makeWrappedUnaryRequest(argument, callback, + metadata, options) { + /* jshint validthis: true */ + var opt_args = getDefaultValues(metadata, metadata); + return makeUnaryRequest.call(this, argument, opt_args.metadata, + opt_args.options, callback); + }; + }, + client_stream: function(makeServerStreamRequest) { + return function makeWrappedClientStreamRequest(callback, metadata, + options) { + /* jshint validthis: true */ + var opt_args = getDefaultValues(metadata, options); + return makeServerStreamRequest.call(this, opt_args.metadata, + opt_args.options, callback); + }; + }, + server_stream: _.identity, + bidi: _.identity +}; + /** * Creates a constructor for a client with the given methods. The methods object * maps method name to an object with the following keys: @@ -654,9 +695,19 @@ var requester_makers = { * responseDeserialize: function to deserialize response objects * @param {Object} methods An object mapping method names to method attributes * @param {string} serviceName The fully qualified name of the service + * @param {Object} class_options An options object. Currently only uses the key + * deprecatedArgumentOrder, a boolean that Indicates that the old argument + * order should be used for methods, with optional arguments at the end + * instead of the callback at the end. Defaults to false. This option is + * only a temporary stopgap measure to smooth an API breakage. + * It is deprecated, and new code should not use it. * @return {function(string, Object)} New client constructor */ -exports.makeClientConstructor = function(methods, serviceName) { +exports.makeClientConstructor = function(methods, serviceName, + class_options) { + if (!class_options) { + class_options = {}; + } /** * Create a client with the given methods * @constructor @@ -703,8 +754,13 @@ exports.makeClientConstructor = function(methods, serviceName) { } var serialize = attrs.requestSerialize; var deserialize = attrs.responseDeserialize; - Client.prototype[name] = requester_makers[method_type]( + var method_func = requester_makers[method_type]( attrs.path, serialize, deserialize); + if (class_options.deprecatedArgumentOrder) { + Client.prototype[name] = deprecated_request_wrap(method_func); + } else { + Client.prototype[name] = method_func; + } // Associate all provided attributes with the method _.assign(Client.prototype[name], attrs); }); @@ -761,8 +817,13 @@ exports.waitForClientReady = function(client, deadline, callback) { exports.makeProtobufClientConstructor = function(service, options) { var method_attrs = common.getProtobufServiceAttrs(service, service.name, options); + var deprecatedArgumentOrder = false; + if (options) { + deprecatedArgumentOrder = options.deprecatedArgumentOrder; + } var Client = exports.makeClientConstructor( - method_attrs, common.fullyQualifiedName(service)); + method_attrs, common.fullyQualifiedName(service), + deprecatedArgumentOrder); Client.service = service; Client.service.grpc_options = options; return Client; diff --git a/src/node/test/credentials_test.js b/src/node/test/credentials_test.js index 294600c85a..73eadfab2c 100644 --- a/src/node/test/credentials_test.js +++ b/src/node/test/credentials_test.js @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -398,18 +398,20 @@ describe('client credentials', function() { metadataUpdater); }); it('Should update metadata on a unary call', function(done) { - var call = client.unary({}, function(err, data) { - assert.ifError(err); - }, null, {credentials: updater_creds}); + var call = client.unary({}, {credentials: updater_creds}, + function(err, data) { + assert.ifError(err); + }); call.on('metadata', function(metadata) { assert.deepEqual(metadata.get('plugin_key'), ['plugin_value']); done(); }); }); it('should update metadata on a client streaming call', function(done) { - var call = client.clientStream(function(err, data) { - assert.ifError(err); - }, null, {credentials: updater_creds}); + var call = client.clientStream({credentials: updater_creds}, + function(err, data) { + assert.ifError(err); + }); call.on('metadata', function(metadata) { assert.deepEqual(metadata.get('plugin_key'), ['plugin_value']); done(); @@ -417,7 +419,7 @@ describe('client credentials', function() { call.end(); }); it('should update metadata on a server streaming call', function(done) { - var call = client.serverStream({}, null, {credentials: updater_creds}); + var call = client.serverStream({}, {credentials: updater_creds}); call.on('data', function() {}); call.on('metadata', function(metadata) { assert.deepEqual(metadata.get('plugin_key'), ['plugin_value']); @@ -425,7 +427,7 @@ describe('client credentials', function() { }); }); it('should update metadata on a bidi streaming call', function(done) { - var call = client.bidiStream(null, {credentials: updater_creds}); + var call = client.bidiStream({credentials: updater_creds}); call.on('data', function() {}); call.on('metadata', function(metadata) { assert.deepEqual(metadata.get('plugin_key'), ['plugin_value']); @@ -443,9 +445,10 @@ describe('client credentials', function() { altMetadataUpdater); var combined_updater = grpc.credentials.combineCallCredentials( updater_creds, alt_updater_creds); - var call = client.unary({}, function(err, data) { - assert.ifError(err); - }, null, {credentials: combined_updater}); + var call = client.unary({}, {credentials: combined_updater}, + function(err, data) { + assert.ifError(err); + }); call.on('metadata', function(metadata) { assert.deepEqual(metadata.get('plugin_key'), ['plugin_value']); assert.deepEqual(metadata.get('other_plugin_key'), diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index edbfc0a288..5a704ee133 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -404,18 +404,18 @@ describe('Echo metadata', function() { server.forceShutdown(); }); it('with unary call', function(done) { - var call = client.unary({}, function(err, data) { + var call = client.unary({}, metadata, function(err, data) { assert.ifError(err); - }, metadata); + }); call.on('metadata', function(metadata) { assert.deepEqual(metadata.get('key'), ['value']); done(); }); }); it('with client stream call', function(done) { - var call = client.clientStream(function(err, data) { + var call = client.clientStream(metadata, function(err, data) { assert.ifError(err); - }, metadata); + }); call.on('metadata', function(metadata) { assert.deepEqual(metadata.get('key'), ['value']); done(); @@ -441,8 +441,8 @@ describe('Echo metadata', function() { }); it('shows the correct user-agent string', function(done) { var version = require('../../../package.json').version; - var call = client.unary({}, function(err, data) { assert.ifError(err); }, - metadata); + var call = client.unary({}, metadata, + function(err, data) { assert.ifError(err); }); call.on('metadata', function(metadata) { assert(_.startsWith(metadata.get('user-agent')[0], 'grpc-node/' + version)); @@ -452,8 +452,8 @@ describe('Echo metadata', function() { it('properly handles duplicate values', function(done) { var dup_metadata = metadata.clone(); dup_metadata.add('key', 'value2'); - var call = client.unary({}, function(err, data) {assert.ifError(err); }, - dup_metadata); + var call = client.unary({}, dup_metadata, + function(err, data) {assert.ifError(err); }); call.on('metadata', function(resp_metadata) { // Two arrays are equal iff their symmetric difference is empty assert.deepEqual(_.xor(dup_metadata.get('key'), resp_metadata.get('key')), @@ -954,7 +954,7 @@ describe('Call propagation', function() { done = multiDone(done, 2); var call; proxy_impl.unary = function(parent, callback) { - client.unary(parent.request, function(err, value) { + client.unary(parent.request, {parent: parent}, function(err, value) { try { assert(err); assert.strictEqual(err.code, grpc.status.CANCELLED); @@ -962,7 +962,7 @@ describe('Call propagation', function() { callback(err, value); done(); } - }, null, {parent: parent}); + }); call.cancel(); }; proxy.addProtoService(test_service, proxy_impl); @@ -976,7 +976,7 @@ describe('Call propagation', function() { done = multiDone(done, 2); var call; proxy_impl.clientStream = function(parent, callback) { - client.clientStream(function(err, value) { + client.clientStream({parent: parent}, function(err, value) { try { assert(err); assert.strictEqual(err.code, grpc.status.CANCELLED); @@ -984,7 +984,7 @@ describe('Call propagation', function() { callback(err, value); done(); } - }, null, {parent: parent}); + }); call.cancel(); }; proxy.addProtoService(test_service, proxy_impl); @@ -998,8 +998,7 @@ describe('Call propagation', function() { done = multiDone(done, 2); var call; proxy_impl.serverStream = function(parent) { - var child = client.serverStream(parent.request, null, - {parent: parent}); + var child = client.serverStream(parent.request, {parent: parent}); child.on('data', function() {}); child.on('error', function(err) { assert(err); @@ -1023,7 +1022,7 @@ describe('Call propagation', function() { done = multiDone(done, 2); var call; proxy_impl.bidiStream = function(parent) { - var child = client.bidiStream(null, {parent: parent}); + var child = client.bidiStream({parent: parent}); child.on('data', function() {}); child.on('error', function(err) { assert(err); @@ -1051,7 +1050,8 @@ describe('Call propagation', function() { it('With a client stream call', function(done) { done = multiDone(done, 2); proxy_impl.clientStream = function(parent, callback) { - client.clientStream(function(err, value) { + var options = {parent: parent, propagate_flags: deadline_flags}; + client.clientStream(options, function(err, value) { try { assert(err); assert(err.code === grpc.status.DEADLINE_EXCEEDED || @@ -1060,7 +1060,7 @@ describe('Call propagation', function() { callback(err, value); done(); } - }, null, {parent: parent, propagate_flags: deadline_flags}); + }); }; proxy.addProtoService(test_service, proxy_impl); var proxy_port = proxy.bind('localhost:0', server_insecure_creds); @@ -1069,15 +1069,15 @@ describe('Call propagation', function() { grpc.credentials.createInsecure()); var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 1); - proxy_client.clientStream(function(err, value) { + proxy_client.clientStream({deadline: deadline}, function(err, value) { done(); - }, null, {deadline: deadline}); + }); }); it('With a bidi stream call', function(done) { done = multiDone(done, 2); proxy_impl.bidiStream = function(parent) { var child = client.bidiStream( - null, {parent: parent, propagate_flags: deadline_flags}); + {parent: parent, propagate_flags: deadline_flags}); child.on('data', function() {}); child.on('error', function(err) { assert(err); @@ -1093,7 +1093,7 @@ describe('Call propagation', function() { grpc.credentials.createInsecure()); var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 1); - var call = proxy_client.bidiStream(null, {deadline: deadline}); + var call = proxy_client.bidiStream({deadline: deadline}); call.on('data', function() {}); call.on('error', function(err) { done(); diff --git a/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h b/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h index 02871d5d02..4b92504b55 100644 --- a/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h +++ b/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h @@ -54,7 +54,9 @@ GRPC_XMACRO_ITEM. #endif +#if TARGET_OS_IPHONE GRPC_XMACRO_ITEM(isCell, IsWWAN) +#endif GRPC_XMACRO_ITEM(reachable, Reachable) GRPC_XMACRO_ITEM(transientConnection, TransientConnection) GRPC_XMACRO_ITEM(connectionRequired, ConnectionRequired) diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index 4d18369e1f..adcd8e954a 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -283,7 +283,7 @@ extern grpc_call_destroy_type grpc_call_destroy_import; typedef grpc_call_error(*grpc_server_request_call_type)(grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag_new); extern grpc_server_request_call_type grpc_server_request_call_import; #define grpc_server_request_call grpc_server_request_call_import -typedef void *(*grpc_server_register_method_type)(grpc_server *server, const char *method, const char *host); +typedef void *(*grpc_server_register_method_type)(grpc_server *server, const char *method, const char *host, uint32_t flags); extern grpc_server_register_method_type grpc_server_register_method_import; #define grpc_server_register_method grpc_server_register_method_import typedef grpc_call_error(*grpc_server_request_registered_call_type)(grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *request_metadata, grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag_new); diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a5bc18af5e..3cd8f62221 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -74,9 +74,44 @@ CORE_SOURCE_FILES = [ 'src/core/lib/support/tmpfile_posix.c', 'src/core/lib/support/tmpfile_win32.c', 'src/core/lib/support/wrap_memcpy.c', + 'src/core/ext/lb_policy/grpclb/load_balancer_api.c', + 'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c', + 'src/core/ext/lb_policy/pick_first/pick_first.c', + 'src/core/ext/lb_policy/round_robin/round_robin.c', + 'src/core/ext/transport/chttp2/client/insecure/channel_create.c', + 'src/core/ext/transport/chttp2/client/secure/secure_channel_create.c', + 'src/core/ext/transport/chttp2/server/insecure/server_chttp2.c', + 'src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c', + 'src/core/ext/transport/chttp2/transport/alpn.c', + 'src/core/ext/transport/chttp2/transport/bin_encoder.c', + 'src/core/ext/transport/chttp2/transport/chttp2_transport.c', + 'src/core/ext/transport/chttp2/transport/frame_data.c', + 'src/core/ext/transport/chttp2/transport/frame_goaway.c', + 'src/core/ext/transport/chttp2/transport/frame_ping.c', + 'src/core/ext/transport/chttp2/transport/frame_rst_stream.c', + 'src/core/ext/transport/chttp2/transport/frame_settings.c', + 'src/core/ext/transport/chttp2/transport/frame_window_update.c', + 'src/core/ext/transport/chttp2/transport/hpack_encoder.c', + 'src/core/ext/transport/chttp2/transport/hpack_parser.c', + 'src/core/ext/transport/chttp2/transport/hpack_table.c', + 'src/core/ext/transport/chttp2/transport/huffsyms.c', + 'src/core/ext/transport/chttp2/transport/incoming_metadata.c', + 'src/core/ext/transport/chttp2/transport/parsing.c', + 'src/core/ext/transport/chttp2/transport/status_conversion.c', + 'src/core/ext/transport/chttp2/transport/stream_lists.c', + 'src/core/ext/transport/chttp2/transport/stream_map.c', + 'src/core/ext/transport/chttp2/transport/timeout_encoding.c', + 'src/core/ext/transport/chttp2/transport/varint.c', + 'src/core/ext/transport/chttp2/transport/writing.c', + 'src/core/lib/census/context.c', 'src/core/lib/census/grpc_context.c', 'src/core/lib/census/grpc_filter.c', 'src/core/lib/census/grpc_plugin.c', + 'src/core/lib/census/initialize.c', + 'src/core/lib/census/mlog.c', + 'src/core/lib/census/operation.c', + 'src/core/lib/census/placeholders.c', + 'src/core/lib/census/tracing.c', 'src/core/lib/channel/channel_args.c', 'src/core/lib/channel/channel_stack.c', 'src/core/lib/channel/channel_stack_builder.c', @@ -90,9 +125,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/client_config/connector.c', 'src/core/lib/client_config/default_initial_connect_string.c', 'src/core/lib/client_config/initial_connect_string.c', - 'src/core/lib/client_config/lb_policies/load_balancer_api.c', - 'src/core/lib/client_config/lb_policies/pick_first.c', - 'src/core/lib/client_config/lb_policies/round_robin.c', 'src/core/lib/client_config/lb_policy.c', 'src/core/lib/client_config/lb_policy_factory.c', 'src/core/lib/client_config/lb_policy_registry.c', @@ -110,6 +142,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/debug/trace.c', 'src/core/lib/http/format_request.c', 'src/core/lib/http/httpcli.c', + 'src/core/lib/http/httpcli_security_connector.c', 'src/core/lib/http/parser.c', 'src/core/lib/iomgr/closure.c', 'src/core/lib/iomgr/endpoint.c', @@ -157,7 +190,20 @@ CORE_SOURCE_FILES = [ 'src/core/lib/json/json_reader.c', 'src/core/lib/json/json_string.c', 'src/core/lib/json/json_writer.c', - 'src/core/lib/proto/grpc/lb/v0/load_balancer.pb.c', + 'src/core/lib/security/b64.c', + 'src/core/lib/security/client_auth_filter.c', + 'src/core/lib/security/credentials.c', + 'src/core/lib/security/credentials_metadata.c', + 'src/core/lib/security/credentials_posix.c', + 'src/core/lib/security/credentials_win32.c', + 'src/core/lib/security/google_default_credentials.c', + 'src/core/lib/security/handshake.c', + 'src/core/lib/security/json_token.c', + 'src/core/lib/security/jwt_verifier.c', + 'src/core/lib/security/secure_endpoint.c', + 'src/core/lib/security/security_connector.c', + 'src/core/lib/security/security_context.c', + 'src/core/lib/security/server_auth_filter.c', 'src/core/lib/surface/alarm.c', 'src/core/lib/surface/api_trace.c', 'src/core/lib/surface/byte_buffer.c', @@ -167,74 +213,29 @@ CORE_SOURCE_FILES = [ 'src/core/lib/surface/call_log_batch.c', 'src/core/lib/surface/channel.c', 'src/core/lib/surface/channel_connectivity.c', - 'src/core/lib/surface/channel_create.c', 'src/core/lib/surface/channel_init.c', 'src/core/lib/surface/channel_ping.c', 'src/core/lib/surface/channel_stack_type.c', 'src/core/lib/surface/completion_queue.c', 'src/core/lib/surface/event_string.c', 'src/core/lib/surface/init.c', + 'src/core/lib/surface/init_secure.c', 'src/core/lib/surface/lame_client.c', 'src/core/lib/surface/metadata_array.c', 'src/core/lib/surface/server.c', - 'src/core/lib/surface/server_chttp2.c', 'src/core/lib/surface/validate_metadata.c', 'src/core/lib/surface/version.c', 'src/core/lib/transport/byte_stream.c', - 'src/core/lib/transport/chttp2/alpn.c', - 'src/core/lib/transport/chttp2/bin_encoder.c', - 'src/core/lib/transport/chttp2/frame_data.c', - 'src/core/lib/transport/chttp2/frame_goaway.c', - 'src/core/lib/transport/chttp2/frame_ping.c', - 'src/core/lib/transport/chttp2/frame_rst_stream.c', - 'src/core/lib/transport/chttp2/frame_settings.c', - 'src/core/lib/transport/chttp2/frame_window_update.c', - 'src/core/lib/transport/chttp2/hpack_encoder.c', - 'src/core/lib/transport/chttp2/hpack_parser.c', - 'src/core/lib/transport/chttp2/hpack_table.c', - 'src/core/lib/transport/chttp2/huffsyms.c', - 'src/core/lib/transport/chttp2/incoming_metadata.c', - 'src/core/lib/transport/chttp2/parsing.c', - 'src/core/lib/transport/chttp2/status_conversion.c', - 'src/core/lib/transport/chttp2/stream_lists.c', - 'src/core/lib/transport/chttp2/stream_map.c', - 'src/core/lib/transport/chttp2/timeout_encoding.c', - 'src/core/lib/transport/chttp2/varint.c', - 'src/core/lib/transport/chttp2/writing.c', - 'src/core/lib/transport/chttp2_transport.c', 'src/core/lib/transport/connectivity_state.c', 'src/core/lib/transport/metadata.c', 'src/core/lib/transport/metadata_batch.c', 'src/core/lib/transport/static_metadata.c', 'src/core/lib/transport/transport.c', 'src/core/lib/transport/transport_op_string.c', - 'src/core/lib/http/httpcli_security_connector.c', - 'src/core/lib/security/b64.c', - 'src/core/lib/security/client_auth_filter.c', - 'src/core/lib/security/credentials.c', - 'src/core/lib/security/credentials_metadata.c', - 'src/core/lib/security/credentials_posix.c', - 'src/core/lib/security/credentials_win32.c', - 'src/core/lib/security/google_default_credentials.c', - 'src/core/lib/security/handshake.c', - 'src/core/lib/security/json_token.c', - 'src/core/lib/security/jwt_verifier.c', - 'src/core/lib/security/secure_endpoint.c', - 'src/core/lib/security/security_connector.c', - 'src/core/lib/security/security_context.c', - 'src/core/lib/security/server_auth_filter.c', - 'src/core/lib/security/server_secure_chttp2.c', - 'src/core/lib/surface/init_secure.c', - 'src/core/lib/surface/secure_channel_create.c', 'src/core/lib/tsi/fake_transport_security.c', 'src/core/lib/tsi/ssl_transport_security.c', 'src/core/lib/tsi/transport_security.c', - 'src/core/lib/census/context.c', - 'src/core/lib/census/initialize.c', - 'src/core/lib/census/mlog.c', - 'src/core/lib/census/operation.c', - 'src/core/lib/census/placeholders.c', - 'src/core/lib/census/tracing.c', + 'src/core/plugin_registry/grpc_plugin_registry.c', 'third_party/nanopb/pb_common.c', 'third_party/nanopb/pb_decode.c', 'third_party/nanopb/pb_encode.c', diff --git a/src/python/grpcio/precompiled.py b/src/python/grpcio/precompiled.py index d34250b02c..aeeb2754ea 100644 --- a/src/python/grpcio/precompiled.py +++ b/src/python/grpcio/precompiled.py @@ -91,6 +91,9 @@ class BuildTaggedExt(setuptools.Command): def update_setup_arguments(setup_arguments): + if not USE_PRECOMPILED_BINARIES: + sys.stderr.write('not using precompiled extension') + return url = '{}/{}.so'.format(BINARIES_REPOSITORY, _tagged_ext_name('cygrpc')) target_path = os.path.join(PYTHON_STEM, 'grpc/_cython/cygrpc.so') try: diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c index db7cac363a..9b617e13d3 100644 --- a/src/ruby/ext/grpc/rb_byte_buffer.c +++ b/src/ruby/ext/grpc/rb_byte_buffer.c @@ -50,21 +50,18 @@ grpc_byte_buffer* grpc_rb_s_to_byte_buffer(char *string, size_t length) { } VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) { - size_t length = 0; - char *string = NULL; - size_t offset = 0; + VALUE rb_string; grpc_byte_buffer_reader reader; gpr_slice next; if (buffer == NULL) { return Qnil; - } - length = grpc_byte_buffer_length(buffer); - string = xmalloc(length + 1); + rb_string = rb_str_buf_new(grpc_byte_buffer_length(buffer)); grpc_byte_buffer_reader_init(&reader, buffer); while (grpc_byte_buffer_reader_next(&reader, &next) != 0) { - memcpy(string + offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next)); - offset += GPR_SLICE_LENGTH(next); + rb_str_cat(rb_string, (const char *) GPR_SLICE_START_PTR(next), + GPR_SLICE_LENGTH(next)); + gpr_slice_unref(next); } - return rb_str_new(string, length); + return rb_string; } diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index cd0aa6aaf2..b0829efdc7 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -551,13 +551,26 @@ static void grpc_run_batch_stack_init(run_batch_stack *st, /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly * cleaned up */ static void grpc_run_batch_stack_cleanup(run_batch_stack *st) { + size_t i = 0; + grpc_metadata_array_destroy(&st->send_metadata); grpc_metadata_array_destroy(&st->send_trailing_metadata); grpc_metadata_array_destroy(&st->recv_metadata); grpc_metadata_array_destroy(&st->recv_trailing_metadata); + if (st->recv_status_details != NULL) { gpr_free(st->recv_status_details); } + + if (st->recv_message != NULL) { + grpc_byte_buffer_destroy(st->recv_message); + } + + for (i = 0; i < st->op_num; i++) { + if (st->ops[i].op == GRPC_OP_SEND_MESSAGE) { + grpc_byte_buffer_destroy(st->ops[i].data.send_message); + } + } } /* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from @@ -643,7 +656,6 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { break; case GRPC_OP_SEND_MESSAGE: rb_struct_aset(result, sym_send_message, Qtrue); - grpc_byte_buffer_destroy(st->ops[i].data.send_message); break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: rb_struct_aset(result, sym_send_close, Qtrue); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 3bf81af8fb..22ea84c750 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -283,7 +283,7 @@ extern grpc_call_destroy_type grpc_call_destroy_import; typedef grpc_call_error(*grpc_server_request_call_type)(grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag_new); extern grpc_server_request_call_type grpc_server_request_call_import; #define grpc_server_request_call grpc_server_request_call_import -typedef void *(*grpc_server_register_method_type)(grpc_server *server, const char *method, const char *host); +typedef void *(*grpc_server_register_method_type)(grpc_server *server, const char *method, const char *host, uint32_t flags); extern grpc_server_register_method_type grpc_server_register_method_import; #define grpc_server_register_method grpc_server_register_method_import typedef grpc_call_error(*grpc_server_request_registered_call_type)(grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *request_metadata, grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag_new); |