diff options
author | Craig Tiller <ctiller@google.com> | 2017-05-18 10:22:43 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-05-18 10:22:43 -0700 |
commit | d3ec4aaf6f52387d27d02ae80b18bdcaf65828d6 (patch) | |
tree | 1e4f02db3d023c799a45698551b47c8e41d4d6e0 /src | |
parent | c7280c7bdbae7fb021f979c9216e1bc5e13fe5ac (diff) | |
parent | 656bbbef4152065e498ec3d5cf43bd08820da9b3 (diff) |
Merge github.com:grpc/grpc into thread_pool
Diffstat (limited to 'src')
23 files changed, 338 insertions, 183 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 28c18f1d58..a7002e9b04 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -921,8 +921,12 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, chand->interested_parties); call_or_error coe = get_call_or_error(calld); if (calld->connected_subchannel == NULL) { - grpc_error *failure = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Failed to create subchannel", &error, 1); + grpc_error *failure = + error == GRPC_ERROR_NONE + ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy") + : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to create subchannel", &error, 1); gpr_atm_no_barrier_store(&calld->subchannel_call_or_error, 1 | (gpr_atm)GRPC_ERROR_REF(failure)); fail_locked(exec_ctx, calld, failure); @@ -1186,6 +1190,15 @@ static void start_transport_stream_op_batch_locked_inner( &calld->next_step)) { calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); + if (calld->connected_subchannel == NULL) { + grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy"); + gpr_atm_no_barrier_store(&calld->subchannel_call_or_error, + 1 | (gpr_atm)error); + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + return; // Early out. + } } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 184b2ef720..fb4aa084a6 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -149,7 +149,9 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, /** Finds an appropriate subchannel for a call, based on \a pick_args. - \a target will be set to the selected subchannel, or NULL on failure. + \a target will be set to the selected subchannel, or NULL on failure + or when the LB policy decides to drop the call. + Upon success, \a user_data will be set to whatever opaque information may need to be propagated from the LB policy, or NULL if not needed. \a context will be populated with context to pass to the subchannel diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index f2651e652c..c3acc3d9db 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -327,6 +327,11 @@ typedef struct glb_lb_policy { * response has arrived. */ grpc_grpclb_serverlist *serverlist; + /** Index into serverlist for next pick. + * If the server at this index is a drop, we return a drop. + * Otherwise, we delegate to the RR policy. */ + size_t serverlist_index; + /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; @@ -402,6 +407,9 @@ struct rr_connectivity_data { static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, bool log) { + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { + return false; + } const grpc_grpclb_ip_address *ip = &server->ip_address; if (server->port >> 16 != 0) { if (log) { @@ -411,7 +419,6 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, } return false; } - if (ip->size != 4 && ip->size != 16) { if (log) { gpr_log(GPR_ERROR, @@ -445,11 +452,12 @@ static const grpc_lb_user_data_vtable lb_token_vtable = { static void parse_server(const grpc_grpclb_server *server, grpc_resolved_address *addr) { + memset(addr, 0, sizeof(*addr)); + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return; const uint16_t netorder_port = htons((uint16_t)server->port); /* the addresses are given in binary format (a in(6)_addr struct) in * server->ip_address.bytes. */ const grpc_grpclb_ip_address *ip = &server->ip_address; - memset(addr, 0, sizeof(*addr)); if (ip->size == 4) { addr->len = sizeof(struct sockaddr_in); struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr; @@ -586,16 +594,51 @@ static bool update_lb_connectivity_status_locked( return true; } -/* perform a pick over \a rr_policy. Given that a pick can return immediately - * (ignoring its completion callback) we need to perform the cleanups this - * callback would be otherwise resposible for */ +/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return + * immediately (ignoring its completion callback), we need to perform the + * cleanups this callback would otherwise be resposible for. + * If \a force_async is true, then we will manually schedule the + * completion callback even if the pick is available immediately. */ static bool pick_from_internal_rr_locked( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy, - const grpc_lb_policy_pick_args *pick_args, + grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, + const grpc_lb_policy_pick_args *pick_args, bool force_async, grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { - GPR_ASSERT(rr_policy != NULL); + // Look at the index into the serverlist to see if we should drop this call. + grpc_grpclb_server *server = + glb_policy->serverlist->servers[glb_policy->serverlist_index++]; + if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { + glb_policy->serverlist_index = 0; // Wrap-around. + } + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { + // Not using the RR policy, so unref it. + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); + // Update client load reporting stats to indicate the number of + // dropped calls. Note that we have to do this here instead of in + // the client_load_reporting filter, because we do not create a + // subchannel call (and therefore no client_load_reporting filter) + // for dropped calls. + grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats); + grpc_grpclb_client_stats_add_call_finished( + server->drop_for_rate_limiting, server->drop_for_load_balancing, + false /* failed_to_send */, false /* known_received */, + wc_arg->client_stats); + grpc_grpclb_client_stats_unref(wc_arg->client_stats); + if (force_async) { + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); + return false; + } + gpr_free(wc_arg->free_when_done); + return true; + } + // Pick via the RR policy. const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, rr_policy, pick_args, target, wc_arg->context, + exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context, (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ @@ -604,17 +647,20 @@ static bool pick_from_internal_rr_locked( (intptr_t)wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); - /* add the load reporting initial metadata */ initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); - // Pass on client stats via context. Passes ownership of the reference. GPR_ASSERT(wc_arg->client_stats != NULL); wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; - + if (force_async) { + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); + return false; + } gpr_free(wc_arg->free_when_done); } /* else, the pending pick will be registered and taken care of by the @@ -744,8 +790,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, - &pp->pick_args, pp->target, + pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args, + true /* force_async */, pp->target, &pp->wrapped_on_complete_arg); } @@ -1115,8 +1161,9 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = pick_args->initial_metadata; wc_arg->free_when_done = wc_arg; - pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, - pick_args, target, wc_arg); + pick_done = + pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args, + false /* force_async */, target, wc_arg); } else { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, @@ -1517,7 +1564,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, * serverlist instance will be destroyed either upon the next * update or in glb_destroy() */ glb_policy->serverlist = serverlist; - + glb_policy->serverlist_index = 0; rr_handover_locked(exec_ctx, glb_policy); } } else { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index 81b6932fae..90e7c2efe5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -37,44 +37,39 @@ #include <grpc/support/alloc.h> +/* invoked once for every Server in ServerList */ +static bool count_serverlist(pb_istream_t *stream, const pb_field_t *field, + void **arg) { + grpc_grpclb_serverlist *sl = *arg; + grpc_grpclb_server server; + if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) { + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); + return false; + } + ++sl->num_servers; + return true; +} + typedef struct decode_serverlist_arg { - /* The first pass counts the number of servers in the server list. The second - * one allocates and decodes. */ - bool first_pass; /* The decoding callback is invoked once per server in serverlist. Remember * which index of the serverlist are we currently decoding */ size_t decoding_idx; - /* Populated after the first pass. Number of server in the input serverlist */ - size_t num_servers; /* The decoded serverlist */ - grpc_grpclb_server **servers; + grpc_grpclb_serverlist *serverlist; } decode_serverlist_arg; /* invoked once for every Server in ServerList */ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, void **arg) { decode_serverlist_arg *dec_arg = *arg; - if (dec_arg->first_pass) { /* count how many server do we have */ - grpc_grpclb_server server; - if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); - return false; - } - dec_arg->num_servers++; - } else { /* second pass. Actually decode. */ - grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server)); - GPR_ASSERT(dec_arg->num_servers > 0); - if (dec_arg->decoding_idx == 0) { /* first iteration of second pass */ - dec_arg->servers = - gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers); - } - if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); - return false; - } - dec_arg->servers[dec_arg->decoding_idx++] = server; + GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx); + grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server)); + if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) { + gpr_free(server); + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); + return false; } - + dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server; return true; } @@ -165,36 +160,38 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( grpc_slice encoded_grpc_grpclb_response) { - bool status; - decode_serverlist_arg arg; pb_istream_t stream = pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response), GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response)); pb_istream_t stream_at_start = stream; + grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); grpc_grpclb_response res; memset(&res, 0, sizeof(grpc_grpclb_response)); - memset(&arg, 0, sizeof(decode_serverlist_arg)); - - res.server_list.servers.funcs.decode = decode_serverlist; - res.server_list.servers.arg = &arg; - arg.first_pass = true; - status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); + // First pass: count number of servers. + res.server_list.servers.funcs.decode = count_serverlist; + res.server_list.servers.arg = sl; + bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); if (!status) { + gpr_free(sl); gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } - - arg.first_pass = false; - status = - pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res); - if (!status) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); - return NULL; + // Second pass: populate servers. + if (sl->num_servers > 0) { + sl->servers = gpr_zalloc(sizeof(grpc_grpclb_server *) * sl->num_servers); + decode_serverlist_arg decode_arg; + memset(&decode_arg, 0, sizeof(decode_arg)); + decode_arg.serverlist = sl; + res.server_list.servers.funcs.decode = decode_serverlist; + res.server_list.servers.arg = &decode_arg; + status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, + &res); + if (!status) { + grpc_grpclb_destroy_serverlist(sl); + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); + return NULL; + } } - - grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); - sl->num_servers = arg.num_servers; - sl->servers = arg.servers; if (res.server_list.has_expiration_interval) { sl->expiration_interval = res.server_list.expiration_interval; } @@ -228,7 +225,7 @@ grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy( bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs, const grpc_grpclb_serverlist *rhs) { - if ((lhs == NULL) || (rhs == NULL)) { + if (lhs == NULL || rhs == NULL) { return false; } if (lhs->num_servers != rhs->num_servers) { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index 06873821bd..7f596ce1f1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -51,7 +51,7 @@ typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request; typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response; typedef grpc_lb_v1_Server grpc_grpclb_server; typedef grpc_lb_v1_Duration grpc_grpclb_duration; -typedef struct grpc_grpclb_serverlist { +typedef struct { grpc_grpclb_server **servers; size_t num_servers; grpc_grpclb_duration expiration_interval; diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index e9246948f5..d446e5312a 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -56,6 +56,8 @@ struct grpc_tcp_listener { int port; /* linked list */ struct grpc_tcp_listener *next; + + bool closed; }; struct grpc_tcp_server { @@ -77,6 +79,8 @@ struct grpc_tcp_server { /* shutdown callback */ grpc_closure *shutdown_complete; + bool shutdown; + grpc_resource_quota *resource_quota; }; @@ -109,6 +113,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, s->shutdown_starting.head = NULL; s->shutdown_starting.tail = NULL; s->shutdown_complete = shutdown_complete; + s->shutdown = false; *server = s; return GRPC_ERROR_NONE; } @@ -125,6 +130,7 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { + GPR_ASSERT(s->shutdown); if (s->shutdown_complete != NULL) { grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } @@ -144,21 +150,31 @@ static void handle_close_callback(uv_handle_t *handle) { grpc_tcp_listener *sp = (grpc_tcp_listener *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; sp->server->open_ports--; - if (sp->server->open_ports == 0) { + if (sp->server->open_ports == 0 && sp->server->shutdown) { finish_shutdown(&exec_ctx, sp->server); } grpc_exec_ctx_finish(&exec_ctx); } +static void close_listener(grpc_tcp_listener *sp) { + if (!sp->closed) { + sp->closed = true; + uv_close((uv_handle_t *)sp->handle, handle_close_callback); + } +} + static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { int immediately_done = 0; grpc_tcp_listener *sp; + GPR_ASSERT(!s->shutdown); + s->shutdown = true; + if (s->open_ports == 0) { immediately_done = 1; } for (sp = s->head; sp; sp = sp->next) { - uv_close((uv_handle_t *)sp->handle, handle_close_callback); + close_listener(sp); } if (immediately_done) { @@ -196,9 +212,14 @@ static void on_connect(uv_stream_t *server, int status) { int err; if (status < 0) { - gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", - uv_strerror(status)); - return; + switch (status) { + case UV_EINTR: + case UV_EAGAIN: + return; + default: + close_listener(sp); + return; + } } client = gpr_malloc(sizeof(uv_tcp_t)); @@ -287,6 +308,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, sp->handle = handle; sp->port = port; sp->port_index = port_index; + sp->closed = false; handle->data = sp; s->open_ports++; GPR_ASSERT(sp->handle); diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index e7157537f6..dc23e4f521 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -88,12 +88,12 @@ static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { #ifdef GRPC_TCP_REFCOUNT_DEBUG #define TCP_UNREF(exec_ctx, tcp, reason) \ tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__) -#define TCP_REF(tcp, reason) \ - tcp_ref((exec_ctx), (tcp), (reason), __FILE__, __LINE__) +#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, const char *reason, const char *file, int line) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, - reason, tcp->refcount.count, tcp->refcount.count - 1); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "TCP unref %p : %s %" PRIiPTR " -> %" PRIiPTR, tcp, reason, + tcp->refcount.count, tcp->refcount.count - 1); if (gpr_unref(&tcp->refcount)) { tcp_free(exec_ctx, tcp); } @@ -101,8 +101,9 @@ static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, int line) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp, - reason, tcp->refcount.count, tcp->refcount.count + 1); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "TCP ref %p : %s %" PRIiPTR " -> %" PRIiPTR, tcp, reason, + tcp->refcount.count, tcp->refcount.count + 1); gpr_ref(&tcp->refcount); } #else @@ -311,6 +312,7 @@ static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->shutting_down = true; uv_shutdown_t *req = &tcp->shutdown_req; uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); + grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); } GRPC_ERROR_UNREF(why); } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b0a4b1fbcc..de905941c1 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -582,9 +582,9 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, cq_event_queue_push(&cqd->queue, storage); gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - int shutdown = gpr_unref(&cqd->pending_events); - gpr_mu_lock(cqd->mu); + + int shutdown = gpr_unref(&cqd->pending_events); if (!shutdown) { grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL); gpr_mu_unlock(cqd->mu); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 2ead048a1f..6dca6a6862 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -358,4 +358,14 @@ void ServerBuilder::InternalAddPluginFactory( (*g_plugin_factory_list).push_back(CreatePlugin); } +ServerBuilder& ServerBuilder::EnableWorkaround(grpc_workaround_list id) { + switch (id) { + case GRPC_WORKAROUND_ID_CRONET_COMPRESSION: + return AddChannelArgument(GRPC_ARG_WORKAROUND_CRONET_COMPRESSION, 1); + default: + gpr_log(GPR_ERROR, "Workaround %u does not exist or is obsolete.", id); + return *this; + } +} + } // namespace grpc diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 3c368fbc6c..8ed0c0b92f 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -117,7 +117,7 @@ namespace Grpc.Core.Internal { var ctx = BatchContextSafeHandle.Create(); completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); + Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata ? 1 : 0).CheckOk(); } } @@ -140,7 +140,7 @@ namespace Grpc.Core.Internal var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail); - Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata, + Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0, optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index a98861af61..696987d2a8 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -346,11 +346,11 @@ namespace Grpc.Core.Internal public delegate CallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, CallFlags metadataFlags); public delegate CallError grpcsharp_call_send_message_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, int sendEmptyInitialMetadata); public delegate CallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); public delegate CallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, byte[] statusMessage, UIntPtr statusMessageLen, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + BatchContextSafeHandle ctx, StatusCode statusCode, byte[] statusMessage, UIntPtr statusMessageLen, MetadataArraySafeHandle metadataArray, int sendEmptyInitialMetadata, byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags); public delegate CallError grpcsharp_call_recv_message_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); @@ -406,7 +406,7 @@ namespace Grpc.Core.Internal public delegate CallCredentialsSafeHandle grpcsharp_metadata_credentials_create_from_plugin_delegate(NativeMetadataInterceptor interceptor); public delegate void grpcsharp_metadata_credentials_notify_from_plugin_delegate(IntPtr callbackPtr, IntPtr userData, MetadataArraySafeHandle metadataArray, StatusCode statusCode, string errorDetails); - public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth); + public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, int forceClientAuth); public delegate void grpcsharp_server_credentials_release_delegate(IntPtr credentials); public delegate ServerSafeHandle grpcsharp_server_create_delegate(ChannelArgsSafeHandle args); diff --git a/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs index 24f686fddc..c14fa7c827 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs @@ -53,7 +53,7 @@ namespace Grpc.Core.Internal return Native.grpcsharp_ssl_server_credentials_create(pemRootCerts, keyCertPairCertChainArray, keyCertPairPrivateKeyArray, new UIntPtr((ulong)keyCertPairCertChainArray.Length), - forceClientAuth); + forceClientAuth ? 1 : 0); } protected override bool ReleaseHandle() diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat index aa8a8d3b17..d823942be5 100755 --- a/src/csharp/build_packages_dotnetcli.bat +++ b/src/csharp/build_packages_dotnetcli.bat @@ -51,11 +51,11 @@ powershell -Command "cp -r ..\..\platform=*\artifacts\protoc_* protoc_plugins" @rem To be able to build, we also need to put grpc_csharp_ext to its normal location xcopy /Y /I nativelibs\csharp_ext_windows_x64\grpc_csharp_ext.dll ..\..\cmake\build\x64\Release\ -%DOTNET% pack --configuration Release Grpc.Core --output ..\..\..\artifacts || goto :error -%DOTNET% pack --configuration Release Grpc.Core.Testing --output ..\..\..\artifacts || goto :error -%DOTNET% pack --configuration Release Grpc.Auth --output ..\..\..\artifacts || goto :error -%DOTNET% pack --configuration Release Grpc.HealthCheck --output ..\..\..\artifacts || goto :error -%DOTNET% pack --configuration Release Grpc.Reflection --output ..\..\..\artifacts || goto :error +%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Core --output ..\..\..\artifacts || goto :error +%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Core.Testing --output ..\..\..\artifacts || goto :error +%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Auth --output ..\..\..\artifacts || goto :error +%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.HealthCheck --output ..\..\..\artifacts || goto :error +%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Reflection --output ..\..\..\artifacts || goto :error %NUGET% pack Grpc.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts || goto :error %NUGET% pack Grpc.Tools.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh index d33923845c..f79c97fbbc 100755 --- a/src/csharp/build_packages_dotnetcli.sh +++ b/src/csharp/build_packages_dotnetcli.sh @@ -48,11 +48,11 @@ dotnet restore Grpc.sln mkdir -p ../../libs/opt cp nativelibs/csharp_ext_linux_x64/libgrpc_csharp_ext.so ../../libs/opt -dotnet pack --configuration Release Grpc.Core --output ../../../artifacts -dotnet pack --configuration Release Grpc.Core.Testing --output ../../../artifacts -dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts -dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts -dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts +dotnet pack --configuration Release --include-symbols --include-source Grpc.Core --output ../../../artifacts +dotnet pack --configuration Release --include-symbols --include-source Grpc.Core.Testing --output ../../../artifacts +dotnet pack --configuration Release --include-symbols --include-source Grpc.Auth --output ../../../artifacts +dotnet pack --configuration Release --include-symbols --include-source Grpc.HealthCheck --output ../../../artifacts +dotnet pack --configuration Release --include-symbols --include-source Grpc.Reflection --output ../../../artifacts nuget pack Grpc.nuspec -Version "1.4.0-dev" -OutputDirectory ../../artifacts nuget pack Grpc.Tools.nuspec -Version "1.4.0-dev" -OutputDirectory ../../artifacts diff --git a/src/node/index.js b/src/node/index.js index 0da3440eb7..2da77c3eae 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -70,8 +70,6 @@ grpc.setDefaultRootsPem(fs.readFileSync(SSL_ROOTS_PATH, 'ascii')); * Buffers. Defaults to false * - longsAsStrings: deserialize long values as strings instead of objects. * Defaults to true - * - enumsAsStrings: deserialize enum values as strings instead of numbers. - * Defaults to true * - deprecatedArgumentOrder: Use the beta method argument order for client * methods, with optional arguments after the callback. Defaults to false. * This option is only a temporary stopgap measure to smooth an API breakage. @@ -105,10 +103,6 @@ exports.loadObject = function loadObject(value, options) { switch (protobufjsVersion) { case 6: return protobuf_js_6_common.loadObject(value, options); case 5: - var deprecation_message = 'Calling grpc.loadObject with an object ' + - 'generated by ProtoBuf.js 5 is deprecated. Please upgrade to ' + - 'ProtoBuf.js 6.'; - common.log(grpc.logVerbosity.INFO, deprecation_message); return protobuf_js_5_common.loadObject(value, options); default: throw new Error('Unrecognized protobufjsVersion', protobufjsVersion); @@ -117,19 +111,6 @@ exports.loadObject = function loadObject(value, options) { var loadObject = exports.loadObject; -function applyProtoRoot(filename, root) { - if (_.isString(filename)) { - return filename; - } - filename.root = path.resolve(filename.root) + '/'; - root.resolvePath = function(originPath, importPath, alreadyNormalized) { - return ProtoBuf.util.path.resolve(filename.root, - importPath, - alreadyNormalized); - }; - return filename.file; -} - /** * Load a gRPC object from a .proto file. The options object can provide the * following options: @@ -139,8 +120,6 @@ function applyProtoRoot(filename, root) { * Buffers. Defaults to false * - longsAsStrings: deserialize long values as strings instead of objects. * Defaults to true - * - enumsAsStrings: deserialize enum values as strings instead of numbers. - * Defaults to true * - deprecatedArgumentOrder: Use the beta method argument order for client * methods, with optional arguments after the callback. Defaults to false. * This option is only a temporary stopgap measure to smooth an API breakage. @@ -152,17 +131,31 @@ function applyProtoRoot(filename, root) { * @return {Object<string, *>} The resulting gRPC object */ exports.load = function load(filename, format, options) { - /* Note: format is currently unused, because the API for loading a proto - file or a JSON file is identical in Protobuf.js 6. In the future, there is - still the possibility of adding other formats that would be loaded - differently */ options = _.defaults(options, common.defaultGrpcOptions); - options.protobufjs_version = 6; - var root = new ProtoBuf.Root(); - var parse_options = {keepCase: !options.convertFieldsToCamelCase}; - return loadObject(root.loadSync(applyProtoRoot(filename, root), - parse_options), - options); + options.protobufjsVersion = 5; + if (!format) { + format = 'proto'; + } + var convertFieldsToCamelCaseOriginal = ProtoBuf.convertFieldsToCamelCase; + if(options && options.hasOwnProperty('convertFieldsToCamelCase')) { + ProtoBuf.convertFieldsToCamelCase = options.convertFieldsToCamelCase; + } + var builder; + try { + switch(format) { + case 'proto': + builder = ProtoBuf.loadProtoFile(filename); + break; + case 'json': + builder = ProtoBuf.loadJsonFile(filename); + break; + default: + throw new Error('Unrecognized format "' + format + '"'); + } + } finally { + ProtoBuf.convertFieldsToCamelCase = convertFieldsToCamelCaseOriginal; + } + return loadObject(builder.ns, options); }; var log_template = _.template( diff --git a/src/node/src/protobuf_js_5_common.js b/src/node/src/protobuf_js_5_common.js index 62cf2f4aca..4041e05390 100644 --- a/src/node/src/protobuf_js_5_common.js +++ b/src/node/src/protobuf_js_5_common.js @@ -45,8 +45,7 @@ var client = require('./client'); * objects. Defaults to true * @return {function(Buffer):cls} The deserialization function */ -exports.deserializeCls = function deserializeCls(cls, binaryAsBase64, - longsAsStrings) { +exports.deserializeCls = function deserializeCls(cls, options) { /** * Deserialize a buffer to a message object * @param {Buffer} arg_buf The buffer to deserialize @@ -55,7 +54,8 @@ exports.deserializeCls = function deserializeCls(cls, binaryAsBase64, return function deserialize(arg_buf) { // Convert to a native object with binary fields as Buffers (first argument) // and longs as strings (second argument) - return cls.decode(arg_buf).toRaw(binaryAsBase64, longsAsStrings); + return cls.decode(arg_buf).toRaw(options.binaryAsBase64, + options.longsAsStrings); }; }; @@ -128,10 +128,10 @@ exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service, responseType: method.resolvedResponseType, requestSerialize: serializeCls(method.resolvedRequestType.build()), requestDeserialize: deserializeCls(method.resolvedRequestType.build(), - binaryAsBase64, longsAsStrings), + options), responseSerialize: serializeCls(method.resolvedResponseType.build()), responseDeserialize: deserializeCls(method.resolvedResponseType.build(), - binaryAsBase64, longsAsStrings) + options) }; })); }; diff --git a/src/node/src/server.js b/src/node/src/server.js index 08417a74c1..1d9cc7d2c1 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -781,6 +781,11 @@ Server.prototype.addService = function(service, implementation) { }); }; +var logAddProtoServiceDeprecationOnce = _.once(function() { + common.log(constants.logVerbosity.INFO, + 'Server#addProtoService is deprecated. Use addService instead'); +}); + /** * Add a proto service to the server, with a corresponding implementation * @deprecated Use grpc.load and Server#addService instead @@ -792,8 +797,7 @@ Server.prototype.addProtoService = function(service, implementation) { var options; var protobuf_js_5_common = require('./protobuf_js_5_common'); var protobuf_js_6_common = require('./protobuf_js_6_common'); - common.log(constants.logVerbosity.INFO, - 'Server#addProtoService is deprecated. Use addService instead'); + logAddProtoServiceDeprecationOnce(); if (protobuf_js_5_common.isProbablyProtobufJs5(service)) { options = _.defaults(service.grpc_options, common.defaultGrpcOptions); this.addService( diff --git a/src/node/test/common_test.js b/src/node/test/common_test.js index e1ce864f97..b7c2c6a8d6 100644 --- a/src/node/test/common_test.js +++ b/src/node/test/common_test.js @@ -37,16 +37,15 @@ var assert = require('assert'); var _ = require('lodash'); var common = require('../src/common'); -var protobuf_js_6_common = require('../src/protobuf_js_6_common'); +var protobuf_js_5_common = require('../src/protobuf_js_5_common'); -var serializeCls = protobuf_js_6_common.serializeCls; -var deserializeCls = protobuf_js_6_common.deserializeCls; +var serializeCls = protobuf_js_5_common.serializeCls; +var deserializeCls = protobuf_js_5_common.deserializeCls; var ProtoBuf = require('protobufjs'); -var messages_proto = new ProtoBuf.Root(); -messages_proto = messages_proto.loadSync( - __dirname + '/test_messages.proto', {keepCase: true}).resolveAll(); +var messages_proto = ProtoBuf.loadProtoFile( + __dirname + '/test_messages.proto').build(); var default_options = common.defaultGrpcOptions; @@ -101,6 +100,7 @@ describe('Proto message long int serialize and deserialize', function() { var longNumDeserialize = deserializeCls(messages_proto.LongValues, num_options); var serialized = longSerialize({int_64: pos_value}); + console.log(longDeserialize(serialized)); assert.strictEqual(typeof longDeserialize(serialized).int_64, 'string'); /* With the longsAsStrings option disabled, long values are represented as * objects with 3 keys: low, high, and unsigned */ @@ -136,7 +136,8 @@ describe('Proto message bytes serialize and deserialize', function() { var serialized = sequenceSerialize({repeated_field: [10]}); assert.strictEqual(expected_serialize.compare(serialized), 0); }); - it('should deserialize packed or unpacked repeated', function() { + // This tests a bug that was fixed in Protobuf.js 6 + it.skip('should deserialize packed or unpacked repeated', function() { var expectedDeserialize = { bytes_field: new Buffer(''), repeated_field: [10] @@ -155,7 +156,8 @@ describe('Proto message bytes serialize and deserialize', function() { assert.deepEqual(unpackedDeserialized, expectedDeserialize); }); }); -describe('Proto message oneof serialize and deserialize', function() { +// This tests a bug that was fixed in Protobuf.js 6 +describe.skip('Proto message oneof serialize and deserialize', function() { var oneofSerialize = serializeCls(messages_proto.OneOfValues); var oneofDeserialize = deserializeCls( messages_proto.OneOfValues, default_options); @@ -193,7 +195,8 @@ describe('Proto message enum serialize and deserialize', function() { assert.deepEqual(enumDeserialize(nameSerialized), enumDeserialize(numberSerialized)); }); - it('Should deserialize as a string the enumsAsStrings option', function() { + // This tests a bug that was fixed in Protobuf.js 6 + it.skip('Should correctly handle the enumsAsStrings option', function() { var serialized = enumSerialize({enum_value: 'TWO'}); var nameDeserialized = enumDeserialize(serialized); var numberDeserialized = enumIntDeserialize(serialized); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 783028fa99..d2f0511af2 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -43,9 +43,8 @@ var ProtoBuf = require('protobufjs'); var grpc = require('..'); -var math_proto = new ProtoBuf.Root(); -math_proto = math_proto.loadSync(__dirname + - '/../../proto/math/math.proto', {keepCase: true}); +var math_proto = ProtoBuf.loadProtoFile(__dirname + + '/../../proto/math/math.proto'); var mathService = math_proto.lookup('math.Math'); var mathServiceAttrs = grpc.loadObject( @@ -332,9 +331,7 @@ describe('Echo service', function() { var server; var client; before(function() { - var test_proto = new ProtoBuf.Root(); - test_proto = test_proto.loadSync(__dirname + '/echo_service.proto', - {keepCase: true}); + var test_proto = ProtoBuf.loadProtoFile(__dirname + '/echo_service.proto'); var echo_service = test_proto.lookup('EchoService'); var Client = grpc.loadObject(echo_service); server = new grpc.Server(); @@ -357,6 +354,13 @@ describe('Echo service', function() { done(); }); }); + it('Should convert an undefined argument to default values', function(done) { + client.echo(undefined, function(error, response) { + assert.ifError(error); + assert.deepEqual(response, {value: '', value2: 0}); + done(); + }); + }); }); describe('Generic client and server', function() { function toString(val) { @@ -457,9 +461,7 @@ describe('Echo metadata', function() { var server; var metadata; before(function() { - var test_proto = new ProtoBuf.Root(); - test_proto = test_proto.loadSync(__dirname + '/test_service.proto', - {keepCase: true}); + var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); var test_service = test_proto.lookup('TestService'); var Client = grpc.loadObject(test_service); server = new grpc.Server(); @@ -560,9 +562,7 @@ describe('Client malformed response handling', function() { var client; var badArg = new Buffer([0xFF]); before(function() { - var test_proto = new ProtoBuf.Root(); - test_proto = test_proto.loadSync(__dirname + '/test_service.proto', - {keepCase: true}); + var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); var test_service = test_proto.lookup('TestService'); var malformed_test_service = { unary: { @@ -669,9 +669,7 @@ describe('Server serialization failure handling', function() { var client; var server; before(function() { - var test_proto = new ProtoBuf.Root(); - test_proto = test_proto.loadSync(__dirname + '/test_service.proto', - {keepCase: true}); + var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); var test_service = test_proto.lookup('TestService'); var malformed_test_service = { unary: { @@ -772,16 +770,13 @@ describe('Server serialization failure handling', function() { }); }); describe('Other conditions', function() { - var test_service; var Client; var client; var server; var port; before(function() { - var test_proto = new ProtoBuf.Root(); - test_proto = test_proto.loadSync(__dirname + '/test_service.proto', - {keepCase: true}); - test_service = test_proto.lookup('TestService'); + var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); + var test_service = test_proto.lookup('TestService'); Client = grpc.loadObject(test_service); server = new grpc.Server(); var trailer_metadata = new grpc.Metadata(); @@ -1121,15 +1116,12 @@ describe('Call propagation', function() { var proxy; var proxy_impl; - var test_service; var Client; var client; var server; before(function() { - var test_proto = new ProtoBuf.Root(); - test_proto = test_proto.loadSync(__dirname + '/test_service.proto', - {keepCase: true}); - test_service = test_proto.lookup('TestService'); + var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); + var test_service = test_proto.lookup('TestService'); server = new grpc.Server(); Client = grpc.loadObject(test_service); server.addService(Client.service, { diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index d3fd88416b..94e0b73a49 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -125,7 +125,12 @@ zval *grpc_parse_metadata_array(grpc_metadata_array php_grpc_add_next_index_stringl(inner_array, str_val, GRPC_SLICE_LENGTH(elem->value), false); add_assoc_zval(array, str_key, inner_array); + PHP_GRPC_FREE_STD_ZVAL(inner_array); } + efree(str_key); +#if PHP_MAJOR_VERSION >= 7 + efree(str_val); +#endif } return array; } @@ -256,8 +261,6 @@ PHP_METHOD(Call, startBatch) { object_init(result); php_grpc_ulong index; zval *recv_status; - PHP_GRPC_MAKE_STD_ZVAL(recv_status); - object_init(recv_status); zval *value; zval *inner_value; zval *message_value; @@ -439,7 +442,7 @@ PHP_METHOD(Call, startBatch) { grpc_completion_queue_pluck(completion_queue, call->wrapped, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); #if PHP_MAJOR_VERSION >= 7 - zval recv_md; + zval *recv_md; #endif for (int i = 0; i < op_num; i++) { switch(ops[i].op) { @@ -460,8 +463,10 @@ PHP_METHOD(Call, startBatch) { array = grpc_parse_metadata_array(&recv_metadata TSRMLS_CC); add_property_zval(result, "metadata", array); #else - recv_md = *grpc_parse_metadata_array(&recv_metadata); - add_property_zval(result, "metadata", &recv_md); + recv_md = grpc_parse_metadata_array(&recv_metadata); + add_property_zval(result, "metadata", recv_md); + zval_ptr_dtor(recv_md); + PHP_GRPC_FREE_STD_ZVAL(recv_md); #endif PHP_GRPC_DELREF(array); break; @@ -475,12 +480,16 @@ PHP_METHOD(Call, startBatch) { } break; case GRPC_OP_RECV_STATUS_ON_CLIENT: + PHP_GRPC_MAKE_STD_ZVAL(recv_status); + object_init(recv_status); #if PHP_MAJOR_VERSION < 7 array = grpc_parse_metadata_array(&recv_trailing_metadata TSRMLS_CC); add_property_zval(recv_status, "metadata", array); #else - recv_md = *grpc_parse_metadata_array(&recv_trailing_metadata); - add_property_zval(recv_status, "metadata", &recv_md); + recv_md = grpc_parse_metadata_array(&recv_trailing_metadata); + add_property_zval(recv_status, "metadata", recv_md); + zval_ptr_dtor(recv_md); + PHP_GRPC_FREE_STD_ZVAL(recv_md); #endif PHP_GRPC_DELREF(array); add_property_long(recv_status, "code", status); @@ -489,6 +498,9 @@ PHP_METHOD(Call, startBatch) { true); gpr_free(status_details_text); add_property_zval(result, "status", recv_status); +#if PHP_MAJOR_VERSION >= 7 + zval_ptr_dtor(recv_status); +#endif PHP_GRPC_DELREF(recv_status); PHP_GRPC_FREE_STD_ZVAL(recv_status); break; diff --git a/src/php/ext/grpc/call_credentials.c b/src/php/ext/grpc/call_credentials.c index 625c0c62ae..da7100f24a 100644 --- a/src/php/ext/grpc/call_credentials.c +++ b/src/php/ext/grpc/call_credentials.c @@ -172,34 +172,54 @@ void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context, object_init(arg); php_grpc_add_property_string(arg, "service_url", context.service_url, true); php_grpc_add_property_string(arg, "method_name", context.method_name, true); - zval *retval; - PHP_GRPC_MAKE_STD_ZVAL(retval); + zval *retval = NULL; #if PHP_MAJOR_VERSION < 7 zval **params[1]; params[0] = &arg; state->fci->params = params; state->fci->retval_ptr_ptr = &retval; #else + PHP_GRPC_MAKE_STD_ZVAL(retval); state->fci->params = arg; state->fci->retval = retval; #endif state->fci->param_count = 1; + PHP_GRPC_DELREF(arg); + /* call the user callback function */ zend_call_function(state->fci, state->fci_cache TSRMLS_CC); grpc_status_code code = GRPC_STATUS_OK; grpc_metadata_array metadata; + bool cleanup = true; if (Z_TYPE_P(retval) != IS_ARRAY) { + cleanup = false; code = GRPC_STATUS_INVALID_ARGUMENT; } else if (!create_metadata_array(retval, &metadata)) { - grpc_metadata_array_destroy(&metadata); code = GRPC_STATUS_INVALID_ARGUMENT; } + if (retval != NULL) { +#if PHP_MAJOR_VERSION < 7 + zval_ptr_dtor(&retval); +#else + zval_ptr_dtor(arg); + zval_ptr_dtor(retval); + PHP_GRPC_FREE_STD_ZVAL(arg); + PHP_GRPC_FREE_STD_ZVAL(retval); +#endif + } + /* Pass control back to core */ cb(user_data, metadata.metadata, metadata.count, code, NULL); + if (cleanup) { + for (int i = 0; i < metadata.count; i++) { + grpc_slice_unref(metadata.metadata[i].value); + } + grpc_metadata_array_destroy(&metadata); + } } /* Cleanup function for plugin creds API */ @@ -207,8 +227,10 @@ void plugin_destroy_state(void *ptr) { plugin_state *state = (plugin_state *)ptr; efree(state->fci); efree(state->fci_cache); +#if PHP_MAJOR_VERSION < 7 PHP_GRPC_FREE_STD_ZVAL(state->fci->params); PHP_GRPC_FREE_STD_ZVAL(state->fci->retval); +#endif efree(state); } diff --git a/src/php/ext/grpc/php_grpc.h b/src/php/ext/grpc/php_grpc.h index 13083b0bc7..51f9dd5fe6 100644 --- a/src/php/ext/grpc/php_grpc.h +++ b/src/php/ext/grpc/php_grpc.h @@ -40,9 +40,6 @@ extern zend_module_entry grpc_module_entry; #define phpext_grpc_ptr &grpc_module_entry -#define PHP_GRPC_VERSION \ - "0.1.0" /* Replace with version number for your extension */ - #ifdef PHP_WIN32 #define PHP_GRPC_API __declspec(dllexport) #elif defined(__GNUC__) && __GNUC__ >= 4 @@ -56,10 +53,9 @@ extern zend_module_entry grpc_module_entry; #endif #include "php.h" - #include "php7_wrapper.h" - #include "grpc/grpc.h" +#include "version.h" /* These are all function declarations */ /* Code that runs at module initialization */ diff --git a/src/php/ext/grpc/version.h b/src/php/ext/grpc/version.h new file mode 100644 index 0000000000..993ef2de27 --- /dev/null +++ b/src/php/ext/grpc/version.h @@ -0,0 +1,40 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + + +#ifndef VERSION_H +#define VERSION_H + +#define PHP_GRPC_VERSION "1.4.0" + +#endif /* VERSION_H */ |