diff options
Diffstat (limited to 'src')
87 files changed, 2801 insertions, 1165 deletions
diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h index 06a6012dee..1a6be91359 100644 --- a/src/core/channel/channel_args.h +++ b/src/core/channel/channel_args.h @@ -71,7 +71,7 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm( * compression algorithms are enabled. It's an error to disable an algorithm set * by grpc_channel_args_set_compression_algorithm. * - * Returns an instance will the updated algorithm states. The \a a pointer is + * Returns an instance with the updated algorithm states. The \a a pointer is * modified to point to the returned instance (which may be different from the * input value of \a a). */ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 136292bfce..f3d51f35cf 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -70,7 +70,7 @@ typedef struct channel_data { /* used to silence 'variable not used' warnings */ static void ignore_unused(void *ignored) {} -static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) { +static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; channel_data *channeld = elem->channel_data; if (md == channeld->status) { @@ -78,6 +78,8 @@ static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) { } else if (md->key == channeld->status->key) { grpc_call_element_send_cancel(elem); return NULL; + } else if (md->key == channeld->content_type->key) { + return NULL; } return md; } @@ -92,11 +94,13 @@ static void hc_on_recv(void *user_data, int success) { grpc_stream_op *op = &ops[i]; if (op->type != GRPC_OP_METADATA) continue; calld->got_initial_metadata = 1; - grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); + grpc_metadata_batch_filter(&op->data.metadata, client_recv_filter, elem); } calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } + + static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; channel_data *channeld = elem->channel_data; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 0ed9fa9e36..d3c6012d88 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -46,6 +46,7 @@ typedef struct call_data { gpr_uint8 seen_te_trailers; gpr_uint8 seen_authority; grpc_linked_mdelem status; + grpc_linked_mdelem content_type; grpc_stream_op_buffer *recv_ops; /** Closure to call when finished with the hs_on_recv hook */ @@ -110,8 +111,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return NULL; } else if (md->key == channeld->te_trailers->key || md->key == channeld->method_post->key || - md->key == channeld->http_scheme->key || - md->key == channeld->content_type->key) { + md->key == channeld->http_scheme->key) { gpr_log(GPR_ERROR, "Invalid %s: header: '%s'", grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); /* swallow it and error everything out. */ @@ -202,6 +202,8 @@ static void hs_mutate_op(grpc_call_element *elem, calld->sent_status = 1; grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->status, GRPC_MDELEM_REF(channeld->status_ok)); + grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, + GRPC_MDELEM_REF(channeld->content_type)); break; } } diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index a8cd5fc149..c1e583e4a5 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -47,3 +47,7 @@ void grpc_connector_connect(grpc_connector *connector, grpc_iomgr_closure *notify) { connector->vtable->connect(connector, in_args, out_args, notify); } + +void grpc_connector_shutdown(grpc_connector *connector) { + connector->vtable->shutdown(connector); +} diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index 39f3467990..01aa716412 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -70,6 +70,9 @@ typedef struct { struct grpc_connector_vtable { void (*ref)(grpc_connector *connector); void (*unref)(grpc_connector *connector); + /** Implementation of grpc_connector_shutdown */ + void (*shutdown)(grpc_connector *connector); + /** Implementation of grpc_connector_connect */ void (*connect)(grpc_connector *connector, const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args, grpc_iomgr_closure *notify); @@ -77,9 +80,12 @@ struct grpc_connector_vtable { void grpc_connector_ref(grpc_connector *connector); void grpc_connector_unref(grpc_connector *connector); +/** Connect using the connector: max one outstanding call at a time */ void grpc_connector_connect(grpc_connector *connector, const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args, grpc_iomgr_closure *notify); +/** Cancel any pending connection */ +void grpc_connector_shutdown(grpc_connector *connector); #endif diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 5ae2e0ea52..c8262e92ef 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -31,6 +31,7 @@ * */ +#include "src/core/client_config/lb_policy_factory.h" #include "src/core/client_config/lb_policies/pick_first.h" #include <string.h> @@ -314,19 +315,34 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_check_connectivity, pf_notify_on_state_change}; -grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, - size_t num_subchannels) { +static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} + +static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} + +static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(num_subchannels); + GPR_ASSERT(args->num_subchannels > 0); memset(p, 0, sizeof(*p)); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); - p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels); - p->num_subchannels = num_subchannels; + p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); + p->num_subchannels = args->num_subchannels; grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first"); - memcpy(p->subchannels, subchannels, - sizeof(grpc_subchannel *) * num_subchannels); + memcpy(p->subchannels, args->subchannels, + sizeof(grpc_subchannel *) * args->num_subchannels); grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); gpr_mu_init(&p->mu); return &p->base; } + +static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = { + pick_first_factory_ref, pick_first_factory_unref, create_pick_first, + "pick_first"}; + +static grpc_lb_policy_factory pick_first_lb_policy_factory = { + &pick_first_factory_vtable}; + +grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() { + return &pick_first_lb_policy_factory; +} diff --git a/src/core/client_config/lb_policies/pick_first.h b/src/core/client_config/lb_policies/pick_first.h index 31394985e5..3ca53ad42a 100644 --- a/src/core/client_config/lb_policies/pick_first.h +++ b/src/core/client_config/lb_policies/pick_first.h @@ -34,11 +34,10 @@ #ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_PICK_FIRST_H #define GRPC_INTERNAL_CORE_CLIENT_CONFIG_PICK_FIRST_H -#include "src/core/client_config/lb_policy.h" +#include "src/core/client_config/lb_policy_factory.h" -/** Returns a load balancing policy instance that picks up the first subchannel - * from \a subchannels to succesfully connect */ -grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, - size_t num_subchannels); +/** Returns a load balancing factory for the pick first policy, which picks up + * the first subchannel from \a subchannels to succesfully connect */ +grpc_lb_policy_factory *grpc_pick_first_lb_factory_create(); #endif diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c new file mode 100644 index 0000000000..39fb2bc401 --- /dev/null +++ b/src/core/client_config/lb_policies/round_robin.c @@ -0,0 +1,549 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/client_config/lb_policies/round_robin.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include "src/core/transport/connectivity_state.h" + +int grpc_lb_round_robin_trace = 0; + +/** List of entities waiting for a pick. + * + * Once a pick is available, \a target is updated and \a on_complete called. */ +typedef struct pending_pick { + struct pending_pick *next; + grpc_pollset *pollset; + grpc_subchannel **target; + grpc_iomgr_closure *on_complete; +} pending_pick; + +/** List of subchannels in a connectivity READY state */ +typedef struct ready_list { + grpc_subchannel *subchannel; + struct ready_list *next; + struct ready_list *prev; +} ready_list; + +typedef struct { + size_t subchannel_idx; /**< Index over p->subchannels */ + void *p; /**< round_robin_lb_policy instance */ +} connectivity_changed_cb_arg; + +typedef struct { + /** base policy: must be first */ + grpc_lb_policy base; + + /** all our subchannels */ + grpc_subchannel **subchannels; + size_t num_subchannels; + + /** Callbacks, one per subchannel being watched, to be called when their + * respective connectivity changes */ + grpc_iomgr_closure *connectivity_changed_cbs; + connectivity_changed_cb_arg *cb_args; + + /** mutex protecting remaining members */ + gpr_mu mu; + /** have we started picking? */ + int started_picking; + /** are we shutting down? */ + int shutdown; + /** Connectivity state of the subchannels being watched */ + grpc_connectivity_state *subchannel_connectivity; + /** List of picks that are waiting on connectivity */ + pending_pick *pending_picks; + + /** our connectivity state tracker */ + grpc_connectivity_state_tracker state_tracker; + + /** (Dummy) root of the doubly linked list containing READY subchannels */ + ready_list ready_list; + /** Last pick from the ready list. */ + ready_list *ready_list_last_pick; + + /** Subchannel index to ready_list node. + * + * Kept in order to remove nodes from the ready list associated with a + * subchannel */ + ready_list **subchannel_index_to_readylist_node; +} round_robin_lb_policy; + +/** Returns the next subchannel from the connected list or NULL if the list is + * empty. + * + * Note that this function does *not* advance p->ready_list_last_pick. Use \a + * advance_last_picked_locked() for that. */ +static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { + ready_list *selected; + selected = p->ready_list_last_pick->next; + + while (selected != NULL) { + if (selected == &p->ready_list) { + GPR_ASSERT(selected->subchannel == NULL); + /* skip dummy root */ + selected = selected->next; + } else { + GPR_ASSERT(selected->subchannel != NULL); + return selected; + } + } + return NULL; +} + +/** Advance the \a ready_list picking head. */ +static void advance_last_picked_locked(round_robin_lb_policy *p) { + if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ + p->ready_list_last_pick = p->ready_list_last_pick->next; + if (p->ready_list_last_pick == &p->ready_list) { + /* skip dummy root */ + p->ready_list_last_pick = p->ready_list_last_pick->next; + } + } else { /* should be an empty list */ + GPR_ASSERT(p->ready_list_last_pick == &p->ready_list); + } + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", + p->ready_list_last_pick, p->ready_list_last_pick->subchannel); + } +} + +/** Prepends (relative to the root at p->ready_list) the connected subchannel \a + * csc to the list of ready subchannels. */ +static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, + grpc_subchannel *csc) { + ready_list *new_elem = gpr_malloc(sizeof(ready_list)); + new_elem->subchannel = csc; + if (p->ready_list.prev == NULL) { + /* first element */ + new_elem->next = &p->ready_list; + new_elem->prev = &p->ready_list; + p->ready_list.next = new_elem; + p->ready_list.prev = new_elem; + } else { + new_elem->next = &p->ready_list; + new_elem->prev = p->ready_list.prev; + p->ready_list.prev->next = new_elem; + p->ready_list.prev = new_elem; + } + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, + "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); + } + return new_elem; +} + +/** Removes \a node from the list of connected subchannels */ +static void remove_disconnected_sc_locked(round_robin_lb_policy *p, + ready_list *node) { + if (node == NULL) { + return; + } + if (node == p->ready_list_last_pick) { + /* If removing the lastly picked node, reset the last pick pointer to the + * dummy root of the list */ + p->ready_list_last_pick = &p->ready_list; + } + + /* removing last item */ + if (node->next == &p->ready_list && node->prev == &p->ready_list) { + GPR_ASSERT(p->ready_list.next == node); + GPR_ASSERT(p->ready_list.prev == node); + p->ready_list.next = NULL; + p->ready_list.prev = NULL; + } else { + node->prev->next = node->next; + node->next->prev = node->prev; + } + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, + node->subchannel); + } + + node->next = NULL; + node->prev = NULL; + node->subchannel = NULL; + + gpr_free(node); +} + +static void del_interested_parties_locked(round_robin_lb_policy *p, + const size_t subchannel_idx) { + pending_pick *pp; + for (pp = p->pending_picks; pp; pp = pp->next) { + grpc_subchannel_del_interested_party(p->subchannels[subchannel_idx], + pp->pollset); + } +} + + +void rr_destroy(grpc_lb_policy *pol) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + size_t i; + ready_list *elem; + for (i = 0; i < p->num_subchannels; i++) { + del_interested_parties_locked(p, i); + } + for (i = 0; i < p->num_subchannels; i++) { + GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "round_robin"); + } + gpr_free(p->connectivity_changed_cbs); + gpr_free(p->subchannel_connectivity); + + grpc_connectivity_state_destroy(&p->state_tracker); + gpr_free(p->subchannels); + gpr_mu_destroy(&p->mu); + + elem = p->ready_list.next; + while (elem != NULL && elem != &p->ready_list) { + ready_list *tmp; + tmp = elem->next; + elem->next = NULL; + elem->prev = NULL; + elem->subchannel = NULL; + gpr_free(elem); + elem = tmp; + } + gpr_free(p->subchannel_index_to_readylist_node); + gpr_free(p->cb_args); + gpr_free(p); +} + +void rr_shutdown(grpc_lb_policy *pol) { + size_t i; + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + pending_pick *pp; + gpr_mu_lock(&p->mu); + + for (i = 0; i < p->num_subchannels; i++) { + del_interested_parties_locked(p, i); + } + + p->shutdown = 1; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_iomgr_add_delayed_callback(pp->on_complete, 0); + gpr_free(pp); + } + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, + "shutdown"); + gpr_mu_unlock(&p->mu); +} + +static void start_picking(round_robin_lb_policy *p) { + size_t i; + p->started_picking = 1; + + for (i = 0; i < p->num_subchannels; i++) { + p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; + grpc_subchannel_notify_on_state_change(p->subchannels[i], + &p->subchannel_connectivity[i], + &p->connectivity_changed_cbs[i]); + GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); + } +} + +void rr_exit_idle(grpc_lb_policy *pol) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (!p->started_picking) { + start_picking(p); + } + gpr_mu_unlock(&p->mu); +} + +void rr_pick(grpc_lb_policy *pol, grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, grpc_subchannel **target, + grpc_iomgr_closure *on_complete) { + size_t i; + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + pending_pick *pp; + ready_list *selected; + gpr_mu_lock(&p->mu); + if ((selected = peek_next_connected_locked(p))) { + gpr_mu_unlock(&p->mu); + *target = selected->subchannel; + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", + selected->subchannel, selected); + } + /* only advance the last picked pointer if the selection was used */ + advance_last_picked_locked(p); + on_complete->cb(on_complete->cb_arg, 1); + } else { + if (!p->started_picking) { + start_picking(p); + } + for (i = 0; i < p->num_subchannels; i++) { + grpc_subchannel_add_interested_party(p->subchannels[i], pollset); + } + pp = gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->pollset = pollset; + pp->target = target; + pp->on_complete = on_complete; + p->pending_picks = pp; + gpr_mu_unlock(&p->mu); + } +} + +static void rr_connectivity_changed(void *arg, int iomgr_success) { + connectivity_changed_cb_arg *cb_arg = arg; + round_robin_lb_policy *p = cb_arg->p; + /* index over p->subchannels of this cb's subchannel */ + const size_t this_idx = cb_arg->subchannel_idx; + pending_pick *pp; + ready_list *selected; + + int unref = 0; + + /* connectivity state of this cb's subchannel */ + grpc_connectivity_state *this_connectivity; + + gpr_mu_lock(&p->mu); + + this_connectivity = + &p->subchannel_connectivity[this_idx]; + + if (p->shutdown) { + unref = 1; + } else { + switch (*this_connectivity) { + case GRPC_CHANNEL_READY: + grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, + "connecting_ready"); + /* add the newly connected subchannel to the list of connected ones. + * Note that it goes to the "end of the line". */ + p->subchannel_index_to_readylist_node[this_idx] = + add_connected_sc_locked(p, p->subchannels[this_idx]); + /* at this point we know there's at least one suitable subchannel. Go + * ahead and pick one and notify the pending suitors in + * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ + selected = peek_next_connected_locked(p); + if (p->pending_picks != NULL) { + /* if the selected subchannel is going to be used for the pending + * picks, update the last picked pointer */ + advance_last_picked_locked(p); + } + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = selected->subchannel; + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, + "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", + selected->subchannel, selected); + } + grpc_subchannel_del_interested_party(selected->subchannel, + pp->pollset); + grpc_iomgr_add_delayed_callback(pp->on_complete, 1); + gpr_free(pp); + } + grpc_subchannel_notify_on_state_change( + p->subchannels[this_idx], this_connectivity, + &p->connectivity_changed_cbs[this_idx]); + break; + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: + grpc_connectivity_state_set(&p->state_tracker, *this_connectivity, + "connecting_changed"); + grpc_subchannel_notify_on_state_change( + p->subchannels[this_idx], this_connectivity, + &p->connectivity_changed_cbs[this_idx]); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + del_interested_parties_locked(p, this_idx); + /* renew state notification */ + grpc_subchannel_notify_on_state_change( + p->subchannels[this_idx], this_connectivity, + &p->connectivity_changed_cbs[this_idx]); + + /* remove from ready list if still present */ + if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { + remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]); + p->subchannel_index_to_readylist_node[this_idx] = NULL; + } + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "connecting_transient_failure"); + break; + case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked(p, this_idx); + if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { + remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]); + p->subchannel_index_to_readylist_node[this_idx] = NULL; + } + + GPR_SWAP(grpc_subchannel *, p->subchannels[this_idx], + p->subchannels[p->num_subchannels - 1]); + p->num_subchannels--; + GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], + "round_robin"); + + if (p->num_subchannels == 0) { + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, + "no_more_channels"); + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_iomgr_add_delayed_callback(pp->on_complete, 1); + gpr_free(pp); + } + unref = 1; + } else { + grpc_connectivity_state_set(&p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "subchannel_failed"); + } + } /* switch */ + } /* !unref */ + + gpr_mu_unlock(&p->mu); + + if (unref) { + GRPC_LB_POLICY_UNREF(&p->base, "round_robin_connectivity"); + } +} + +static void rr_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + size_t i; + size_t n; + grpc_subchannel **subchannels; + + gpr_mu_lock(&p->mu); + n = p->num_subchannels; + subchannels = gpr_malloc(n * sizeof(*subchannels)); + for (i = 0; i < n; i++) { + subchannels[i] = p->subchannels[i]; + GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast"); + } + gpr_mu_unlock(&p->mu); + + for (i = 0; i < n; i++) { + grpc_subchannel_process_transport_op(subchannels[i], op); + GRPC_SUBCHANNEL_UNREF(subchannels[i], "rr_broadcast"); + } + gpr_free(subchannels); +} + +static grpc_connectivity_state rr_check_connectivity(grpc_lb_policy *pol) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + grpc_connectivity_state st; + gpr_mu_lock(&p->mu); + st = grpc_connectivity_state_check(&p->state_tracker); + gpr_mu_unlock(&p->mu); + return st; +} + +static void rr_notify_on_state_change(grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_iomgr_closure *notify) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + gpr_mu_lock(&p->mu); + grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, + notify); + gpr_mu_unlock(&p->mu); +} + +static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { + rr_destroy, + rr_shutdown, + rr_pick, + rr_exit_idle, + rr_broadcast, + rr_check_connectivity, + rr_notify_on_state_change}; + + +static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} + +static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} + +static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + size_t i; + round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); + GPR_ASSERT(args->num_subchannels > 0); + memset(p, 0, sizeof(*p)); + grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); + p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); + p->num_subchannels = args->num_subchannels; + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, + "round_robin"); + memcpy(p->subchannels, args->subchannels, + sizeof(grpc_subchannel *) * args->num_subchannels); + + gpr_mu_init(&p->mu); + p->connectivity_changed_cbs = + gpr_malloc(sizeof(grpc_iomgr_closure) * args->num_subchannels); + p->subchannel_connectivity = + gpr_malloc(sizeof(grpc_connectivity_state) * args->num_subchannels); + + p->cb_args = + gpr_malloc(sizeof(connectivity_changed_cb_arg) * args->num_subchannels); + for(i = 0; i < args->num_subchannels; i++) { + p->cb_args[i].subchannel_idx = i; + p->cb_args[i].p = p; + grpc_iomgr_closure_init(&p->connectivity_changed_cbs[i], + rr_connectivity_changed, &p->cb_args[i]); + } + + /* The (dummy node) root of the ready list */ + p->ready_list.subchannel = NULL; + p->ready_list.prev = NULL; + p->ready_list.next = NULL; + p->ready_list_last_pick = &p->ready_list; + + p->subchannel_index_to_readylist_node = + gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); + memset(p->subchannel_index_to_readylist_node, 0, + sizeof(grpc_subchannel *) * args->num_subchannels); + return &p->base; +} + +static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { + round_robin_factory_ref, round_robin_factory_unref, create_round_robin, + "round_robin"}; + +static grpc_lb_policy_factory round_robin_lb_policy_factory = { + &round_robin_factory_vtable}; + +grpc_lb_policy_factory *grpc_round_robin_lb_factory_create() { + return &round_robin_lb_policy_factory; +} diff --git a/src/core/client_config/lb_policies/round_robin.h b/src/core/client_config/lb_policies/round_robin.h new file mode 100644 index 0000000000..2c81b9ef17 --- /dev/null +++ b/src/core/client_config/lb_policies/round_robin.h @@ -0,0 +1,47 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_ROUND_ROBIN_H +#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_ROUND_ROBIN_H + +#include "src/core/client_config/lb_policy.h" + +extern int grpc_lb_round_robin_trace; + +#include "src/core/client_config/lb_policy_factory.h" + +/** Returns a load balancing factory for the round robin policy */ +grpc_lb_policy_factory *grpc_round_robin_lb_factory_create(); + + +#endif diff --git a/src/core/client_config/lb_policy_factory.c b/src/core/client_config/lb_policy_factory.c new file mode 100644 index 0000000000..0c097e0542 --- /dev/null +++ b/src/core/client_config/lb_policy_factory.c @@ -0,0 +1,47 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/client_config/lb_policy_factory.h" + +void grpc_lb_policy_factory_ref(grpc_lb_policy_factory *factory) { + factory->vtable->ref(factory); +} +void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory) { + factory->vtable->unref(factory); +} + +grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { + if (factory == NULL) return NULL; + return factory->vtable->create_lb_policy(factory, args); +} diff --git a/src/core/client_config/lb_policy_factory.h b/src/core/client_config/lb_policy_factory.h new file mode 100644 index 0000000000..04610316ee --- /dev/null +++ b/src/core/client_config/lb_policy_factory.h @@ -0,0 +1,73 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_FACTORY_H +#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_FACTORY_H + +#include "src/core/client_config/lb_policy.h" +#include "src/core/client_config/subchannel.h" + +typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; +typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; + +/** grpc_lb_policy provides grpc_client_config objects to grpc_channel + objects */ +struct grpc_lb_policy_factory { + const grpc_lb_policy_factory_vtable *vtable; +}; + +typedef struct grpc_lb_policy_args { + grpc_subchannel **subchannels; + size_t num_subchannels; +} grpc_lb_policy_args; + +struct grpc_lb_policy_factory_vtable { + void (*ref)(grpc_lb_policy_factory *factory); + void (*unref)(grpc_lb_policy_factory *factory); + + /** Implementation of grpc_lb_policy_factory_create_lb_policy */ + grpc_lb_policy *(*create_lb_policy)(grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args); + + /** Name for the LB policy this factory implements */ + const char *name; +}; + +void grpc_lb_policy_factory_ref(grpc_lb_policy_factory *factory); +void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory); + +/** Create a lb_policy instance. */ +grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args); + +#endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_FACTORY_H */ diff --git a/src/core/client_config/lb_policy_registry.c b/src/core/client_config/lb_policy_registry.c new file mode 100644 index 0000000000..ae4a077ef3 --- /dev/null +++ b/src/core/client_config/lb_policy_registry.c @@ -0,0 +1,88 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/client_config/lb_policy_registry.h" + +#include <string.h> + +#define MAX_POLICIES 10 + +static grpc_lb_policy_factory *g_all_of_the_lb_policies[MAX_POLICIES]; +static int g_number_of_lb_policies = 0; + +static grpc_lb_policy_factory *g_default_lb_policy_factory; + +void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory) { + g_number_of_lb_policies = 0; + g_default_lb_policy_factory = default_factory; +} + +void grpc_lb_policy_registry_shutdown(void) { + int i; + for (i = 0; i < g_number_of_lb_policies; i++) { + grpc_lb_policy_factory_unref(g_all_of_the_lb_policies[i]); + } +} + +void grpc_register_lb_policy(grpc_lb_policy_factory *factory) { + int i; + for (i = 0; i < g_number_of_lb_policies; i++) { + GPR_ASSERT(0 != strcmp(factory->vtable->name, + g_all_of_the_lb_policies[i]->vtable->name)); + } + GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES); + grpc_lb_policy_factory_ref(factory); + g_all_of_the_lb_policies[g_number_of_lb_policies++] = factory; +} + +static grpc_lb_policy_factory *lookup_factory(const char* name) { + int i; + + if (name == NULL) return NULL; + + for (i = 0; i < g_number_of_lb_policies; i++) { + if (0 == strcmp(name, g_all_of_the_lb_policies[i]->vtable->name)) { + return g_all_of_the_lb_policies[i]; + } + } + + return NULL; +} + +grpc_lb_policy *grpc_lb_policy_create(const char *name, + grpc_lb_policy_args *args) { + grpc_lb_policy_factory *factory = lookup_factory(name); + grpc_lb_policy *lb_policy = grpc_lb_policy_factory_create_lb_policy( + factory, args); + return lb_policy; +} diff --git a/src/core/client_config/lb_policy_registry.h b/src/core/client_config/lb_policy_registry.h new file mode 100644 index 0000000000..96fc2a1628 --- /dev/null +++ b/src/core/client_config/lb_policy_registry.h @@ -0,0 +1,54 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H +#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H + +#include "src/core/client_config/lb_policy_factory.h" + +/** Initialize the registry and set \a default_factory as the factory to be + * returned when no name is provided in a lookup */ +void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory); +void grpc_lb_policy_registry_shutdown(void); + +/** Register a LB policy factory. */ +void grpc_register_lb_policy(grpc_lb_policy_factory *factory); + +/** Create a \a grpc_lb_policy instance. + * + * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init + * will be returned. */ +grpc_lb_policy *grpc_lb_policy_create(const char *name, + grpc_lb_policy_args *args); + +#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */ diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 5124c4476a..ccec07a08c 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -39,7 +39,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/string_util.h> -#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_registry.h" #include "src/core/client_config/subchannel_factory_decorators/add_channel_arg.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" @@ -55,9 +55,8 @@ typedef struct { char *default_port; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; - /** load balancing policy factory */ - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels); + /** load balancing policy name */ + char *lb_policy_name; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -135,6 +134,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { grpc_lb_policy *lb_policy; size_t i; if (addresses) { + grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); for (i = 0; i < addresses->naddrs; i++) { @@ -144,7 +144,9 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { subchannels[i] = grpc_subchannel_factory_create_subchannel( r->subchannel_factory, &args); } - lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(config, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "construction"); grpc_resolved_addresses_destroy(addresses); @@ -193,13 +195,13 @@ static void dns_destroy(grpc_resolver *gr) { grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r->name); gpr_free(r->default_port); + gpr_free(r->lb_policy_name); gpr_free(r); } static grpc_resolver *dns_create( grpc_uri *uri, const char *default_port, - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels), + const char* lb_policy_name, grpc_subchannel_factory *subchannel_factory) { dns_resolver *r; const char *path = uri->path; @@ -220,7 +222,7 @@ static grpc_resolver *dns_create( r->default_port = gpr_strdup(default_port); r->subchannel_factory = subchannel_factory; grpc_subchannel_factory_ref(subchannel_factory); - r->lb_policy_factory = lb_policy_factory; + r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; } @@ -235,8 +237,7 @@ static void dns_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *dns_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory) { - return dns_create(uri, "https", grpc_create_pick_first_lb_policy, - subchannel_factory); + return dns_create(uri, "https", "pick_first", subchannel_factory); } char *dns_factory_get_default_host_name(grpc_resolver_factory *factory, diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 85b1bf2849..ea2df07035 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -45,7 +45,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/string_util.h> -#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_registry.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" @@ -56,9 +56,8 @@ typedef struct { gpr_refcount refs; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; - /** load balancing policy factory */ - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels); + /** load balancing policy name */ + char *lb_policy_name; /** the addresses that we've 'resolved' */ struct sockaddr_storage *addrs; @@ -122,6 +121,7 @@ static void sockaddr_next(grpc_resolver *resolver, static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { grpc_client_config *cfg; grpc_lb_policy *lb_policy; + grpc_lb_policy_args lb_policy_args; grpc_subchannel **subchannels; grpc_subchannel_args args; @@ -136,7 +136,10 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { subchannels[i] = grpc_subchannel_factory_create_subchannel( r->subchannel_factory, &args); } - lb_policy = r->lb_policy_factory(subchannels, r->num_addrs); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = r->num_addrs; + lb_policy = + grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); gpr_free(subchannels); grpc_client_config_set_lb_policy(cfg, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "unix"); @@ -153,6 +156,7 @@ static void sockaddr_destroy(grpc_resolver *gr) { grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r->addrs); gpr_free(r->addrs_len); + gpr_free(r->lb_policy_name); gpr_free(r); } @@ -274,9 +278,7 @@ done: static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( - grpc_uri *uri, - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels), + grpc_uri *uri, const char *default_lb_policy_name, grpc_subchannel_factory *subchannel_factory, int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { size_t i; @@ -293,6 +295,25 @@ static grpc_resolver *sockaddr_create( r = gpr_malloc(sizeof(sockaddr_resolver)); memset(r, 0, sizeof(*r)); + r->lb_policy_name = NULL; + if (0 != strcmp(uri->query, "")) { + gpr_slice query_slice; + gpr_slice_buffer query_parts; + + query_slice = gpr_slice_new(uri->query, strlen(uri->query), do_nothing); + gpr_slice_buffer_init(&query_parts); + gpr_slice_split(query_slice, "=", &query_parts); + GPR_ASSERT(query_parts.count == 2); + if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) { + r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII); + } + gpr_slice_buffer_destroy(&query_parts); + gpr_slice_unref(query_slice); + } + if (r->lb_policy_name == NULL) { + r->lb_policy_name = gpr_strdup(default_lb_policy_name); + } + path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing); gpr_slice_buffer_init(&path_parts); @@ -323,7 +344,6 @@ static grpc_resolver *sockaddr_create( gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); r->subchannel_factory = subchannel_factory; - r->lb_policy_factory = lb_policy_factory; grpc_subchannel_factory_ref(subchannel_factory); return &r->base; @@ -341,7 +361,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *name##_factory_create_resolver( \ grpc_resolver_factory *factory, grpc_uri *uri, \ grpc_subchannel_factory *subchannel_factory) { \ - return sockaddr_create(uri, grpc_create_pick_first_lb_policy, \ + return sockaddr_create(uri, "pick_first", \ subchannel_factory, parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index 9a732b1417..2594e6fae9 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -41,7 +41,7 @@ #include <grpc/grpc_zookeeper.h> #include <zookeeper/zookeeper.h> -#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_registry.h" #include "src/core/client_config/resolver_registry.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" @@ -59,9 +59,8 @@ typedef struct { char *name; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; - /** load balancing policy factory */ - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels); + /** load balancing policy name */ + char *lb_policy_name; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -183,6 +182,7 @@ static void zookeeper_on_resolved(void *arg, grpc_lb_policy *lb_policy; size_t i; if (addresses != NULL) { + grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); for (i = 0; i < addresses->naddrs; i++) { @@ -192,7 +192,10 @@ static void zookeeper_on_resolved(void *arg, subchannels[i] = grpc_subchannel_factory_create_subchannel( r->subchannel_factory, &args); } - lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy = + grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(config, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "construction"); grpc_resolved_addresses_destroy(addresses); @@ -420,13 +423,12 @@ static void zookeeper_destroy(grpc_resolver *gr) { } grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r->name); + gpr_free(r->lb_policy_name); gpr_free(r); } static grpc_resolver *zookeeper_create( - grpc_uri *uri, - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels), + grpc_uri *uri, const char *lb_policy_name, grpc_subchannel_factory *subchannel_factory) { zookeeper_resolver *r; size_t length; @@ -451,7 +453,7 @@ static grpc_resolver *zookeeper_create( r->name = gpr_strdup(path); r->subchannel_factory = subchannel_factory; - r->lb_policy_factory = lb_policy_factory; + r->lb_policy_name = gpr_strdup(lb_policy_name); grpc_subchannel_factory_ref(subchannel_factory); /** Initializes zookeeper client */ @@ -490,8 +492,7 @@ static char *zookeeper_factory_get_default_hostname( static grpc_resolver *zookeeper_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory) { - return zookeeper_create(uri, grpc_create_pick_first_lb_policy, - subchannel_factory); + return zookeeper_create(uri, "pick_first", subchannel_factory); } static const grpc_resolver_factory_vtable zookeeper_factory_vtable = { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index ca52c75beb..876d2aa418 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -439,6 +439,10 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, if (cancel_alarm) { grpc_alarm_cancel(&c->alarm); } + + if (op->disconnect) { + grpc_connector_shutdown(c->connector); + } } static void on_state_changed(void *p, int iomgr_success) { diff --git a/src/core/httpcli/httpcli_security_connector.c b/src/core/httpcli/httpcli_security_connector.c index 7887f9d530..86f34db1d0 100644 --- a/src/core/httpcli/httpcli_security_connector.c +++ b/src/core/httpcli/httpcli_security_connector.c @@ -35,7 +35,7 @@ #include <string.h> -#include "src/core/security/secure_transport_setup.h" +#include "src/core/security/handshake.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -58,20 +58,27 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) { gpr_free(sc); } -static grpc_security_status httpcli_ssl_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { +static void httpcli_ssl_do_handshake( + grpc_security_connector *sc, grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, void *user_data) { grpc_httpcli_ssl_channel_security_connector *c = (grpc_httpcli_ssl_channel_security_connector *)sc; tsi_result result = TSI_OK; - if (c->handshaker_factory == NULL) return GRPC_SECURITY_ERROR; + tsi_handshaker *handshaker; + if (c->handshaker_factory == NULL) { + cb(user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL); + return; + } result = tsi_ssl_handshaker_factory_create_handshaker( - c->handshaker_factory, c->secure_peer_name, handshaker); + c->handshaker_factory, c->secure_peer_name, &handshaker); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.", tsi_result_to_string(result)); - return GRPC_SECURITY_ERROR; + cb(user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL); + } else { + grpc_do_security_handshake(handshaker, sc, nonsecure_endpoint, cb, + user_data); } - return GRPC_SECURITY_OK; } static grpc_security_status httpcli_ssl_check_peer(grpc_security_connector *sc, @@ -94,7 +101,7 @@ static grpc_security_status httpcli_ssl_check_peer(grpc_security_connector *sc, } static grpc_security_connector_vtable httpcli_ssl_vtable = { - httpcli_ssl_destroy, httpcli_ssl_create_handshaker, httpcli_ssl_check_peer}; + httpcli_ssl_destroy, httpcli_ssl_do_handshake, httpcli_ssl_check_peer}; static grpc_security_status httpcli_ssl_channel_security_connector_create( const unsigned char *pem_root_certs, size_t pem_root_certs_size, @@ -169,8 +176,8 @@ static void ssl_handshake(void *arg, grpc_endpoint *tcp, const char *host, GPR_ASSERT(httpcli_ssl_channel_security_connector_create( pem_root_certs, pem_root_certs_size, host, &sc) == GRPC_SECURITY_OK); - grpc_setup_secure_transport(&sc->base, tcp, on_secure_transport_setup_done, - c); + grpc_security_connector_do_handshake(&sc->base, tcp, + on_secure_transport_setup_done, c); GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli"); } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 2d08a77a70..38a543e36e 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -213,10 +213,9 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, const char *reason) { fd->on_done_closure = on_done; shutdown(fd->fd, SHUT_RDWR); - REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ gpr_mu_lock(&fd->watcher_mu); + REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ if (!has_watchers(fd)) { - GPR_ASSERT(!fd->closed); fd->closed = 1; close(fd->fd); if (fd->on_done_closure) { diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 1dd03992ae..d6ca5d1f71 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -34,16 +34,18 @@ #include "src/core/iomgr/iomgr.h" #include <stdlib.h> +#include <string.h> -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/alarm_internal.h" -#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/alarm_internal.h" +#include "src/core/support/string.h" + static gpr_mu g_mu; static gpr_cv g_rcv; static grpc_iomgr_closure *g_cbs_head = NULL; @@ -179,6 +181,8 @@ void grpc_iomgr_shutdown(void) { } gpr_mu_unlock(&g_mu); + memset(&g_root_object, 0, sizeof(g_root_object)); + grpc_kick_poller(); gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future(GPR_CLOCK_REALTIME)); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 8f62ce2954..481bdc4ede 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -72,7 +72,7 @@ static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) { to this pollset whilst adding, but that should be benign. */ GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0); if (watcher.fd != NULL) { - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); ev.data.ptr = fd; err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); if (err < 0) { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index dec2f5490f..f3e424e83c 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -187,6 +187,12 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, if (pollset->shutting_down) { goto done; } + if (pollset->in_flight_cbs) { + /* Give do_promote priority so we don't starve it out */ + gpr_mu_unlock(&pollset->mu); + gpr_mu_lock(&pollset->mu); + goto done; + } if (!pollset->kicked_without_pollers) { push_front_worker(pollset, worker); added_worker = 1; @@ -422,12 +428,6 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, int r; nfds_t nfds; - if (pollset->in_flight_cbs) { - /* Give do_promote priority so we don't starve it out */ - gpr_mu_unlock(&pollset->mu); - gpr_mu_lock(&pollset->mu); - return; - } fd = pollset->data.ptr; if (fd && grpc_fd_is_orphaned(fd)) { GRPC_FD_UNREF(fd, "basicpoll"); diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index c3668f6a92..07fa44ad37 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -54,6 +54,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +extern int grpc_tcp_trace; + typedef struct { void (*cb)(void *arg, grpc_endpoint *tcp); void *cb_arg; @@ -92,6 +94,10 @@ error: static void tc_on_alarm(void *acp, int success) { int done; async_connect *ac = acp; + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: success=%d", ac->addr_str, + success); + } gpr_mu_lock(&ac->mu); if (ac->fd != NULL) { grpc_fd_shutdown(ac->fd); @@ -116,6 +122,11 @@ static void on_writable(void *acp, int success) { void *cb_arg = ac->cb_arg; grpc_fd *fd; + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: success=%d", + ac->addr_str, success); + } + gpr_mu_lock(&ac->mu); GPR_ASSERT(ac->fd); fd = ac->fd; @@ -264,6 +275,11 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->write_closure.cb = on_writable; ac->write_closure.cb_arg = ac; + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", + ac->addr_str); + } + gpr_mu_lock(&ac->mu); grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 05198dbff4..6f57de0289 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -77,7 +77,6 @@ static void on_alarm(void *acp, int occured) { async_connect *ac = acp; gpr_mu_lock(&ac->mu); /* If the alarm didn't occur, it got cancelled. */ - gpr_log(GPR_DEBUG, "on_alarm: %p", ac->socket); if (ac->socket != NULL && occured) { grpc_winsocket_shutdown(ac->socket); } @@ -96,8 +95,6 @@ static void on_connect(void *acp, int from_iocp) { gpr_mu_lock(&ac->mu); - gpr_log(GPR_DEBUG, "on_connect: %p", ac->socket); - if (from_iocp) { DWORD transfered_bytes = 0; DWORD flags; @@ -124,7 +121,7 @@ static void on_connect(void *acp, int from_iocp) { notification request for the connection, and one timeout alert. */ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), void *arg, grpc_pollset_set *interested_parties, - const struct sockaddr *addr, int addr_len, + const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { SOCKET sock = INVALID_SOCKET; BOOL success; @@ -179,7 +176,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), socket = grpc_winsocket_create(sock, "client"); info = &socket->write_info; - success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); + success = ConnectEx(sock, addr, (int)addr_len, NULL, 0, NULL, &info->overlapped); /* It wouldn't be unusual to get a success immediately. But we'll still get an IOCP notification, so let's ignore it. */ diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 5165f5c5ca..9303975781 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -39,7 +39,7 @@ /* Forward decl of grpc_tcp_server */ typedef struct grpc_tcp_server grpc_tcp_server; -/* New server callback: tcp is the newly connected tcp connection */ +/* Called for newly connected TCP connections. */ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep); /* Create a server, initially not bound to any ports */ @@ -47,8 +47,8 @@ grpc_tcp_server *grpc_tcp_server_create(void); /* Start listening to bound ports */ void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset **pollsets, - size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg); + size_t pollset_count, + grpc_tcp_server_cb on_accept_cb, void *cb_arg); /* Add a port to the server, returning port number on success, or negative on failure. diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index bcbd0afe6b..f7b692a76b 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -98,8 +98,9 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { /* the overall server */ struct grpc_tcp_server { - grpc_tcp_server_cb cb; - void *cb_arg; + /* Called whenever accept() succeeds on a server port. */ + grpc_tcp_server_cb on_accept_cb; + void *on_accept_cb_arg; gpr_mu mu; @@ -132,8 +133,8 @@ grpc_tcp_server *grpc_tcp_server_create(void) { s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; - s->cb = NULL; - s->cb_arg = NULL; + s->on_accept_cb = NULL; + s->on_accept_cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; @@ -339,6 +340,10 @@ static void on_read(void *arg, int success) { addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str); + } + fdobj = grpc_fd_create(fd, name); /* TODO(ctiller): revise this when we have server-side sharding of channels -- we certainly should not be automatically adding every @@ -346,8 +351,8 @@ static void on_read(void *arg, int success) { for (i = 0; i < sp->server->pollset_count; i++) { grpc_pollset_add_fd(sp->server->pollsets[i], fdobj); } - sp->server->cb( - sp->server->cb_arg, + sp->server->on_accept_cb( + sp->server->on_accept_cb_arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); gpr_free(name); @@ -378,7 +383,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb && "must add ports before starting server"); + GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); /* append it to the list under a lock */ if (s->nports == s->port_capacity) { s->port_capacity *= 2; @@ -484,16 +489,16 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) { return (index < s->nports) ? s->ports[index].fd : -1; } -void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, - size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg) { +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, size_t + pollset_count, grpc_tcp_server_cb on_accept_cb, void + *on_accept_cb_arg) { size_t i, j; - GPR_ASSERT(cb); + GPR_ASSERT(on_accept_cb); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb); + GPR_ASSERT(!s->on_accept_cb); GPR_ASSERT(s->active_ports == 0); - s->cb = cb; - s->cb_arg = cb_arg; + s->on_accept_cb = on_accept_cb; + s->on_accept_cb_arg = on_accept_cb_arg; s->pollsets = pollsets; s->pollset_count = pollset_count; for (i = 0; i < s->nports; i++) { diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index b513d854aa..c42e5e7527 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -71,8 +71,9 @@ typedef struct server_port { /* the overall server */ struct grpc_tcp_server { - grpc_tcp_server_cb cb; - void *cb_arg; + /* Called whenever accept() succeeds on a server port. */ + grpc_tcp_server_cb on_accept_cb; + void *on_accept_cb_arg; gpr_mu mu; @@ -95,8 +96,8 @@ grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); gpr_mu_init(&s->mu); s->active_ports = 0; - s->cb = NULL; - s->cb_arg = NULL; + s->on_accept_cb = NULL; + s->on_accept_cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; @@ -154,7 +155,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, /* Prepare (bind) a recently-created socket for listening. */ static int prepare_socket(SOCKET sock, const struct sockaddr *addr, - int addr_len) { + size_t addr_len) { struct sockaddr_storage sockname_temp; socklen_t sockname_len; @@ -167,7 +168,7 @@ static int prepare_socket(SOCKET sock, const struct sockaddr *addr, goto error; } - if (bind(sock, addr, addr_len) == SOCKET_ERROR) { + if (bind(sock, addr, (int)addr_len) == SOCKET_ERROR) { char *addr_str; char *utf8_message = gpr_format_message(WSAGetLastError()); grpc_sockaddr_to_string(&addr_str, addr, 0); @@ -344,7 +345,7 @@ static void on_accept(void *arg, int from_iocp) { /* The only time we should call our callback, is where we successfully managed to accept a connection, and created an endpoint. */ - if (ep) sp->server->cb(sp->server->cb_arg, ep); + if (ep) sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep); /* As we were notified from the IOCP of one and exactly one accept, the former socked we created has now either been destroy or assigned to the new connection. We need to create a new one for the next @@ -353,7 +354,7 @@ static void on_accept(void *arg, int from_iocp) { } static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, - const struct sockaddr *addr, int addr_len) { + const struct sockaddr *addr, size_t addr_len) { server_port *sp; int port; int status; @@ -380,7 +381,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, port = prepare_socket(sock, addr, addr_len); if (port >= 0) { gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb && "must add ports before starting server"); + GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); /* append it to the list under a lock */ if (s->nports == s->port_capacity) { s->port_capacity *= 2; @@ -400,7 +401,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, } int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - int addr_len) { + size_t addr_len) { int allocated_port = -1; unsigned i; SOCKET sock; @@ -462,15 +463,16 @@ SOCKET grpc_tcp_server_get_socket(grpc_tcp_server *s, unsigned index) { } void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset, - size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg) { + size_t pollset_count, + grpc_tcp_server_cb on_accept_cb, + void *on_accept_cb_arg) { size_t i; - GPR_ASSERT(cb); + GPR_ASSERT(on_accept_cb); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb); + GPR_ASSERT(!s->on_accept_cb); GPR_ASSERT(s->active_ports == 0); - s->cb = cb; - s->cb_arg = cb_arg; + s->on_accept_cb = on_accept_cb; + s->on_accept_cb_arg = on_accept_cb_arg; for (i = 0; i < s->nports; i++) { start_accept(s->ports + i); s->active_ports++; diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index fe3673c607..725c18e6cc 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -144,7 +144,7 @@ static int on_read(grpc_tcp *tcp, int success) { int do_abort = 0; if (success) { - if (socket->read_info.wsa_error != 0) { + if (socket->read_info.wsa_error != 0 && !tcp->shutting_down) { if (socket->read_info.wsa_error != WSAECONNRESET) { char *utf8_message = gpr_format_message(info->wsa_error); gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); @@ -153,7 +153,7 @@ static int on_read(grpc_tcp *tcp, int success) { success = 0; gpr_slice_unref(tcp->read_slice); } else { - if (info->bytes_transfered != 0) { + if (info->bytes_transfered != 0 && !tcp->shutting_down) { sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); gpr_slice_buffer_add(tcp->read_slices, sub); success = 1; diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index ed9eee8726..7957066598 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -94,9 +94,6 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { /* the overall server */ struct grpc_udp_server { - grpc_udp_server_cb cb; - void *cb_arg; - gpr_mu mu; gpr_cv cv; @@ -130,8 +127,6 @@ grpc_udp_server *grpc_udp_server_create(void) { s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; - s->cb = NULL; - s->cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; @@ -232,6 +227,11 @@ static int prepare_socket(int fd, const struct sockaddr *addr, goto error; } + if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1)) { + gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, + strerror(errno)); + } + get_local_ip = 1; rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, sizeof(get_local_ip)); @@ -282,7 +282,7 @@ static void on_read(void *arg, int success) { /* Tell the registered callback that data is available to read. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(sp->fd, sp->server->cb, sp->server->cb_arg); + sp->read_cb(sp->fd); /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); @@ -301,7 +301,6 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb && "must add ports before starting server"); /* append it to the list under a lock */ if (s->nports == s->port_capacity) { s->port_capacity *= 2; @@ -407,15 +406,10 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { } void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets, - size_t pollset_count, - grpc_udp_server_cb new_transport_cb, void *cb_arg) { + size_t pollset_count) { size_t i, j; - GPR_ASSERT(new_transport_cb); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb); GPR_ASSERT(s->active_ports == 0); - s->cb = new_transport_cb; - s->cb_arg = cb_arg; s->pollsets = pollsets; for (i = 0; i < s->nports; i++) { for (j = 0; j < pollset_count; j++) { diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index 389f84ecca..c930e81cbc 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -39,21 +39,15 @@ /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; -/* New server callback: ep is the newly connected connection */ -typedef void (*grpc_udp_server_cb)(void *arg, grpc_endpoint *ep); - /* Called when data is available to read from the socket. */ -typedef void (*grpc_udp_server_read_cb)(int fd, - grpc_udp_server_cb new_transport_cb, - void *cb_arg); +typedef void (*grpc_udp_server_read_cb)(int fd); /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); /* Start listening to bound ports */ -void grpc_udp_server_start(grpc_udp_server *server, grpc_pollset **pollsets, - size_t pollset_count, grpc_udp_server_cb cb, - void *cb_arg); +void grpc_udp_server_start(grpc_udp_server *udp_server, grpc_pollset **pollsets, + size_t pollset_count); int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index f3ecfd0e60..16b3fed08f 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -63,6 +63,7 @@ typedef struct { int sent_initial_metadata; gpr_uint8 security_context_set; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; + char *service_url; } call_data; /* We can have a per-channel credentials. */ @@ -75,6 +76,13 @@ typedef struct { grpc_mdstr *status_key; } channel_data; +static void reset_service_url(call_data *calld) { + if (calld->service_url != NULL) { + gpr_free(calld->service_url); + calld->service_url = NULL; + } +} + static void bubble_up_error(grpc_call_element *elem, grpc_status_code status, const char *error_msg) { call_data *calld = elem->call_data; @@ -93,6 +101,7 @@ static void on_credentials_metadata(void *user_data, grpc_transport_stream_op *op = &calld->op; grpc_metadata_batch *mdb; size_t i; + reset_service_url(calld); if (status != GRPC_CREDENTIALS_OK) { bubble_up_error(elem, GRPC_STATUS_UNAUTHENTICATED, "Credentials failed to get metadata."); @@ -111,8 +120,7 @@ static void on_credentials_metadata(void *user_data, grpc_call_next_op(elem, op); } -static char *build_service_url(const char *url_scheme, call_data *calld) { - char *service_url; +void build_service_url(const char *url_scheme, call_data *calld) { char *service = gpr_strdup(grpc_mdstr_as_c_string(calld->method)); char *last_slash = strrchr(service, '/'); if (last_slash == NULL) { @@ -125,10 +133,10 @@ static char *build_service_url(const char *url_scheme, call_data *calld) { *last_slash = '\0'; } if (url_scheme == NULL) url_scheme = ""; - gpr_asprintf(&service_url, "%s://%s%s", url_scheme, + reset_service_url(calld); + gpr_asprintf(&calld->service_url, "%s://%s%s", url_scheme, grpc_mdstr_as_c_string(calld->host), service); gpr_free(service); - return service_url; } static void send_security_metadata(grpc_call_element *elem, @@ -137,7 +145,6 @@ static void send_security_metadata(grpc_call_element *elem, channel_data *chand = elem->channel_data; grpc_client_security_context *ctx = (grpc_client_security_context *)op->context[GRPC_CONTEXT_SECURITY].value; - char *service_url = NULL; grpc_credentials *channel_creds = chand->security_connector->request_metadata_creds; int channel_creds_has_md = @@ -165,13 +172,12 @@ static void send_security_metadata(grpc_call_element *elem, grpc_credentials_ref(call_creds_has_md ? ctx->creds : channel_creds); } - service_url = - build_service_url(chand->security_connector->base.url_scheme, calld); + build_service_url(chand->security_connector->base.url_scheme, calld); calld->op = *op; /* Copy op (originates from the caller's stack). */ GPR_ASSERT(calld->pollset); - grpc_credentials_get_request_metadata( - calld->creds, calld->pollset, service_url, on_credentials_metadata, elem); - gpr_free(service_url); + grpc_credentials_get_request_metadata(calld->creds, calld->pollset, + calld->service_url, + on_credentials_metadata, elem); } static void on_host_checked(void *user_data, grpc_security_status status) { @@ -203,7 +209,8 @@ static void auth_start_transport_op(grpc_call_element *elem, size_t i; grpc_client_security_context *sec_ctx = NULL; - if (calld->security_context_set == 0) { + if (calld->security_context_set == 0 && + op->cancel_with_status == GRPC_STATUS_OK) { calld->security_context_set = 1; GPR_ASSERT(op->context); if (op->context[GRPC_CONTEXT_SECURITY].value == NULL) { @@ -218,11 +225,11 @@ static void auth_start_transport_op(grpc_call_element *elem, chand->security_connector->base.auth_context, "client_auth_filter"); } - if (op->bind_pollset) { + if (op->bind_pollset != NULL) { calld->pollset = op->bind_pollset; } - if (op->send_ops && !calld->sent_initial_metadata) { + if (op->send_ops != NULL && !calld->sent_initial_metadata) { size_t nops = op->send_ops->nops; grpc_stream_op *ops = op->send_ops->ops; for (i = 0; i < nops; i++) { @@ -274,13 +281,7 @@ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data, grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; - calld->creds = NULL; - calld->host = NULL; - calld->method = NULL; - calld->pollset = NULL; - calld->sent_initial_metadata = 0; - calld->security_context_set = 0; - + memset(calld, 0, sizeof(*calld)); GPR_ASSERT(!initial_op || !initial_op->send_ops); } @@ -294,6 +295,7 @@ static void destroy_call_elem(grpc_call_element *elem) { if (calld->method != NULL) { GRPC_MDSTR_UNREF(calld->method); } + reset_service_url(calld); } /* Constructor for channel_data */ diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 87669b9e68..79821eed49 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -1185,3 +1185,95 @@ grpc_credentials *grpc_google_iam_credentials_create( c->iam_md, GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, authority_selector); return &c->base; } + +/* -- Plugin credentials. -- */ + +typedef struct { + void *user_data; + grpc_credentials_metadata_cb cb; +} grpc_metadata_plugin_request; + +static void plugin_destruct(grpc_credentials *creds) { + grpc_plugin_credentials *c = (grpc_plugin_credentials *)creds; + if (c->plugin.state != NULL && c->plugin.destroy != NULL) { + c->plugin.destroy(c->plugin.state); + } +} + +static int plugin_has_request_metadata(const grpc_credentials *creds) { + return 1; +} + +static int plugin_has_request_metadata_only(const grpc_credentials *creds) { + return 1; +} + +static void plugin_md_request_metadata_ready(void *request, + const grpc_metadata *md, + size_t num_md, + grpc_status_code status, + const char *error_details) { + grpc_metadata_plugin_request *r = (grpc_metadata_plugin_request *)request; + if (status != GRPC_STATUS_OK) { + if (error_details != NULL) { + gpr_log(GPR_ERROR, "Getting metadata from plugin failed with error: %s", + error_details); + } + r->cb(r->user_data, NULL, 0, GRPC_CREDENTIALS_ERROR); + } else { + size_t i; + grpc_credentials_md *md_array = NULL; + if (num_md > 0) { + md_array = gpr_malloc(num_md * sizeof(grpc_credentials_md)); + for (i = 0; i < num_md; i++) { + md_array[i].key = gpr_slice_from_copied_string(md[i].key); + md_array[i].value = + gpr_slice_from_copied_buffer(md[i].value, md[i].value_length); + } + } + r->cb(r->user_data, md_array, num_md, GRPC_CREDENTIALS_OK); + if (md_array != NULL) { + for (i = 0; i < num_md; i++) { + gpr_slice_unref(md_array[i].key); + gpr_slice_unref(md_array[i].value); + } + gpr_free(md_array); + } + } + gpr_free(r); +} + +static void plugin_get_request_metadata(grpc_credentials *creds, + grpc_pollset *pollset, + const char *service_url, + grpc_credentials_metadata_cb cb, + void *user_data) { + grpc_plugin_credentials *c = (grpc_plugin_credentials *)creds; + if (c->plugin.get_metadata != NULL) { + grpc_metadata_plugin_request *request = gpr_malloc(sizeof(*request)); + memset(request, 0, sizeof(*request)); + request->user_data = user_data; + request->cb = cb; + c->plugin.get_metadata(c->plugin.state, service_url, + plugin_md_request_metadata_ready, request); + } else { + cb(user_data, NULL, 0, GRPC_CREDENTIALS_OK); + } +} + +static grpc_credentials_vtable plugin_vtable = { + plugin_destruct, plugin_has_request_metadata, + plugin_has_request_metadata_only, plugin_get_request_metadata, NULL}; + +grpc_credentials *grpc_metadata_credentials_create_from_plugin( + grpc_metadata_credentials_plugin plugin, void *reserved) { + grpc_plugin_credentials *c = gpr_malloc(sizeof(*c)); + GPR_ASSERT(reserved == NULL); + memset(c, 0, sizeof(*c)); + c->base.type = GRPC_CREDENTIALS_TYPE_METADATA_PLUGIN; + c->base.vtable = &plugin_vtable; + gpr_ref_init(&c->base.refcount, 1); + c->plugin = plugin; + return &c->base; +} + diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h index 8e4fed7615..38ce0f8ba6 100644 --- a/src/core/security/credentials.h +++ b/src/core/security/credentials.h @@ -56,6 +56,7 @@ typedef enum { #define GRPC_CREDENTIALS_TYPE_SSL "Ssl" #define GRPC_CREDENTIALS_TYPE_OAUTH2 "Oauth2" +#define GRPC_CREDENTIALS_TYPE_METADATA_PLUGIN "Plugin" #define GRPC_CREDENTIALS_TYPE_JWT "Jwt" #define GRPC_CREDENTIALS_TYPE_IAM "Iam" #define GRPC_CREDENTIALS_TYPE_COMPOSITE "Composite" @@ -322,4 +323,12 @@ typedef struct { grpc_credentials *connector_creds; } grpc_composite_credentials; +/* -- Plugin credentials. -- */ + +typedef struct { + grpc_credentials base; + grpc_metadata_credentials_plugin plugin; + grpc_credentials_md_store *plugin_md; +} grpc_plugin_credentials; + #endif /* GRPC_INTERNAL_CORE_SECURITY_CREDENTIALS_H */ diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/handshake.c index bf0079577e..3b49271373 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/handshake.c @@ -31,7 +31,7 @@ * */ -#include "src/core/security/secure_transport_setup.h" +#include "src/core/security/handshake.h" #include <string.h> @@ -52,133 +52,134 @@ typedef struct { gpr_slice_buffer left_overs; gpr_slice_buffer incoming; gpr_slice_buffer outgoing; - grpc_secure_transport_setup_done_cb cb; + grpc_security_handshake_done_cb cb; void *user_data; grpc_iomgr_closure on_handshake_data_sent_to_peer; grpc_iomgr_closure on_handshake_data_received_from_peer; -} grpc_secure_transport_setup; +} grpc_security_handshake; + static void on_handshake_data_received_from_peer(void *setup, int success); static void on_handshake_data_sent_to_peer(void *setup, int success); -static void secure_transport_setup_done(grpc_secure_transport_setup *s, - int is_success) { +static void security_handshake_done(grpc_security_handshake *h, + int is_success) { if (is_success) { - s->cb(s->user_data, GRPC_SECURITY_OK, s->wrapped_endpoint, - s->secure_endpoint); + h->cb(h->user_data, GRPC_SECURITY_OK, h->wrapped_endpoint, + h->secure_endpoint); } else { - if (s->secure_endpoint != NULL) { - grpc_endpoint_shutdown(s->secure_endpoint); - grpc_endpoint_destroy(s->secure_endpoint); + if (h->secure_endpoint != NULL) { + grpc_endpoint_shutdown(h->secure_endpoint); + grpc_endpoint_destroy(h->secure_endpoint); } else { - grpc_endpoint_destroy(s->wrapped_endpoint); + grpc_endpoint_destroy(h->wrapped_endpoint); } - s->cb(s->user_data, GRPC_SECURITY_ERROR, s->wrapped_endpoint, NULL); + h->cb(h->user_data, GRPC_SECURITY_ERROR, h->wrapped_endpoint, NULL); } - if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker); - if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer); - gpr_slice_buffer_destroy(&s->left_overs); - gpr_slice_buffer_destroy(&s->outgoing); - gpr_slice_buffer_destroy(&s->incoming); - GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup"); - gpr_free(s); + if (h->handshaker != NULL) tsi_handshaker_destroy(h->handshaker); + if (h->handshake_buffer != NULL) gpr_free(h->handshake_buffer); + gpr_slice_buffer_destroy(&h->left_overs); + gpr_slice_buffer_destroy(&h->outgoing); + gpr_slice_buffer_destroy(&h->incoming); + GRPC_SECURITY_CONNECTOR_UNREF(h->connector, "handshake"); + gpr_free(h); } static void on_peer_checked(void *user_data, grpc_security_status status) { - grpc_secure_transport_setup *s = user_data; + grpc_security_handshake *h = user_data; tsi_frame_protector *protector; tsi_result result; if (status != GRPC_SECURITY_OK) { gpr_log(GPR_ERROR, "Error checking peer."); - secure_transport_setup_done(s, 0); + security_handshake_done(h, 0); return; } result = - tsi_handshaker_create_frame_protector(s->handshaker, NULL, &protector); + tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Frame protector creation failed with error %s.", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0); + security_handshake_done(h, 0); return; } - s->secure_endpoint = - grpc_secure_endpoint_create(protector, s->wrapped_endpoint, - s->left_overs.slices, s->left_overs.count); - s->left_overs.count = 0; - s->left_overs.length = 0; - secure_transport_setup_done(s, 1); + h->secure_endpoint = + grpc_secure_endpoint_create(protector, h->wrapped_endpoint, + h->left_overs.slices, h->left_overs.count); + h->left_overs.count = 0; + h->left_overs.length = 0; + security_handshake_done(h, 1); return; } -static void check_peer(grpc_secure_transport_setup *s) { +static void check_peer(grpc_security_handshake *h) { grpc_security_status peer_status; tsi_peer peer; - tsi_result result = tsi_handshaker_extract_peer(s->handshaker, &peer); + tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Peer extraction failed with error %s", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0); + security_handshake_done(h, 0); return; } - peer_status = grpc_security_connector_check_peer(s->connector, peer, - on_peer_checked, s); + peer_status = grpc_security_connector_check_peer(h->connector, peer, + on_peer_checked, h); if (peer_status == GRPC_SECURITY_ERROR) { gpr_log(GPR_ERROR, "Peer check failed."); - secure_transport_setup_done(s, 0); + security_handshake_done(h, 0); return; } else if (peer_status == GRPC_SECURITY_OK) { - on_peer_checked(s, peer_status); + on_peer_checked(h, peer_status); } } -static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { +static void send_handshake_bytes_to_peer(grpc_security_handshake *h) { size_t offset = 0; tsi_result result = TSI_OK; gpr_slice to_send; do { - size_t to_send_size = s->handshake_buffer_size - offset; + size_t to_send_size = h->handshake_buffer_size - offset; result = tsi_handshaker_get_bytes_to_send_to_peer( - s->handshaker, s->handshake_buffer + offset, &to_send_size); + h->handshaker, h->handshake_buffer + offset, &to_send_size); offset += to_send_size; if (result == TSI_INCOMPLETE_DATA) { - s->handshake_buffer_size *= 2; - s->handshake_buffer = - gpr_realloc(s->handshake_buffer, s->handshake_buffer_size); + h->handshake_buffer_size *= 2; + h->handshake_buffer = + gpr_realloc(h->handshake_buffer, h->handshake_buffer_size); } } while (result == TSI_INCOMPLETE_DATA); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshake failed with error %s", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0); + security_handshake_done(h, 0); return; } to_send = - gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset); - gpr_slice_buffer_reset_and_unref(&s->outgoing); - gpr_slice_buffer_add(&s->outgoing, to_send); + gpr_slice_from_copied_buffer((const char *)h->handshake_buffer, offset); + gpr_slice_buffer_reset_and_unref(&h->outgoing); + gpr_slice_buffer_add(&h->outgoing, to_send); /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - switch (grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing, - &s->on_handshake_data_sent_to_peer)) { + switch (grpc_endpoint_write(h->wrapped_endpoint, &h->outgoing, + &h->on_handshake_data_sent_to_peer)) { case GRPC_ENDPOINT_ERROR: gpr_log(GPR_ERROR, "Could not send handshake data to peer."); - secure_transport_setup_done(s, 0); + security_handshake_done(h, 0); break; case GRPC_ENDPOINT_DONE: - on_handshake_data_sent_to_peer(s, 1); + on_handshake_data_sent_to_peer(h, 1); break; case GRPC_ENDPOINT_PENDING: break; } } -static void on_handshake_data_received_from_peer(void *setup, int success) { - grpc_secure_transport_setup *s = setup; +static void on_handshake_data_received_from_peer(void *handshake, int success) { + grpc_security_handshake *h = handshake; size_t consumed_slice_size = 0; tsi_result result = TSI_OK; size_t i; @@ -187,35 +188,35 @@ static void on_handshake_data_received_from_peer(void *setup, int success) { if (!success) { gpr_log(GPR_ERROR, "Read failed."); - secure_transport_setup_done(s, 0); + security_handshake_done(h, 0); return; } - for (i = 0; i < s->incoming.count; i++) { - consumed_slice_size = GPR_SLICE_LENGTH(s->incoming.slices[i]); + for (i = 0; i < h->incoming.count; i++) { + consumed_slice_size = GPR_SLICE_LENGTH(h->incoming.slices[i]); result = tsi_handshaker_process_bytes_from_peer( - s->handshaker, GPR_SLICE_START_PTR(s->incoming.slices[i]), + h->handshaker, GPR_SLICE_START_PTR(h->incoming.slices[i]), &consumed_slice_size); - if (!tsi_handshaker_is_in_progress(s->handshaker)) break; + if (!tsi_handshaker_is_in_progress(h->handshaker)) break; } - if (tsi_handshaker_is_in_progress(s->handshaker)) { + if (tsi_handshaker_is_in_progress(h->handshaker)) { /* We may need more data. */ if (result == TSI_INCOMPLETE_DATA) { - switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, - &s->on_handshake_data_received_from_peer)) { + switch (grpc_endpoint_read(h->wrapped_endpoint, &h->incoming, + &h->on_handshake_data_received_from_peer)) { case GRPC_ENDPOINT_DONE: - on_handshake_data_received_from_peer(s, 1); + on_handshake_data_received_from_peer(h, 1); break; case GRPC_ENDPOINT_ERROR: - on_handshake_data_received_from_peer(s, 0); + on_handshake_data_received_from_peer(h, 0); break; case GRPC_ENDPOINT_PENDING: break; } return; } else { - send_handshake_bytes_to_peer(s); + send_handshake_bytes_to_peer(h); return; } } @@ -223,90 +224,85 @@ static void on_handshake_data_received_from_peer(void *setup, int success) { if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshake failed with error %s", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0); + security_handshake_done(h, 0); return; } /* Handshake is done and successful this point. */ has_left_overs_in_current_slice = - (consumed_slice_size < GPR_SLICE_LENGTH(s->incoming.slices[i])); + (consumed_slice_size < GPR_SLICE_LENGTH(h->incoming.slices[i])); num_left_overs = - (has_left_overs_in_current_slice ? 1 : 0) + s->incoming.count - i - 1; + (has_left_overs_in_current_slice ? 1 : 0) + h->incoming.count - i - 1; if (num_left_overs == 0) { - check_peer(s); + check_peer(h); return; } + /* Put the leftovers in our buffer (ownership transfered). */ if (has_left_overs_in_current_slice) { gpr_slice_buffer_add( - &s->left_overs, - gpr_slice_split_tail(&s->incoming.slices[i], consumed_slice_size)); + &h->left_overs, + gpr_slice_split_tail(&h->incoming.slices[i], consumed_slice_size)); gpr_slice_unref( - s->incoming.slices[i]); /* split_tail above increments refcount. */ + h->incoming.slices[i]); /* split_tail above increments refcount. */ } gpr_slice_buffer_addn( - &s->left_overs, &s->incoming.slices[i + 1], + &h->left_overs, &h->incoming.slices[i + 1], num_left_overs - (size_t)has_left_overs_in_current_slice); - check_peer(s); + check_peer(h); } -/* If setup is NULL, the setup is done. */ -static void on_handshake_data_sent_to_peer(void *setup, int success) { - grpc_secure_transport_setup *s = setup; +/* If handshake is NULL, the handshake is done. */ +static void on_handshake_data_sent_to_peer(void *handshake, int success) { + grpc_security_handshake *h = handshake; /* Make sure that write is OK. */ if (!success) { gpr_log(GPR_ERROR, "Write failed."); - if (setup != NULL) secure_transport_setup_done(s, 0); + if (handshake != NULL) security_handshake_done(h, 0); return; } /* We may be done. */ - if (tsi_handshaker_is_in_progress(s->handshaker)) { + if (tsi_handshaker_is_in_progress(h->handshaker)) { /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, - &s->on_handshake_data_received_from_peer)) { + switch (grpc_endpoint_read(h->wrapped_endpoint, &h->incoming, + &h->on_handshake_data_received_from_peer)) { case GRPC_ENDPOINT_ERROR: - on_handshake_data_received_from_peer(s, 0); + on_handshake_data_received_from_peer(h, 0); break; case GRPC_ENDPOINT_PENDING: break; case GRPC_ENDPOINT_DONE: - on_handshake_data_received_from_peer(s, 1); + on_handshake_data_received_from_peer(h, 1); break; } } else { - check_peer(s); + check_peer(h); } } -void grpc_setup_secure_transport(grpc_security_connector *connector, - grpc_endpoint *nonsecure_endpoint, - grpc_secure_transport_setup_done_cb cb, - void *user_data) { - grpc_security_status result = GRPC_SECURITY_OK; - grpc_secure_transport_setup *s = - gpr_malloc(sizeof(grpc_secure_transport_setup)); - memset(s, 0, sizeof(grpc_secure_transport_setup)); - result = grpc_security_connector_create_handshaker(connector, &s->handshaker); - if (result != GRPC_SECURITY_OK) { - secure_transport_setup_done(s, 0); - return; - } - s->connector = - GRPC_SECURITY_CONNECTOR_REF(connector, "secure_transport_setup"); - s->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; - s->handshake_buffer = gpr_malloc(s->handshake_buffer_size); - s->wrapped_endpoint = nonsecure_endpoint; - s->user_data = user_data; - s->cb = cb; - grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer, - on_handshake_data_sent_to_peer, s); - grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer, - on_handshake_data_received_from_peer, s); - gpr_slice_buffer_init(&s->left_overs); - gpr_slice_buffer_init(&s->outgoing); - gpr_slice_buffer_init(&s->incoming); - send_handshake_bytes_to_peer(s); +void grpc_do_security_handshake(tsi_handshaker *handshaker, + grpc_security_connector *connector, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data) { + grpc_security_handshake *h = gpr_malloc(sizeof(grpc_security_handshake)); + memset(h, 0, sizeof(grpc_security_handshake)); + h->handshaker = handshaker; + h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake"); + h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; + h->handshake_buffer = gpr_malloc(h->handshake_buffer_size); + h->wrapped_endpoint = nonsecure_endpoint; + h->user_data = user_data; + h->cb = cb; + grpc_iomgr_closure_init(&h->on_handshake_data_sent_to_peer, + on_handshake_data_sent_to_peer, h); + grpc_iomgr_closure_init(&h->on_handshake_data_received_from_peer, + on_handshake_data_received_from_peer, h); + gpr_slice_buffer_init(&h->left_overs); + gpr_slice_buffer_init(&h->outgoing); + gpr_slice_buffer_init(&h->incoming); + send_handshake_bytes_to_peer(h); } diff --git a/src/core/security/secure_transport_setup.h b/src/core/security/handshake.h index d9b802556d..d7e4a30580 100644 --- a/src/core/security/secure_transport_setup.h +++ b/src/core/security/handshake.h @@ -31,23 +31,18 @@ * */ -#ifndef GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H -#define GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H +#ifndef GRPC_INTERNAL_CORE_SECURITY_HANDSHAKE_H +#define GRPC_INTERNAL_CORE_SECURITY_HANDSHAKE_H #include "src/core/iomgr/endpoint.h" #include "src/core/security/security_connector.h" -/* --- Secure transport setup --- */ -/* Ownership of the secure_endpoint is transfered. */ -typedef void (*grpc_secure_transport_setup_done_cb)( - void *user_data, grpc_security_status status, - grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint); +/* Calls the callback upon completion. Takes owership of handshaker. */ +void grpc_do_security_handshake(tsi_handshaker *handshaker, + grpc_security_connector *connector, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data); -/* Calls the callback upon completion. */ -void grpc_setup_secure_transport(grpc_security_connector *connector, - grpc_endpoint *nonsecure_endpoint, - grpc_secure_transport_setup_done_cb cb, - void *user_data); - -#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H */ +#endif /* GRPC_INTERNAL_CORE_SECURITY_HANDSHAKE_H */ diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index ba9ac68c5f..f6460a323e 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -36,6 +36,7 @@ #include <string.h> #include "src/core/security/credentials.h" +#include "src/core/security/handshake.h" #include "src/core/security/secure_endpoint.h" #include "src/core/security/security_context.h" #include "src/core/support/env.h" @@ -101,10 +102,15 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer, return NULL; } -grpc_security_status grpc_security_connector_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { - if (sc == NULL || handshaker == NULL) return GRPC_SECURITY_ERROR; - return sc->vtable->create_handshaker(sc, handshaker); +void grpc_security_connector_do_handshake(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data) { + if (sc == NULL || nonsecure_endpoint == NULL) { + cb(user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL); + } else { + sc->vtable->do_handshake(sc, nonsecure_endpoint, cb, user_data); + } } grpc_security_status grpc_security_connector_check_peer( @@ -225,18 +231,6 @@ static void fake_server_destroy(grpc_security_connector *sc) { gpr_free(sc); } -static grpc_security_status fake_channel_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { - *handshaker = tsi_create_fake_handshaker(1); - return GRPC_SECURITY_OK; -} - -static grpc_security_status fake_server_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { - *handshaker = tsi_create_fake_handshaker(0); - return GRPC_SECURITY_OK; -} - static grpc_security_status fake_check_peer(grpc_security_connector *sc, tsi_peer peer, grpc_security_check_cb cb, @@ -286,11 +280,27 @@ static grpc_security_status fake_channel_check_call_host( } } +static void fake_channel_do_handshake(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data) { + grpc_do_security_handshake(tsi_create_fake_handshaker(1), sc, + nonsecure_endpoint, cb, user_data); +} + +static void fake_server_do_handshake(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data) { + grpc_do_security_handshake(tsi_create_fake_handshaker(0), sc, + nonsecure_endpoint, cb, user_data); +} + static grpc_security_connector_vtable fake_channel_vtable = { - fake_channel_destroy, fake_channel_create_handshaker, fake_check_peer}; + fake_channel_destroy, fake_channel_do_handshake, fake_check_peer}; static grpc_security_connector_vtable fake_server_vtable = { - fake_server_destroy, fake_server_create_handshaker, fake_check_peer}; + fake_server_destroy, fake_server_do_handshake, fake_check_peer}; grpc_channel_security_connector *grpc_fake_channel_security_connector_create( grpc_credentials *request_metadata_creds, int call_host_check_is_async) { @@ -372,22 +382,41 @@ static grpc_security_status ssl_create_handshaker( return GRPC_SECURITY_OK; } -static grpc_security_status ssl_channel_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { +static void ssl_channel_do_handshake(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data) { grpc_ssl_channel_security_connector *c = (grpc_ssl_channel_security_connector *)sc; - return ssl_create_handshaker(c->handshaker_factory, 1, - c->overridden_target_name != NULL - ? c->overridden_target_name - : c->target_name, - handshaker); + tsi_handshaker *handshaker; + grpc_security_status status = ssl_create_handshaker( + c->handshaker_factory, 1, + c->overridden_target_name != NULL ? c->overridden_target_name + : c->target_name, + &handshaker); + if (status != GRPC_SECURITY_OK) { + cb(user_data, status, nonsecure_endpoint, NULL); + } else { + grpc_do_security_handshake(handshaker, sc, nonsecure_endpoint, cb, + user_data); + } } -static grpc_security_status ssl_server_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker) { +static void ssl_server_do_handshake(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data) { grpc_ssl_server_security_connector *c = (grpc_ssl_server_security_connector *)sc; - return ssl_create_handshaker(c->handshaker_factory, 0, NULL, handshaker); + tsi_handshaker *handshaker; + grpc_security_status status = + ssl_create_handshaker(c->handshaker_factory, 0, NULL, &handshaker); + if (status != GRPC_SECURITY_OK) { + cb(user_data, status, nonsecure_endpoint, NULL); + } else { + grpc_do_security_handshake(handshaker, sc, nonsecure_endpoint, cb, + user_data); + } } static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) { @@ -512,10 +541,10 @@ static grpc_security_status ssl_channel_check_call_host( } static grpc_security_connector_vtable ssl_channel_vtable = { - ssl_channel_destroy, ssl_channel_create_handshaker, ssl_channel_check_peer}; + ssl_channel_destroy, ssl_channel_do_handshake, ssl_channel_check_peer}; static grpc_security_connector_vtable ssl_server_vtable = { - ssl_server_destroy, ssl_server_create_handshaker, ssl_server_check_peer}; + ssl_server_destroy, ssl_server_do_handshake, ssl_server_check_peer}; static gpr_slice default_pem_root_certs; diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h index 2c9aa1c5a4..5fc1db382e 100644 --- a/src/core/security/security_connector.h +++ b/src/core/security/security_connector.h @@ -63,10 +63,17 @@ typedef struct grpc_security_connector grpc_security_connector; typedef void (*grpc_security_check_cb)(void *user_data, grpc_security_status status); + +/* Ownership of the secure_endpoint is transfered. */ +typedef void (*grpc_security_handshake_done_cb)( + void *user_data, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint); + typedef struct { void (*destroy)(grpc_security_connector *sc); - grpc_security_status (*create_handshaker)(grpc_security_connector *sc, - tsi_handshaker **handshaker); + void (*do_handshake)(grpc_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, void *user_data); grpc_security_status (*check_peer)(grpc_security_connector *sc, tsi_peer peer, grpc_security_check_cb cb, void *user_data); @@ -100,9 +107,11 @@ grpc_security_connector *grpc_security_connector_ref( void grpc_security_connector_unref(grpc_security_connector *policy); #endif -/* Handshake creation. */ -grpc_security_status grpc_security_connector_create_handshaker( - grpc_security_connector *sc, tsi_handshaker **handshaker); +/* Handshake. */ +void grpc_security_connector_do_handshake(grpc_security_connector *connector, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, + void *user_data); /* Check the peer. Implementations can choose to check the peer either synchronously or diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 4749f5f516..f7318b2079 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -44,7 +44,6 @@ #include "src/core/security/credentials.h" #include "src/core/security/security_connector.h" #include "src/core/security/security_context.h" -#include "src/core/security/secure_transport_setup.h" #include "src/core/surface/server.h" #include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> @@ -123,10 +122,9 @@ static int remove_tcp_from_list_locked(grpc_server_secure_state *state, return -1; } -static void on_secure_transport_setup_done(void *statep, - grpc_security_status status, - grpc_endpoint *wrapped_endpoint, - grpc_endpoint *secure_endpoint) { +static void on_secure_handshake_done(void *statep, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, + grpc_endpoint *secure_endpoint) { grpc_server_secure_state *state = statep; grpc_transport *transport; grpc_mdctx *mdctx; @@ -165,8 +163,8 @@ static void on_accept(void *statep, grpc_endpoint *tcp) { node->next = state->handshaking_tcp_endpoints; state->handshaking_tcp_endpoints = node; gpr_mu_unlock(&state->mu); - grpc_setup_secure_transport(state->sc, tcp, on_secure_transport_setup_done, - state); + grpc_security_connector_do_handshake(state->sc, tcp, on_secure_handshake_done, + state); } /* Server callback: start listening on our ports */ diff --git a/src/core/support/log_posix.c b/src/core/support/log_posix.c index 021c4666d4..8b050dbee7 100644 --- a/src/core/support/log_posix.c +++ b/src/core/support/log_posix.c @@ -64,7 +64,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity, } else { message = allocated = gpr_malloc((size_t)ret + 1); va_start(args, format); - vsnprintf(message, ret + 1, format, args); + vsnprintf(message, (size_t)(ret + 1), format, args); va_end(args); } gpr_log_message(file, line, severity, message); diff --git a/src/core/support/string.c b/src/core/support/string.c index af0389ea83..e0ffeb8a4a 100644 --- a/src/core/support/string.c +++ b/src/core/support/string.c @@ -101,7 +101,7 @@ static void asciidump(dump_out *out, const char *buf, size_t len) { dump_out_append(out, '\''); } for (cur = beg; cur != end; ++cur) { - dump_out_append(out, isprint(*cur) ? *(char *)cur : '.'); + dump_out_append(out, (char)(isprint(*cur) ? *(char *)cur : '.')); } if (!out_was_empty) { dump_out_append(out, '\''); diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 88a7c16598..5c55ad3655 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -67,6 +67,7 @@ typedef struct { gpr_mu mu; callback_phase phase; int success; + int removed; grpc_iomgr_closure on_complete; grpc_alarm alarm; grpc_connectivity_state state; @@ -77,10 +78,6 @@ typedef struct { } state_watcher; static void delete_state_watcher(state_watcher *w) { - grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(w->channel)); - grpc_client_channel_del_interested_party(client_channel_elem, - grpc_cq_pollset(w->cq)); GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity"); gpr_mu_destroy(&w->mu); gpr_free(w); @@ -112,7 +109,17 @@ static void finished_completion(void *pw, grpc_cq_completion *ignored) { static void partly_done(state_watcher *w, int due_to_completion) { int delete = 0; + grpc_channel_element *client_channel_elem = NULL; + gpr_mu_lock(&w->mu); + if (w->removed == 0) { + w->removed = 1; + client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(w->channel)); + grpc_client_channel_del_interested_party(client_channel_elem, + grpc_cq_pollset(w->cq)); + } + gpr_mu_unlock(&w->mu); if (due_to_completion) { gpr_mu_lock(&w->mu); w->success = 1; @@ -163,6 +170,7 @@ void grpc_channel_watch_connectivity_state( w->phase = WAITING; w->state = last_observed_state; w->success = 0; + w->removed = 0; w->cq = cq; w->tag = tag; w->channel = channel; diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 9e2cf1cf66..d323d0d74f 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -88,6 +88,8 @@ static void connected(void *arg, grpc_endpoint *tcp) { grpc_iomgr_add_callback(notify); } +static void connector_shutdown(grpc_connector *con) {} + static void connector_connect(grpc_connector *con, const grpc_connect_in_args *args, grpc_connect_out_args *result, @@ -103,7 +105,7 @@ static void connector_connect(grpc_connector *con, } static const grpc_connector_vtable connector_vtable = { - connector_ref, connector_unref, connector_connect}; + connector_ref, connector_unref, connector_shutdown, connector_connect}; typedef struct { grpc_subchannel_factory base; diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 0d48cd42d7..93c27c77bf 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -40,6 +40,9 @@ #include <grpc/support/alloc.h> #include <grpc/support/time.h> #include "src/core/channel/channel_stack.h" +#include "src/core/client_config/lb_policy_registry.h" +#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policies/round_robin.h" #include "src/core/client_config/resolver_registry.h" #include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/client_config/resolvers/sockaddr_resolver.h" @@ -85,6 +88,9 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); + grpc_lb_policy_registry_init(grpc_pick_first_lb_factory_create()); + grpc_register_lb_policy(grpc_pick_first_lb_factory_create()); + grpc_register_lb_policy(grpc_round_robin_lb_factory_create()); grpc_resolver_registry_init("dns:///"); grpc_register_resolver_type(grpc_dns_resolver_factory_create()); grpc_register_resolver_type(grpc_ipv4_resolver_factory_create()); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 6eb31ad63f..5a4cf4ad0e 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -47,7 +47,6 @@ #include "src/core/iomgr/tcp_client.h" #include "src/core/security/auth_filters.h" #include "src/core/security/credentials.h" -#include "src/core/security/secure_transport_setup.h" #include "src/core/surface/channel.h" #include "src/core/transport/chttp2_transport.h" #include "src/core/tsi/transport_security_interface.h" @@ -61,6 +60,9 @@ typedef struct { grpc_iomgr_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; + + gpr_mu mu; + grpc_endpoint *connecting_endpoint; } connector; static void connector_ref(grpc_connector *con) { @@ -75,16 +77,25 @@ static void connector_unref(grpc_connector *con) { } } -static void on_secure_transport_setup_done(void *arg, - grpc_security_status status, - grpc_endpoint *wrapped_endpoint, - grpc_endpoint *secure_endpoint) { +static void on_secure_handshake_done(void *arg, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, + grpc_endpoint *secure_endpoint) { connector *c = arg; grpc_iomgr_closure *notify; - if (status != GRPC_SECURITY_OK) { - gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); + gpr_mu_lock(&c->mu); + if (c->connecting_endpoint == NULL) { + memset(c->result, 0, sizeof(*c->result)); + gpr_mu_unlock(&c->mu); + } else if (status != GRPC_SECURITY_OK) { + GPR_ASSERT(c->connecting_endpoint == wrapped_endpoint); + gpr_log(GPR_ERROR, "Secure handshake failed with error %d.", status); memset(c->result, 0, sizeof(*c->result)); + c->connecting_endpoint = NULL; + gpr_mu_unlock(&c->mu); } else { + GPR_ASSERT(c->connecting_endpoint == wrapped_endpoint); + c->connecting_endpoint = NULL; + gpr_mu_unlock(&c->mu); c->result->transport = grpc_create_chttp2_transport( c->args.channel_args, secure_endpoint, c->args.metadata_context, 1); grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); @@ -102,8 +113,12 @@ static void connected(void *arg, grpc_endpoint *tcp) { connector *c = arg; grpc_iomgr_closure *notify; if (tcp != NULL) { - grpc_setup_secure_transport(&c->security_connector->base, tcp, - on_secure_transport_setup_done, c); + gpr_mu_lock(&c->mu); + GPR_ASSERT(c->connecting_endpoint == NULL); + c->connecting_endpoint = tcp; + gpr_mu_unlock(&c->mu); + grpc_security_connector_do_handshake(&c->security_connector->base, tcp, + on_secure_handshake_done, c); } else { memset(c->result, 0, sizeof(*c->result)); notify = c->notify; @@ -112,6 +127,18 @@ static void connected(void *arg, grpc_endpoint *tcp) { } } +static void connector_shutdown(grpc_connector *con) { + connector *c = (connector *)con; + grpc_endpoint *ep; + gpr_mu_lock(&c->mu); + ep = c->connecting_endpoint; + c->connecting_endpoint = NULL; + gpr_mu_unlock(&c->mu); + if (ep) { + grpc_endpoint_shutdown(ep); + } +} + static void connector_connect(grpc_connector *con, const grpc_connect_in_args *args, grpc_connect_out_args *result, @@ -122,12 +149,15 @@ static void connector_connect(grpc_connector *con, c->notify = notify; c->args = *args; c->result = result; + gpr_mu_lock(&c->mu); + GPR_ASSERT(c->connecting_endpoint == NULL); + gpr_mu_unlock(&c->mu); grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, args->addr_len, args->deadline); } static const grpc_connector_vtable connector_vtable = { - connector_ref, connector_unref, connector_connect}; + connector_ref, connector_unref, connector_shutdown, connector_connect}; typedef struct { grpc_subchannel_factory base; @@ -165,6 +195,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; + gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->mdctx = f->mdctx; args->args = final_args; diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index a592ce7d28..f26f446787 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -486,7 +486,7 @@ static int init_skip_frame_parser( transport_parsing->hpack_parser.on_header_user_data = NULL; transport_parsing->hpack_parser.is_boundary = is_eoh; transport_parsing->hpack_parser.is_eof = - is_eoh ? transport_parsing->header_eof : 0; + (gpr_uint8)(is_eoh ? transport_parsing->header_eof : 0); } else { transport_parsing->parser = skip_parser; } @@ -696,7 +696,7 @@ static int init_header_frame_parser( transport_parsing->hpack_parser.on_header_user_data = transport_parsing; transport_parsing->hpack_parser.is_boundary = is_eoh; transport_parsing->hpack_parser.is_eof = - is_eoh ? transport_parsing->header_eof : 0; + (gpr_uint8)(is_eoh ? transport_parsing->header_eof : 0); if (!is_continuation && (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) { grpc_chttp2_hpack_parser_set_has_priority(&transport_parsing->hpack_parser); diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 2260f6d33e..1693cf740b 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -144,4 +144,64 @@ std::shared_ptr<Credentials> CompositeCredentials( return nullptr; } +void MetadataCredentialsPluginWrapper::Destroy(void* wrapper) { + if (wrapper == nullptr) return; + MetadataCredentialsPluginWrapper* w = + reinterpret_cast<MetadataCredentialsPluginWrapper*>(wrapper); + delete w; +} + +void MetadataCredentialsPluginWrapper::GetMetadata( + void* wrapper, const char* service_url, + grpc_credentials_plugin_metadata_cb cb, void* user_data) { + GPR_ASSERT(wrapper != nullptr); + MetadataCredentialsPluginWrapper* w = + reinterpret_cast<MetadataCredentialsPluginWrapper*>(wrapper); + if (w->plugin_ == nullptr) { + cb(user_data, NULL, 0, GRPC_STATUS_OK, NULL); + return; + } + if (w->plugin_->IsBlocking()) { + w->thread_pool_->Add( + std::bind(&MetadataCredentialsPluginWrapper::InvokePlugin, w, + service_url, cb, user_data)); + } else { + w->InvokePlugin(service_url, cb, user_data); + } +} + +void MetadataCredentialsPluginWrapper::InvokePlugin( + const char* service_url, grpc_credentials_plugin_metadata_cb cb, + void* user_data) { + std::multimap<grpc::string, grpc::string> metadata; + Status status = plugin_->GetMetadata(service_url, &metadata); + std::vector<grpc_metadata> md; + for (auto it = metadata.begin(); it != metadata.end(); ++it) { + md.push_back({it->first.c_str(), + it->second.data(), + it->second.size(), + 0, + {{nullptr, nullptr, nullptr, nullptr}}}); + } + cb(user_data, md.empty() ? nullptr : &md[0], md.size(), + static_cast<grpc_status_code>(status.error_code()), + status.error_message().c_str()); +} + +MetadataCredentialsPluginWrapper::MetadataCredentialsPluginWrapper( + std::unique_ptr<MetadataCredentialsPlugin> plugin) + : thread_pool_(CreateDefaultThreadPool()), plugin_(std::move(plugin)) {} + +std::shared_ptr<Credentials> MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin> plugin) { + GrpcLibrary init; // To call grpc_init(). + MetadataCredentialsPluginWrapper* wrapper = + new MetadataCredentialsPluginWrapper(std::move(plugin)); + grpc_metadata_credentials_plugin c_plugin = { + MetadataCredentialsPluginWrapper::GetMetadata, + MetadataCredentialsPluginWrapper::Destroy, wrapper}; + return WrapCredentials( + grpc_metadata_credentials_create_from_plugin(c_plugin, nullptr)); +} + } // namespace grpc diff --git a/src/cpp/client/secure_credentials.h b/src/cpp/client/secure_credentials.h index 8deff856c4..d354827725 100644 --- a/src/cpp/client/secure_credentials.h +++ b/src/cpp/client/secure_credentials.h @@ -39,6 +39,8 @@ #include <grpc++/support/config.h> #include <grpc++/security/credentials.h> +#include "src/cpp/server/thread_pool_interface.h" + namespace grpc { class SecureCredentials GRPC_FINAL : public Credentials { @@ -56,6 +58,23 @@ class SecureCredentials GRPC_FINAL : public Credentials { grpc_credentials* const c_creds_; }; +class MetadataCredentialsPluginWrapper GRPC_FINAL { + public: + static void Destroy(void* wrapper); + static void GetMetadata(void* wrapper, const char* service_url, + grpc_credentials_plugin_metadata_cb cb, + void* user_data); + + explicit MetadataCredentialsPluginWrapper( + std::unique_ptr<MetadataCredentialsPlugin> plugin); + + private: + void InvokePlugin(const char* service_url, + grpc_credentials_plugin_metadata_cb cb, void* user_data); + std::unique_ptr<ThreadPoolInterface> thread_pool_; + std::unique_ptr<MetadataCredentialsPlugin> plugin_; +}; + } // namespace grpc #endif // GRPC_INTERNAL_CPP_CLIENT_SECURE_CREDENTIALS_H diff --git a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs index 35561d25d8..5c5b802164 100644 --- a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs +++ b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs @@ -79,48 +79,72 @@ namespace Grpc.Core.Logging } /// <summary>Logs a message with severity Debug.</summary> - public void Debug(string message, params object[] formatArgs) + public void Debug(string message) { - Log("D", message, formatArgs); + Log("D", message); + } + + /// <summary>Logs a formatted message with severity Debug.</summary> + public void Debug(string format, params object[] formatArgs) + { + Debug(string.Format(format, formatArgs)); } /// <summary>Logs a message with severity Info.</summary> - public void Info(string message, params object[] formatArgs) + public void Info(string message) + { + Log("I", message); + } + + /// <summary>Logs a formatted message with severity Info.</summary> + public void Info(string format, params object[] formatArgs) { - Log("I", message, formatArgs); + Info(string.Format(format, formatArgs)); } /// <summary>Logs a message with severity Warning.</summary> - public void Warning(string message, params object[] formatArgs) + public void Warning(string message) { - Log("W", message, formatArgs); + Log("W", message); + } + + /// <summary>Logs a formatted message with severity Warning.</summary> + public void Warning(string format, params object[] formatArgs) + { + Warning(string.Format(format, formatArgs)); } /// <summary>Logs a message and an associated exception with severity Warning.</summary> - public void Warning(Exception exception, string message, params object[] formatArgs) + public void Warning(Exception exception, string message) { - Log("W", message + " " + exception, formatArgs); + Warning(message + " " + exception); } /// <summary>Logs a message with severity Error.</summary> - public void Error(string message, params object[] formatArgs) + public void Error(string message) + { + Log("E", message); + } + + /// <summary>Logs a formatted message with severity Error.</summary> + public void Error(string format, params object[] formatArgs) { - Log("E", message, formatArgs); + Error(string.Format(format, formatArgs)); } /// <summary>Logs a message and an associated exception with severity Error.</summary> - public void Error(Exception exception, string message, params object[] formatArgs) + public void Error(Exception exception, string message) { - Log("E", message + " " + exception, formatArgs); + Error(message + " " + exception); } - private void Log(string severityString, string message, object[] formatArgs) + private void Log(string severityString, string message) { Console.Error.WriteLine("{0}{1} {2}{3}", severityString, DateTime.Now, forTypeString, - string.Format(message, formatArgs)); + message); } } } diff --git a/src/csharp/Grpc.Core/Logging/ILogger.cs b/src/csharp/Grpc.Core/Logging/ILogger.cs index 61e0c91388..7c0326422f 100644 --- a/src/csharp/Grpc.Core/Logging/ILogger.cs +++ b/src/csharp/Grpc.Core/Logging/ILogger.cs @@ -43,21 +43,33 @@ namespace Grpc.Core.Logging ILogger ForType<T>(); /// <summary>Logs a message with severity Debug.</summary> - void Debug(string message, params object[] formatArgs); + void Debug(string message); + + /// <summary>Logs a formatted message with severity Debug.</summary> + void Debug(string format, params object[] formatArgs); /// <summary>Logs a message with severity Info.</summary> - void Info(string message, params object[] formatArgs); + void Info(string message); + + /// <summary>Logs a formatted message with severity Info.</summary> + void Info(string format, params object[] formatArgs); /// <summary>Logs a message with severity Warning.</summary> - void Warning(string message, params object[] formatArgs); + void Warning(string message); + + /// <summary>Logs a formatted message with severity Warning.</summary> + void Warning(string format, params object[] formatArgs); /// <summary>Logs a message and an associated exception with severity Warning.</summary> - void Warning(Exception exception, string message, params object[] formatArgs); + void Warning(Exception exception, string message); /// <summary>Logs a message with severity Error.</summary> - void Error(string message, params object[] formatArgs); + void Error(string message); + + /// <summary>Logs a formatted message with severity Error.</summary> + void Error(string format, params object[] formatArgs); /// <summary>Logs a message and an associated exception with severity Error.</summary> - void Error(Exception exception, string message, params object[] formatArgs); + void Error(Exception exception, string message); } } diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index eda821bc31..e9ec30e923 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -41,6 +41,6 @@ namespace Grpc.Core /// <summary> /// Current version of gRPC C# /// </summary> - public const string CurrentVersion = "0.7.0"; + public const string CurrentVersion = "0.7.1"; } } diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 0884c6ea60..616093d4ae 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -131,7 +131,7 @@ namespace Grpc.IntegrationTesting var channel = new Channel(options.ServerHost, options.ServerPort, credentials, channelOptions); TestService.TestServiceClient client = new TestService.TestServiceClient(channel); await RunTestCaseAsync(client, options); - channel.ShutdownAsync().Wait(); + await channel.ShutdownAsync(); } private async Task RunTestCaseAsync(TestService.TestServiceClient client, ClientOptions options) diff --git a/src/csharp/Grpc.IntegrationTesting/proto/test.proto b/src/csharp/Grpc.IntegrationTesting/proto/test.proto index f9e0d2a039..5496f72af0 100644 --- a/src/csharp/Grpc.IntegrationTesting/proto/test.proto +++ b/src/csharp/Grpc.IntegrationTesting/proto/test.proto @@ -44,7 +44,6 @@ service TestService { rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); // One request followed by one response. - // TODO(Issue 527): Describe required server behavior. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); // One request followed by a sequence of responses (streamed download). diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat index ea2206e760..a3505b1e01 100644 --- a/src/csharp/build_packages.bat +++ b/src/csharp/build_packages.bat @@ -1,12 +1,12 @@ @rem Builds gRPC NuGet packages @rem Current package versions -set VERSION=0.7.0 -set CORE_VERSION=0.11.0 +set VERSION=0.7.1 +set CORE_VERSION=0.11.1 set PROTOBUF_VERSION=3.0.0-alpha4 @rem Packages that depend on prerelease packages (like Google.Protobuf) need to have prerelease suffix as well. -set VERSION_WITH_BETA=0.7.0-beta +set VERSION_WITH_BETA=%VERSION%-beta @rem Adjust the location of nuget.exe set NUGET=C:\nuget\nuget.exe diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 70c0fbcc50..51e0728fb9 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -252,7 +252,7 @@ GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length( if (!ctx->recv_message) { return -1; } - return grpc_byte_buffer_length(ctx->recv_message); + return (gpr_intptr)grpc_byte_buffer_length(ctx->recv_message); } /* diff --git a/src/node/binding.gyp b/src/node/binding.gyp index 734dc8410b..a644030932 100644 --- a/src/node/binding.gyp +++ b/src/node/binding.gyp @@ -8,7 +8,6 @@ '-std=c++0x', '-Wall', '-pthread', - '-pedantic', '-g', '-zdefs', '-Werror', diff --git a/src/node/ext/byte_buffer.cc b/src/node/ext/byte_buffer.cc index 7eff11c2b3..e1786ddba7 100644 --- a/src/node/ext/byte_buffer.cc +++ b/src/node/ext/byte_buffer.cc @@ -44,15 +44,16 @@ namespace grpc { namespace node { + using v8::Context; using v8::Function; -using v8::Handle; +using v8::Local; using v8::Object; using v8::Number; using v8::Value; -grpc_byte_buffer *BufferToByteBuffer(Handle<Value> buffer) { - NanScope(); +grpc_byte_buffer *BufferToByteBuffer(Local<Value> buffer) { + Nan::HandleScope scope; int length = ::node::Buffer::Length(buffer); char *data = ::node::Buffer::Data(buffer); gpr_slice slice = gpr_slice_malloc(length); @@ -62,10 +63,10 @@ grpc_byte_buffer *BufferToByteBuffer(Handle<Value> buffer) { return byte_buffer; } -Handle<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) { - NanEscapableScope(); +Local<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) { + Nan::EscapableHandleScope scope; if (buffer == NULL) { - return NanEscapeScope(NanNull()); + return scope.Escape(Nan::Null()); } size_t length = grpc_byte_buffer_length(buffer); char *result = reinterpret_cast<char *>(calloc(length, sizeof(char))); @@ -77,21 +78,22 @@ Handle<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) { memcpy(result + offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next)); offset += GPR_SLICE_LENGTH(next); } - return NanEscapeScope(MakeFastBuffer(NanNewBufferHandle(result, length))); + return scope.Escape(MakeFastBuffer( + Nan::NewBuffer(result, length).ToLocalChecked())); } -Handle<Value> MakeFastBuffer(Handle<Value> slowBuffer) { - NanEscapableScope(); - Handle<Object> globalObj = NanGetCurrentContext()->Global(); - Handle<Function> bufferConstructor = Handle<Function>::Cast( - globalObj->Get(NanNew("Buffer"))); - Handle<Value> consArgs[3] = { +Local<Value> MakeFastBuffer(Local<Value> slowBuffer) { + Nan::EscapableHandleScope scope; + Local<Object> globalObj = Nan::GetCurrentContext()->Global(); + Local<Function> bufferConstructor = Local<Function>::Cast( + globalObj->Get(Nan::New("Buffer").ToLocalChecked())); + Local<Value> consArgs[3] = { slowBuffer, - NanNew<Number>(::node::Buffer::Length(slowBuffer)), - NanNew<Number>(0) + Nan::New<Number>(::node::Buffer::Length(slowBuffer)), + Nan::New<Number>(0) }; - Handle<Object> fastBuffer = bufferConstructor->NewInstance(3, consArgs); - return NanEscapeScope(fastBuffer); + Local<Object> fastBuffer = bufferConstructor->NewInstance(3, consArgs); + return scope.Escape(fastBuffer); } } // namespace node } // namespace grpc diff --git a/src/node/ext/byte_buffer.h b/src/node/ext/byte_buffer.h index 5083674d39..55bc0ab377 100644 --- a/src/node/ext/byte_buffer.h +++ b/src/node/ext/byte_buffer.h @@ -45,14 +45,14 @@ namespace node { /* Convert a Node.js Buffer to grpc_byte_buffer. Requires that ::node::Buffer::HasInstance(buffer) */ -grpc_byte_buffer *BufferToByteBuffer(v8::Handle<v8::Value> buffer); +grpc_byte_buffer *BufferToByteBuffer(v8::Local<v8::Value> buffer); /* Convert a grpc_byte_buffer to a Node.js Buffer */ -v8::Handle<v8::Value> ByteBufferToBuffer(grpc_byte_buffer *buffer); +v8::Local<v8::Value> ByteBufferToBuffer(grpc_byte_buffer *buffer); /* Convert a ::node::Buffer to a fast Buffer, as defined in the Node Buffer documentation */ -v8::Handle<v8::Value> MakeFastBuffer(v8::Handle<v8::Value> slowBuffer); +v8::Local<v8::Value> MakeFastBuffer(v8::Local<v8::Value> slowBuffer); } // namespace node } // namespace grpc diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 560869e6fa..b08a9f96d8 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -54,52 +54,61 @@ using std::vector; namespace grpc { namespace node { +using Nan::Callback; +using Nan::EscapableHandleScope; +using Nan::HandleScope; +using Nan::Maybe; +using Nan::MaybeLocal; +using Nan::ObjectWrap; +using Nan::Persistent; +using Nan::Utf8String; + using v8::Array; using v8::Boolean; using v8::Exception; using v8::External; using v8::Function; using v8::FunctionTemplate; -using v8::Handle; -using v8::HandleScope; using v8::Integer; using v8::Local; using v8::Number; using v8::Object; using v8::ObjectTemplate; -using v8::Persistent; using v8::Uint32; using v8::String; using v8::Value; -NanCallback *Call::constructor; +Callback *Call::constructor; Persistent<FunctionTemplate> Call::fun_tpl; bool EndsWith(const char *str, const char *substr) { return strcmp(str+strlen(str)-strlen(substr), substr) == 0; } -bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array, +bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array, shared_ptr<Resources> resources) { - NanScope(); + HandleScope scope; grpc_metadata_array_init(array); - Handle<Array> keys(metadata->GetOwnPropertyNames()); + Local<Array> keys = Nan::GetOwnPropertyNames(metadata).ToLocalChecked(); for (unsigned int i = 0; i < keys->Length(); i++) { - Handle<String> current_key(keys->Get(i)->ToString()); - if (!metadata->Get(current_key)->IsArray()) { + Local<String> current_key = Nan::To<String>( + Nan::Get(keys, i).ToLocalChecked()).ToLocalChecked(); + Local<Value> value_array = Nan::Get(metadata, current_key).ToLocalChecked(); + if (!value_array->IsArray()) { return false; } - array->capacity += Local<Array>::Cast(metadata->Get(current_key))->Length(); + array->capacity += Local<Array>::Cast(value_array)->Length(); } array->metadata = reinterpret_cast<grpc_metadata*>( gpr_malloc(array->capacity * sizeof(grpc_metadata))); for (unsigned int i = 0; i < keys->Length(); i++) { - Handle<String> current_key(keys->Get(i)->ToString()); - NanUtf8String *utf8_key = new NanUtf8String(current_key); - resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_key)); - Handle<Array> values = Local<Array>::Cast(metadata->Get(current_key)); + Local<String> current_key(keys->Get(i)->ToString()); + Utf8String *utf8_key = new Utf8String(current_key); + resources->strings.push_back(unique_ptr<Utf8String>(utf8_key)); + Local<Array> values = Local<Array>::Cast( + Nan::Get(metadata, current_key).ToLocalChecked()); for (unsigned int j = 0; j < values->Length(); j++) { - Handle<Value> value = values->Get(j); + Local<Value> value = Nan::Get(values, j).ToLocalChecked(); grpc_metadata *current = &array->metadata[array->count]; current->key = **utf8_key; // Only allow binary headers for "-bin" keys @@ -107,18 +116,16 @@ bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array, if (::node::Buffer::HasInstance(value)) { current->value = ::node::Buffer::Data(value); current->value_length = ::node::Buffer::Length(value); - Persistent<Value> *handle = new Persistent<Value>(); - NanAssignPersistent(*handle, value); - resources->handles.push_back(unique_ptr<PersistentHolder>( - new PersistentHolder(handle))); + PersistentValue *handle = new PersistentValue(value); + resources->handles.push_back(unique_ptr<PersistentValue>(handle)); } else { return false; } } else { if (value->IsString()) { - Handle<String> string_value = value->ToString(); - NanUtf8String *utf8_value = new NanUtf8String(string_value); - resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value)); + Local<String> string_value = Nan::To<String>(value).ToLocalChecked(); + Utf8String *utf8_value = new Utf8String(string_value); + resources->strings.push_back(unique_ptr<Utf8String>(utf8_value)); current->value = **utf8_value; current->value_length = string_value->Length(); } else { @@ -131,8 +138,8 @@ bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array, return true; } -Handle<Value> ParseMetadata(const grpc_metadata_array *metadata_array) { - NanEscapableScope(); +Local<Value> ParseMetadata(const grpc_metadata_array *metadata_array) { + EscapableHandleScope scope; grpc_metadata *metadata_elements = metadata_array->metadata; size_t length = metadata_array->count; std::map<const char*, size_t> size_map; @@ -142,49 +149,62 @@ Handle<Value> ParseMetadata(const grpc_metadata_array *metadata_array) { const char *key = metadata_elements[i].key; if (size_map.count(key)) { size_map[key] += 1; + } else { + size_map[key] = 1; } index_map[key] = 0; } - Handle<Object> metadata_object = NanNew<Object>(); + Local<Object> metadata_object = Nan::New<Object>(); for (unsigned int i = 0; i < length; i++) { grpc_metadata* elem = &metadata_elements[i]; - Handle<String> key_string = NanNew(elem->key); - Handle<Array> array; - if (metadata_object->Has(key_string)) { - array = Handle<Array>::Cast(metadata_object->Get(key_string)); + Local<String> key_string = Nan::New(elem->key).ToLocalChecked(); + Local<Array> array; + MaybeLocal<Value> maybe_array = Nan::Get(metadata_object, key_string); + if (maybe_array.IsEmpty() || !maybe_array.ToLocalChecked()->IsArray()) { + array = Nan::New<Array>(size_map[elem->key]); + Nan::Set(metadata_object, key_string, array); } else { - array = NanNew<Array>(size_map[elem->key]); - metadata_object->Set(key_string, array); + array = Local<Array>::Cast(maybe_array.ToLocalChecked()); } if (EndsWith(elem->key, "-bin")) { - array->Set(index_map[elem->key], - NanNewBufferHandle(elem->value, elem->value_length)); + Nan::Set(array, index_map[elem->key], + Nan::CopyBuffer(elem->value, + elem->value_length).ToLocalChecked()); } else { - array->Set(index_map[elem->key], NanNew(elem->value)); + Nan::Set(array, index_map[elem->key], + Nan::New(elem->value).ToLocalChecked()); } index_map[elem->key] += 1; } - return NanEscapeScope(metadata_object); + return scope.Escape(metadata_object); } -Handle<Value> Op::GetOpType() const { - NanEscapableScope(); - return NanEscapeScope(NanNew<String>(GetTypeString())); +Local<Value> Op::GetOpType() const { + EscapableHandleScope scope; + return scope.Escape(Nan::New(GetTypeString()).ToLocalChecked()); +} + +Op::~Op() { } class SendMetadataOp : public Op { public: - Handle<Value> GetNodeValue() const { - NanEscapableScope(); - return NanEscapeScope(NanTrue()); + Local<Value> GetNodeValue() const { + EscapableHandleScope scope; + return scope.Escape(Nan::True()); } - bool ParseOp(Handle<Value> value, grpc_op *out, + bool ParseOp(Local<Value> value, grpc_op *out, shared_ptr<Resources> resources) { if (!value->IsObject()) { return false; } grpc_metadata_array array; - if (!CreateMetadataArray(value->ToObject(), &array, resources)) { + MaybeLocal<Object> maybe_metadata = Nan::To<Object>(value); + if (maybe_metadata.IsEmpty()) { + return false; + } + if (!CreateMetadataArray(maybe_metadata.ToLocalChecked(), + &array, resources)) { return false; } out->data.send_initial_metadata.count = array.count; @@ -199,27 +219,28 @@ class SendMetadataOp : public Op { class SendMessageOp : public Op { public: - Handle<Value> GetNodeValue() const { - NanEscapableScope(); - return NanEscapeScope(NanTrue()); + Local<Value> GetNodeValue() const { + EscapableHandleScope scope; + return scope.Escape(Nan::True()); } - bool ParseOp(Handle<Value> value, grpc_op *out, + bool ParseOp(Local<Value> value, grpc_op *out, shared_ptr<Resources> resources) { if (!::node::Buffer::HasInstance(value)) { return false; } - Handle<Object> object_value = value->ToObject(); - if (object_value->HasOwnProperty(NanNew("grpcWriteFlags"))) { - Handle<Value> flag_value = object_value->Get(NanNew("grpcWriteFlags")); + Local<Object> object_value = Nan::To<Object>(value).ToLocalChecked(); + MaybeLocal<Value> maybe_flag_value = Nan::Get( + object_value, Nan::New("grpcWriteFlags").ToLocalChecked()); + if (!maybe_flag_value.IsEmpty()) { + Local<Value> flag_value = maybe_flag_value.ToLocalChecked(); if (flag_value->IsUint32()) { - out->flags = flag_value->Uint32Value() & GRPC_WRITE_USED_MASK; + Maybe<uint32_t> maybe_flag = Nan::To<uint32_t>(flag_value); + out->flags = maybe_flag.FromMaybe(0) & GRPC_WRITE_USED_MASK; } } out->data.send_message = BufferToByteBuffer(value); - Persistent<Value> *handle = new Persistent<Value>(); - NanAssignPersistent(*handle, value); - resources->handles.push_back(unique_ptr<PersistentHolder>( - new PersistentHolder(handle))); + PersistentValue *handle = new PersistentValue(value); + resources->handles.push_back(unique_ptr<PersistentValue>(handle)); return true; } protected: @@ -230,11 +251,11 @@ class SendMessageOp : public Op { class SendClientCloseOp : public Op { public: - Handle<Value> GetNodeValue() const { - NanEscapableScope(); - return NanEscapeScope(NanTrue()); + Local<Value> GetNodeValue() const { + EscapableHandleScope scope; + return scope.Escape(Nan::True()); } - bool ParseOp(Handle<Value> value, grpc_op *out, + bool ParseOp(Local<Value> value, grpc_op *out, shared_ptr<Resources> resources) { return true; } @@ -246,39 +267,55 @@ class SendClientCloseOp : public Op { class SendServerStatusOp : public Op { public: - Handle<Value> GetNodeValue() const { - NanEscapableScope(); - return NanEscapeScope(NanTrue()); + Local<Value> GetNodeValue() const { + EscapableHandleScope scope; + return scope.Escape(Nan::True()); } - bool ParseOp(Handle<Value> value, grpc_op *out, + bool ParseOp(Local<Value> value, grpc_op *out, shared_ptr<Resources> resources) { if (!value->IsObject()) { return false; } - Handle<Object> server_status = value->ToObject(); - if (!server_status->Get(NanNew("metadata"))->IsObject()) { + Local<Object> server_status = Nan::To<Object>(value).ToLocalChecked(); + MaybeLocal<Value> maybe_metadata = Nan::Get( + server_status, Nan::New("metadata").ToLocalChecked()); + if (maybe_metadata.IsEmpty()) { return false; } - if (!server_status->Get(NanNew("code"))->IsUint32()) { + if (!maybe_metadata.ToLocalChecked()->IsObject()) { return false; } - if (!server_status->Get(NanNew("details"))->IsString()) { + Local<Object> metadata = Nan::To<Object>( + maybe_metadata.ToLocalChecked()).ToLocalChecked(); + MaybeLocal<Value> maybe_code = Nan::Get(server_status, + Nan::New("code").ToLocalChecked()); + if (maybe_code.IsEmpty()) { return false; } + if (!maybe_code.ToLocalChecked()->IsUint32()) { + return false; + } + uint32_t code = Nan::To<uint32_t>(maybe_code.ToLocalChecked()).FromJust(); + MaybeLocal<Value> maybe_details = Nan::Get( + server_status, Nan::New("details").ToLocalChecked()); + if (maybe_details.IsEmpty()) { + return false; + } + if (!maybe_details.ToLocalChecked()->IsString()) { + return false; + } + Local<String> details = Nan::To<String>( + maybe_details.ToLocalChecked()).ToLocalChecked(); grpc_metadata_array array; - if (!CreateMetadataArray(server_status->Get(NanNew("metadata"))-> - ToObject(), - &array, resources)) { + if (!CreateMetadataArray(metadata, &array, resources)) { return false; } out->data.send_status_from_server.trailing_metadata_count = array.count; out->data.send_status_from_server.trailing_metadata = array.metadata; out->data.send_status_from_server.status = - static_cast<grpc_status_code>( - server_status->Get(NanNew("code"))->Uint32Value()); - NanUtf8String *str = new NanUtf8String( - server_status->Get(NanNew("details"))); - resources->strings.push_back(unique_ptr<NanUtf8String>(str)); + static_cast<grpc_status_code>(code); + Utf8String *str = new Utf8String(details); + resources->strings.push_back(unique_ptr<Utf8String>(str)); out->data.send_status_from_server.status_details = **str; return true; } @@ -298,12 +335,12 @@ class GetMetadataOp : public Op { grpc_metadata_array_destroy(&recv_metadata); } - Handle<Value> GetNodeValue() const { - NanEscapableScope(); - return NanEscapeScope(ParseMetadata(&recv_metadata)); + Local<Value> GetNodeValue() const { + EscapableHandleScope scope; + return scope.Escape(ParseMetadata(&recv_metadata)); } - bool ParseOp(Handle<Value> value, grpc_op *out, + bool ParseOp(Local<Value> value, grpc_op *out, shared_ptr<Resources> resources) { out->data.recv_initial_metadata = &recv_metadata; return true; @@ -325,15 +362,15 @@ class ReadMessageOp : public Op { } ~ReadMessageOp() { if (recv_message != NULL) { - gpr_free(recv_message); + grpc_byte_buffer_destroy(recv_message); } } - Handle<Value> GetNodeValue() const { - NanEscapableScope(); - return NanEscapeScope(ByteBufferToBuffer(recv_message)); + Local<Value> GetNodeValue() const { + EscapableHandleScope scope; + return scope.Escape(ByteBufferToBuffer(recv_message)); } - bool ParseOp(Handle<Value> value, grpc_op *out, + bool ParseOp(Local<Value> value, grpc_op *out, shared_ptr<Resources> resources) { out->data.recv_message = &recv_message; return true; @@ -361,7 +398,7 @@ class ClientStatusOp : public Op { gpr_free(status_details); } - bool ParseOp(Handle<Value> value, grpc_op *out, + bool ParseOp(Local<Value> value, grpc_op *out, shared_ptr<Resources> resources) { out->data.recv_status_on_client.trailing_metadata = &metadata_array; out->data.recv_status_on_client.status = &status; @@ -370,15 +407,18 @@ class ClientStatusOp : public Op { return true; } - Handle<Value> GetNodeValue() const { - NanEscapableScope(); - Handle<Object> status_obj = NanNew<Object>(); - status_obj->Set(NanNew("code"), NanNew<Number>(status)); + Local<Value> GetNodeValue() const { + EscapableHandleScope scope; + Local<Object> status_obj = Nan::New<Object>(); + Nan::Set(status_obj, Nan::New("code").ToLocalChecked(), + Nan::New<Number>(status)); if (status_details != NULL) { - status_obj->Set(NanNew("details"), NanNew(status_details)); + Nan::Set(status_obj, Nan::New("details").ToLocalChecked(), + Nan::New(status_details).ToLocalChecked()); } - status_obj->Set(NanNew("metadata"), ParseMetadata(&metadata_array)); - return NanEscapeScope(status_obj); + Nan::Set(status_obj, Nan::New("metadata").ToLocalChecked(), + ParseMetadata(&metadata_array)); + return scope.Escape(status_obj); } protected: std::string GetTypeString() const { @@ -393,12 +433,12 @@ class ClientStatusOp : public Op { class ServerCloseResponseOp : public Op { public: - Handle<Value> GetNodeValue() const { - NanEscapableScope(); - return NanEscapeScope(NanNew<Boolean>(cancelled)); + Local<Value> GetNodeValue() const { + EscapableHandleScope scope; + return scope.Escape(Nan::New<Boolean>(cancelled)); } - bool ParseOp(Handle<Value> value, grpc_op *out, + bool ParseOp(Local<Value> value, grpc_op *out, shared_ptr<Resources> resources) { out->data.recv_close_on_server.cancelled = &cancelled; return true; @@ -413,7 +453,7 @@ class ServerCloseResponseOp : public Op { int cancelled; }; -tag::tag(NanCallback *callback, OpVec *ops, +tag::tag(Callback *callback, OpVec *ops, shared_ptr<Resources> resources) : callback(callback), ops(ops), resources(resources){ } @@ -423,19 +463,19 @@ tag::~tag() { delete ops; } -Handle<Value> GetTagNodeValue(void *tag) { - NanEscapableScope(); +Local<Value> GetTagNodeValue(void *tag) { + EscapableHandleScope scope; struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); - Handle<Object> tag_obj = NanNew<Object>(); + Local<Object> tag_obj = Nan::New<Object>(); for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin(); it != tag_struct->ops->end(); ++it) { Op *op_ptr = it->get(); - tag_obj->Set(op_ptr->GetOpType(), op_ptr->GetNodeValue()); + Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue()); } - return NanEscapeScope(tag_obj); + return scope.Escape(tag_obj); } -NanCallback *GetTagCallback(void *tag) { +Callback *GetTagCallback(void *tag) { struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); return tag_struct->callback; } @@ -452,140 +492,149 @@ Call::~Call() { grpc_call_destroy(wrapped_call); } -void Call::Init(Handle<Object> exports) { - NanScope(); - Local<FunctionTemplate> tpl = NanNew<FunctionTemplate>(New); - tpl->SetClassName(NanNew("Call")); +void Call::Init(Local<Object> exports) { + HandleScope scope; + Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); + tpl->SetClassName(Nan::New("Call").ToLocalChecked()); tpl->InstanceTemplate()->SetInternalFieldCount(1); - NanSetPrototypeTemplate(tpl, "startBatch", - NanNew<FunctionTemplate>(StartBatch)->GetFunction()); - NanSetPrototypeTemplate(tpl, "cancel", - NanNew<FunctionTemplate>(Cancel)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "cancelWithStatus", - NanNew<FunctionTemplate>(CancelWithStatus)->GetFunction()); - NanSetPrototypeTemplate(tpl, "getPeer", - NanNew<FunctionTemplate>(GetPeer)->GetFunction()); - NanAssignPersistent(fun_tpl, tpl); - Handle<Function> ctr = tpl->GetFunction(); - exports->Set(NanNew("Call"), ctr); - constructor = new NanCallback(ctr); + Nan::SetPrototypeMethod(tpl, "startBatch", StartBatch); + Nan::SetPrototypeMethod(tpl, "cancel", Cancel); + Nan::SetPrototypeMethod(tpl, "cancelWithStatus", CancelWithStatus); + Nan::SetPrototypeMethod(tpl, "getPeer", GetPeer); + fun_tpl.Reset(tpl); + Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked(); + Nan::Set(exports, Nan::New("Call").ToLocalChecked(), ctr); + constructor = new Callback(ctr); } -bool Call::HasInstance(Handle<Value> val) { - NanScope(); - return NanHasInstance(fun_tpl, val); +bool Call::HasInstance(Local<Value> val) { + HandleScope scope; + return Nan::New(fun_tpl)->HasInstance(val); } -Handle<Value> Call::WrapStruct(grpc_call *call) { - NanEscapableScope(); +Local<Value> Call::WrapStruct(grpc_call *call) { + EscapableHandleScope scope; if (call == NULL) { - return NanEscapeScope(NanNull()); + return scope.Escape(Nan::Null()); } const int argc = 1; - Handle<Value> argv[argc] = {NanNew<External>(reinterpret_cast<void *>(call))}; - return NanEscapeScope(constructor->GetFunction()->NewInstance(argc, argv)); + Local<Value> argv[argc] = {Nan::New<External>( + reinterpret_cast<void *>(call))}; + MaybeLocal<Object> maybe_instance = Nan::NewInstance( + constructor->GetFunction(), argc, argv); + if (maybe_instance.IsEmpty()) { + return scope.Escape(Nan::Null()); + } else { + return scope.Escape(maybe_instance.ToLocalChecked()); + } } NAN_METHOD(Call::New) { - NanScope(); - - if (args.IsConstructCall()) { + if (info.IsConstructCall()) { Call *call; - if (args[0]->IsExternal()) { - Handle<External> ext = args[0].As<External>(); + if (info[0]->IsExternal()) { + Local<External> ext = info[0].As<External>(); // This option is used for wrapping an existing call grpc_call *call_value = reinterpret_cast<grpc_call *>(ext->Value()); call = new Call(call_value); } else { - if (!Channel::HasInstance(args[0])) { - return NanThrowTypeError("Call's first argument must be a Channel"); + if (!Channel::HasInstance(info[0])) { + return Nan::ThrowTypeError("Call's first argument must be a Channel"); } - if (!args[1]->IsString()) { - return NanThrowTypeError("Call's second argument must be a string"); + if (!info[1]->IsString()) { + return Nan::ThrowTypeError("Call's second argument must be a string"); } - if (!(args[2]->IsNumber() || args[2]->IsDate())) { - return NanThrowTypeError( + if (!(info[2]->IsNumber() || info[2]->IsDate())) { + return Nan::ThrowTypeError( "Call's third argument must be a date or a number"); } // These arguments are at the end because they are optional grpc_call *parent_call = NULL; - if (Call::HasInstance(args[4])) { - Call *parent_obj = ObjectWrap::Unwrap<Call>(args[4]->ToObject()); + if (Call::HasInstance(info[4])) { + Call *parent_obj = ObjectWrap::Unwrap<Call>( + Nan::To<Object>(info[4]).ToLocalChecked()); parent_call = parent_obj->wrapped_call; - } else if (!(args[4]->IsUndefined() || args[4]->IsNull())) { - return NanThrowTypeError( + } else if (!(info[4]->IsUndefined() || info[4]->IsNull())) { + return Nan::ThrowTypeError( "Call's fifth argument must be another call, if provided"); } gpr_uint32 propagate_flags = GRPC_PROPAGATE_DEFAULTS; - if (args[5]->IsUint32()) { - propagate_flags = args[5]->Uint32Value(); - } else if (!(args[5]->IsUndefined() || args[5]->IsNull())) { - return NanThrowTypeError( + if (info[5]->IsUint32()) { + propagate_flags = Nan::To<uint32_t>(info[5]).FromJust(); + } else if (!(info[5]->IsUndefined() || info[5]->IsNull())) { + return Nan::ThrowTypeError( "Call's sixth argument must be propagate flags, if provided"); } - Handle<Object> channel_object = args[0]->ToObject(); + Local<Object> channel_object = Nan::To<Object>(info[0]).ToLocalChecked(); Channel *channel = ObjectWrap::Unwrap<Channel>(channel_object); if (channel->GetWrappedChannel() == NULL) { - return NanThrowError("Call cannot be created from a closed channel"); + return Nan::ThrowError("Call cannot be created from a closed channel"); } - NanUtf8String method(args[1]); - double deadline = args[2]->NumberValue(); + Utf8String method(info[1]); + double deadline = Nan::To<double>(info[2]).FromJust(); grpc_channel *wrapped_channel = channel->GetWrappedChannel(); grpc_call *wrapped_call; - if (args[3]->IsString()) { - NanUtf8String host_override(args[3]); + if (info[3]->IsString()) { + Utf8String host_override(info[3]); wrapped_call = grpc_channel_create_call( wrapped_channel, parent_call, propagate_flags, CompletionQueueAsyncWorker::GetQueue(), *method, *host_override, MillisecondsToTimespec(deadline), NULL); - } else if (args[3]->IsUndefined() || args[3]->IsNull()) { + } else if (info[3]->IsUndefined() || info[3]->IsNull()) { wrapped_call = grpc_channel_create_call( wrapped_channel, parent_call, propagate_flags, CompletionQueueAsyncWorker::GetQueue(), *method, NULL, MillisecondsToTimespec(deadline), NULL); } else { - return NanThrowTypeError("Call's fourth argument must be a string"); + return Nan::ThrowTypeError("Call's fourth argument must be a string"); } call = new Call(wrapped_call); - args.This()->SetHiddenValue(NanNew("channel_"), channel_object); + info.This()->SetHiddenValue(Nan::New("channel_").ToLocalChecked(), + channel_object); } - call->Wrap(args.This()); - NanReturnValue(args.This()); + call->Wrap(info.This()); + info.GetReturnValue().Set(info.This()); } else { const int argc = 4; - Local<Value> argv[argc] = {args[0], args[1], args[2], args[3]}; - NanReturnValue(constructor->GetFunction()->NewInstance(argc, argv)); + Local<Value> argv[argc] = {info[0], info[1], info[2], info[3]}; + MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance( + argc, argv); + if (maybe_instance.IsEmpty()) { + // There's probably a pending exception + return; + } else { + info.GetReturnValue().Set(maybe_instance.ToLocalChecked()); + } } } NAN_METHOD(Call::StartBatch) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("startBatch can only be called on Call objects"); + if (!Call::HasInstance(info.This())) { + return Nan::ThrowTypeError("startBatch can only be called on Call objects"); } - if (!args[0]->IsObject()) { - return NanThrowError("startBatch's first argument must be an object"); + if (!info[0]->IsObject()) { + return Nan::ThrowError("startBatch's first argument must be an object"); } - if (!args[1]->IsFunction()) { - return NanThrowError("startBatch's second argument must be a callback"); + if (!info[1]->IsFunction()) { + return Nan::ThrowError("startBatch's second argument must be a callback"); } - Handle<Function> callback_func = args[1].As<Function>(); - Call *call = ObjectWrap::Unwrap<Call>(args.This()); + Local<Function> callback_func = info[1].As<Function>(); + Call *call = ObjectWrap::Unwrap<Call>(info.This()); shared_ptr<Resources> resources(new Resources); - Handle<Object> obj = args[0]->ToObject(); - Handle<Array> keys = obj->GetOwnPropertyNames(); + Local<Object> obj = Nan::To<Object>(info[0]).ToLocalChecked(); + Local<Array> keys = Nan::GetOwnPropertyNames(obj).ToLocalChecked(); size_t nops = keys->Length(); vector<grpc_op> ops(nops); unique_ptr<OpVec> op_vector(new OpVec()); for (unsigned int i = 0; i < nops; i++) { unique_ptr<Op> op; - if (!keys->Get(i)->IsUint32()) { - return NanThrowError( + MaybeLocal<Value> maybe_key = Nan::Get(keys, i); + if (maybe_key.IsEmpty() || (!maybe_key.ToLocalChecked()->IsUint32())) { + return Nan::ThrowError( "startBatch's first argument's keys must be integers"); } - uint32_t type = keys->Get(i)->Uint32Value(); + uint32_t type = Nan::To<uint32_t>(maybe_key.ToLocalChecked()).FromJust(); ops[i].op = static_cast<grpc_op_type>(type); ops[i].flags = 0; ops[i].reserved = NULL; @@ -615,67 +664,64 @@ NAN_METHOD(Call::StartBatch) { op.reset(new ServerCloseResponseOp()); break; default: - return NanThrowError("Argument object had an unrecognized key"); + return Nan::ThrowError("Argument object had an unrecognized key"); } if (!op->ParseOp(obj->Get(type), &ops[i], resources)) { - return NanThrowTypeError("Incorrectly typed arguments to startBatch"); + return Nan::ThrowTypeError("Incorrectly typed arguments to startBatch"); } op_vector->push_back(std::move(op)); } - NanCallback *callback = new NanCallback(callback_func); + Callback *callback = new Callback(callback_func); grpc_call_error error = grpc_call_start_batch( call->wrapped_call, &ops[0], nops, new struct tag( callback, op_vector.release(), resources), NULL); if (error != GRPC_CALL_OK) { - return NanThrowError(nanErrorWithCode("startBatch failed", error)); + return Nan::ThrowError(nanErrorWithCode("startBatch failed", error)); } CompletionQueueAsyncWorker::Next(); - NanReturnUndefined(); } NAN_METHOD(Call::Cancel) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("cancel can only be called on Call objects"); + if (!Call::HasInstance(info.This())) { + return Nan::ThrowTypeError("cancel can only be called on Call objects"); } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); + Call *call = ObjectWrap::Unwrap<Call>(info.This()); grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL); if (error != GRPC_CALL_OK) { - return NanThrowError(nanErrorWithCode("cancel failed", error)); + return Nan::ThrowError(nanErrorWithCode("cancel failed", error)); } - NanReturnUndefined(); } NAN_METHOD(Call::CancelWithStatus) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("cancel can only be called on Call objects"); + Nan::HandleScope scope; + if (!HasInstance(info.This())) { + return Nan::ThrowTypeError("cancel can only be called on Call objects"); } - if (!args[0]->IsUint32()) { - return NanThrowTypeError( + if (!info[0]->IsUint32()) { + return Nan::ThrowTypeError( "cancelWithStatus's first argument must be a status code"); } - if (!args[1]->IsString()) { - return NanThrowTypeError( + if (!info[1]->IsString()) { + return Nan::ThrowTypeError( "cancelWithStatus's second argument must be a string"); } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - grpc_status_code code = static_cast<grpc_status_code>(args[0]->Uint32Value()); - NanUtf8String details(args[0]); + Call *call = ObjectWrap::Unwrap<Call>(info.This()); + grpc_status_code code = static_cast<grpc_status_code>( + Nan::To<uint32_t>(info[0]).FromJust()); + Utf8String details(info[0]); grpc_call_cancel_with_status(call->wrapped_call, code, *details, NULL); - NanReturnUndefined(); } NAN_METHOD(Call::GetPeer) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("getPeer can only be called on Call objects"); + Nan::HandleScope scope; + if (!HasInstance(info.This())) { + return Nan::ThrowTypeError("getPeer can only be called on Call objects"); } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); + Call *call = ObjectWrap::Unwrap<Call>(info.This()); char *peer = grpc_call_get_peer(call->wrapped_call); - Handle<Value> peer_value = NanNew(peer); + Local<Value> peer_value = Nan::New(peer).ToLocalChecked(); gpr_free(peer); - NanReturnValue(peer_value); + info.GetReturnValue().Set(peer_value); } } // namespace node diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 89f81dcf4d..2f8e1f17aa 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -51,6 +51,8 @@ namespace node { using std::unique_ptr; using std::shared_ptr; +typedef Nan::Persistent<v8::Value, Nan::CopyablePersistentTraits<v8::Value>> PersistentValue; + /** * Helper function for throwing errors with a grpc_call_error value. * Modified from the answer by Gus Goose to @@ -58,69 +60,54 @@ using std::shared_ptr; */ inline v8::Local<v8::Value> nanErrorWithCode(const char *msg, grpc_call_error code) { - NanEscapableScope(); - v8::Local<v8::Object> err = NanError(msg).As<v8::Object>(); - err->Set(NanNew("code"), NanNew<v8::Uint32>(code)); - return NanEscapeScope(err); + Nan::EscapableHandleScope scope; + v8::Local<v8::Object> err = Nan::Error(msg).As<v8::Object>(); + Nan::Set(err, Nan::New("code").ToLocalChecked(), Nan::New<v8::Uint32>(code)); + return scope.Escape(err); } -v8::Handle<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array); - -class PersistentHolder { - public: - explicit PersistentHolder(v8::Persistent<v8::Value> *persist) : - persist(persist) { - } - - ~PersistentHolder() { - NanDisposePersistent(*persist); - delete persist; - } - - private: - v8::Persistent<v8::Value> *persist; -}; +v8::Local<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array); struct Resources { - std::vector<unique_ptr<NanUtf8String> > strings; - std::vector<unique_ptr<PersistentHolder> > handles; + std::vector<unique_ptr<Nan::Utf8String> > strings; + std::vector<unique_ptr<PersistentValue> > handles; }; class Op { public: - virtual v8::Handle<v8::Value> GetNodeValue() const = 0; - virtual bool ParseOp(v8::Handle<v8::Value> value, grpc_op *out, + virtual v8::Local<v8::Value> GetNodeValue() const = 0; + virtual bool ParseOp(v8::Local<v8::Value> value, grpc_op *out, shared_ptr<Resources> resources) = 0; - v8::Handle<v8::Value> GetOpType() const; + virtual ~Op(); + v8::Local<v8::Value> GetOpType() const; protected: virtual std::string GetTypeString() const = 0; }; typedef std::vector<unique_ptr<Op>> OpVec; - struct tag { - tag(NanCallback *callback, OpVec *ops, + tag(Nan::Callback *callback, OpVec *ops, shared_ptr<Resources> resources); ~tag(); - NanCallback *callback; + Nan::Callback *callback; OpVec *ops; shared_ptr<Resources> resources; }; -v8::Handle<v8::Value> GetTagNodeValue(void *tag); +v8::Local<v8::Value> GetTagNodeValue(void *tag); -NanCallback *GetTagCallback(void *tag); +Nan::Callback *GetTagCallback(void *tag); void DestroyTag(void *tag); /* Wrapper class for grpc_call structs. */ -class Call : public ::node::ObjectWrap { +class Call : public Nan::ObjectWrap { public: - static void Init(v8::Handle<v8::Object> exports); - static bool HasInstance(v8::Handle<v8::Value> val); + static void Init(v8::Local<v8::Object> exports); + static bool HasInstance(v8::Local<v8::Value> val); /* Wrap a grpc_call struct in a javascript object */ - static v8::Handle<v8::Value> WrapStruct(grpc_call *call); + static v8::Local<v8::Value> WrapStruct(grpc_call *call); private: explicit Call(grpc_call *call); @@ -135,9 +122,9 @@ class Call : public ::node::ObjectWrap { static NAN_METHOD(Cancel); static NAN_METHOD(CancelWithStatus); static NAN_METHOD(GetPeer); - static NanCallback *constructor; + static Nan::Callback *constructor; // Used for typechecking instances of this javascript class - static v8::Persistent<v8::FunctionTemplate> fun_tpl; + static Nan::Persistent<v8::FunctionTemplate> fun_tpl; grpc_call *wrapped_call; }; diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc index 9aed96bbf5..6eb1e77688 100644 --- a/src/node/ext/channel.cc +++ b/src/node/ext/channel.cc @@ -48,21 +48,27 @@ namespace grpc { namespace node { +using Nan::Callback; +using Nan::EscapableHandleScope; +using Nan::HandleScope; +using Nan::Maybe; +using Nan::MaybeLocal; +using Nan::ObjectWrap; +using Nan::Persistent; +using Nan::Utf8String; + using v8::Array; using v8::Exception; using v8::Function; using v8::FunctionTemplate; -using v8::Handle; -using v8::HandleScope; using v8::Integer; using v8::Local; using v8::Number; using v8::Object; -using v8::Persistent; using v8::String; using v8::Value; -NanCallback *Channel::constructor; +Callback *Channel::constructor; Persistent<FunctionTemplate> Channel::fun_tpl; Channel::Channel(grpc_channel *channel) : wrapped_channel(channel) {} @@ -73,88 +79,89 @@ Channel::~Channel() { } } -void Channel::Init(Handle<Object> exports) { - NanScope(); - Local<FunctionTemplate> tpl = NanNew<FunctionTemplate>(New); - tpl->SetClassName(NanNew("Channel")); +void Channel::Init(Local<Object> exports) { + Nan::HandleScope scope; + Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); + tpl->SetClassName(Nan::New("Channel").ToLocalChecked()); tpl->InstanceTemplate()->SetInternalFieldCount(1); - NanSetPrototypeTemplate(tpl, "close", - NanNew<FunctionTemplate>(Close)->GetFunction()); - NanSetPrototypeTemplate(tpl, "getTarget", - NanNew<FunctionTemplate>(GetTarget)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "getConnectivityState", - NanNew<FunctionTemplate>(GetConnectivityState)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "watchConnectivityState", - NanNew<FunctionTemplate>(WatchConnectivityState)->GetFunction()); - NanAssignPersistent(fun_tpl, tpl); - Handle<Function> ctr = tpl->GetFunction(); - constructor = new NanCallback(ctr); - exports->Set(NanNew("Channel"), ctr); + Nan::SetPrototypeMethod(tpl, "close", Close); + Nan::SetPrototypeMethod(tpl, "getTarget", GetTarget); + Nan::SetPrototypeMethod(tpl, "getConnectivityState", GetConnectivityState); + Nan::SetPrototypeMethod(tpl, "watchConnectivityState", + WatchConnectivityState); + fun_tpl.Reset(tpl); + Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked(); + Nan::Set(exports, Nan::New("Channel").ToLocalChecked(), ctr); + constructor = new Callback(ctr); } -bool Channel::HasInstance(Handle<Value> val) { - NanScope(); - return NanHasInstance(fun_tpl, val); +bool Channel::HasInstance(Local<Value> val) { + HandleScope scope; + return Nan::New(fun_tpl)->HasInstance(val); } grpc_channel *Channel::GetWrappedChannel() { return this->wrapped_channel; } NAN_METHOD(Channel::New) { - NanScope(); - - if (args.IsConstructCall()) { - if (!args[0]->IsString()) { - return NanThrowTypeError( + if (info.IsConstructCall()) { + if (!info[0]->IsString()) { + return Nan::ThrowTypeError( "Channel expects a string, a credential and an object"); } grpc_channel *wrapped_channel; // Owned by the Channel object - NanUtf8String host(args[0]); + Utf8String host(info[0]); grpc_credentials *creds; - if (!Credentials::HasInstance(args[1])) { - return NanThrowTypeError( + if (!Credentials::HasInstance(info[1])) { + return Nan::ThrowTypeError( "Channel's second argument must be a credential"); } Credentials *creds_object = ObjectWrap::Unwrap<Credentials>( - args[1]->ToObject()); + Nan::To<Object>(info[1]).ToLocalChecked()); creds = creds_object->GetWrappedCredentials(); grpc_channel_args *channel_args_ptr; - if (args[2]->IsUndefined()) { + if (info[2]->IsUndefined()) { channel_args_ptr = NULL; wrapped_channel = grpc_insecure_channel_create(*host, NULL, NULL); - } else if (args[2]->IsObject()) { - Handle<Object> args_hash(args[2]->ToObject()->Clone()); - Handle<Array> keys(args_hash->GetOwnPropertyNames()); + } else if (info[2]->IsObject()) { + Local<Object> args_hash = Nan::To<Object>(info[2]).ToLocalChecked(); + Local<Array> keys(Nan::GetOwnPropertyNames(args_hash).ToLocalChecked()); grpc_channel_args channel_args; channel_args.num_args = keys->Length(); channel_args.args = reinterpret_cast<grpc_arg *>( calloc(channel_args.num_args, sizeof(grpc_arg))); /* These are used to keep all strings until then end of the block, then destroy them */ - std::vector<NanUtf8String *> key_strings(keys->Length()); - std::vector<NanUtf8String *> value_strings(keys->Length()); + std::vector<Nan::Utf8String *> key_strings(keys->Length()); + std::vector<Nan::Utf8String *> value_strings(keys->Length()); for (unsigned int i = 0; i < channel_args.num_args; i++) { - Handle<String> current_key(keys->Get(i)->ToString()); - Handle<Value> current_value(args_hash->Get(current_key)); - key_strings[i] = new NanUtf8String(current_key); + MaybeLocal<String> maybe_key = Nan::To<String>( + Nan::Get(keys, i).ToLocalChecked()); + if (maybe_key.IsEmpty()) { + free(channel_args.args); + return Nan::ThrowTypeError("Arg keys must be strings"); + } + Local<String> current_key = maybe_key.ToLocalChecked(); + Local<Value> current_value = Nan::Get(args_hash, + current_key).ToLocalChecked(); + key_strings[i] = new Nan::Utf8String(current_key); channel_args.args[i].key = **key_strings[i]; if (current_value->IsInt32()) { channel_args.args[i].type = GRPC_ARG_INTEGER; - channel_args.args[i].value.integer = current_value->Int32Value(); + channel_args.args[i].value.integer = Nan::To<int32_t>( + current_value).FromJust(); } else if (current_value->IsString()) { channel_args.args[i].type = GRPC_ARG_STRING; - value_strings[i] = new NanUtf8String(current_value); + value_strings[i] = new Nan::Utf8String(current_value); channel_args.args[i].value.string = **value_strings[i]; } else { free(channel_args.args); - return NanThrowTypeError("Arg values must be strings"); + return Nan::ThrowTypeError("Arg values must be strings"); } } channel_args_ptr = &channel_args; } else { - return NanThrowTypeError("Channel expects a string and an object"); + return Nan::ThrowTypeError("Channel expects a string and an object"); } if (creds == NULL) { wrapped_channel = grpc_insecure_channel_create(*host, channel_args_ptr, @@ -167,73 +174,79 @@ NAN_METHOD(Channel::New) { free(channel_args_ptr->args); } Channel *channel = new Channel(wrapped_channel); - channel->Wrap(args.This()); - NanReturnValue(args.This()); + channel->Wrap(info.This()); + info.GetReturnValue().Set(info.This()); + return; } else { const int argc = 3; - Local<Value> argv[argc] = {args[0], args[1], args[2]}; - NanReturnValue(constructor->GetFunction()->NewInstance(argc, argv)); + Local<Value> argv[argc] = {info[0], info[1], info[2]}; + MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance( + argc, argv); + if (maybe_instance.IsEmpty()) { + // There's probably a pending exception + return; + } else { + info.GetReturnValue().Set(maybe_instance.ToLocalChecked()); + } } } NAN_METHOD(Channel::Close) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("close can only be called on Channel objects"); + if (!HasInstance(info.This())) { + return Nan::ThrowTypeError("close can only be called on Channel objects"); } - Channel *channel = ObjectWrap::Unwrap<Channel>(args.This()); + Channel *channel = ObjectWrap::Unwrap<Channel>(info.This()); if (channel->wrapped_channel != NULL) { grpc_channel_destroy(channel->wrapped_channel); channel->wrapped_channel = NULL; } - NanReturnUndefined(); } NAN_METHOD(Channel::GetTarget) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("getTarget can only be called on Channel objects"); + if (!HasInstance(info.This())) { + return Nan::ThrowTypeError("getTarget can only be called on Channel objects"); } - Channel *channel = ObjectWrap::Unwrap<Channel>(args.This()); - NanReturnValue(NanNew(grpc_channel_get_target(channel->wrapped_channel))); + Channel *channel = ObjectWrap::Unwrap<Channel>(info.This()); + info.GetReturnValue().Set(Nan::New( + grpc_channel_get_target(channel->wrapped_channel)).ToLocalChecked()); } NAN_METHOD(Channel::GetConnectivityState) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError( + if (!HasInstance(info.This())) { + return Nan::ThrowTypeError( "getConnectivityState can only be called on Channel objects"); } - Channel *channel = ObjectWrap::Unwrap<Channel>(args.This()); - int try_to_connect = (int)args[0]->Equals(NanTrue()); - NanReturnValue(grpc_channel_check_connectivity_state(channel->wrapped_channel, - try_to_connect)); + Channel *channel = ObjectWrap::Unwrap<Channel>(info.This()); + int try_to_connect = (int)info[0]->Equals(Nan::True()); + info.GetReturnValue().Set( + grpc_channel_check_connectivity_state(channel->wrapped_channel, + try_to_connect)); } NAN_METHOD(Channel::WatchConnectivityState) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError( + if (!HasInstance(info.This())) { + return Nan::ThrowTypeError( "watchConnectivityState can only be called on Channel objects"); } - if (!args[0]->IsUint32()) { - return NanThrowTypeError( + if (!info[0]->IsUint32()) { + return Nan::ThrowTypeError( "watchConnectivityState's first argument must be a channel state"); } - if (!(args[1]->IsNumber() || args[1]->IsDate())) { - return NanThrowTypeError( + if (!(info[1]->IsNumber() || info[1]->IsDate())) { + return Nan::ThrowTypeError( "watchConnectivityState's second argument must be a date or a number"); } - if (!args[2]->IsFunction()) { - return NanThrowTypeError( + if (!info[2]->IsFunction()) { + return Nan::ThrowTypeError( "watchConnectivityState's third argument must be a callback"); } grpc_connectivity_state last_state = - static_cast<grpc_connectivity_state>(args[0]->Uint32Value()); - double deadline = args[1]->NumberValue(); - Handle<Function> callback_func = args[2].As<Function>(); - NanCallback *callback = new NanCallback(callback_func); - Channel *channel = ObjectWrap::Unwrap<Channel>(args.This()); + static_cast<grpc_connectivity_state>( + Nan::To<uint32_t>(info[0]).FromJust()); + double deadline = Nan::To<double>(info[1]).FromJust(); + Local<Function> callback_func = info[2].As<Function>(); + Nan::Callback *callback = new Callback(callback_func); + Channel *channel = ObjectWrap::Unwrap<Channel>(info.This()); unique_ptr<OpVec> ops(new OpVec()); grpc_channel_watch_connectivity_state( channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline), @@ -242,7 +255,6 @@ NAN_METHOD(Channel::WatchConnectivityState) { ops.release(), shared_ptr<Resources>(nullptr))); CompletionQueueAsyncWorker::Next(); - NanReturnUndefined(); } } // namespace node diff --git a/src/node/ext/channel.h b/src/node/ext/channel.h index 458f71d093..0062fd03f4 100644 --- a/src/node/ext/channel.h +++ b/src/node/ext/channel.h @@ -42,10 +42,10 @@ namespace grpc { namespace node { /* Wrapper class for grpc_channel structs */ -class Channel : public ::node::ObjectWrap { +class Channel : public Nan::ObjectWrap { public: - static void Init(v8::Handle<v8::Object> exports); - static bool HasInstance(v8::Handle<v8::Value> val); + static void Init(v8::Local<v8::Object> exports); + static bool HasInstance(v8::Local<v8::Value> val); /* This is used to typecheck javascript objects before converting them to this type */ static v8::Persistent<v8::Value> prototype; @@ -66,8 +66,8 @@ class Channel : public ::node::ObjectWrap { static NAN_METHOD(GetTarget); static NAN_METHOD(GetConnectivityState); static NAN_METHOD(WatchConnectivityState); - static NanCallback *constructor; - static v8::Persistent<v8::FunctionTemplate> fun_tpl; + static Nan::Callback *constructor; + static Nan::Persistent<v8::FunctionTemplate> fun_tpl; grpc_channel *wrapped_channel; }; diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index bf2cd946a5..3a79f7c45d 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -46,9 +46,8 @@ namespace node { const int max_queue_threads = 2; using v8::Function; -using v8::Handle; +using v8::Local; using v8::Object; -using v8::Persistent; using v8::Value; grpc_completion_queue *CompletionQueueAsyncWorker::queue; @@ -57,7 +56,7 @@ int CompletionQueueAsyncWorker::current_threads; int CompletionQueueAsyncWorker::waiting_next_calls; CompletionQueueAsyncWorker::CompletionQueueAsyncWorker() - : NanAsyncWorker(NULL) {} + : Nan::AsyncWorker(NULL) {} CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} @@ -72,42 +71,42 @@ void CompletionQueueAsyncWorker::Execute() { grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } void CompletionQueueAsyncWorker::Next() { - NanScope(); + Nan::HandleScope scope; if (current_threads < max_queue_threads) { CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); - NanAsyncQueueWorker(worker); + Nan::AsyncQueueWorker(worker); } else { waiting_next_calls += 1; } } -void CompletionQueueAsyncWorker::Init(Handle<Object> exports) { - NanScope(); +void CompletionQueueAsyncWorker::Init(Local<Object> exports) { + Nan::HandleScope scope; current_threads = 0; waiting_next_calls = 0; queue = grpc_completion_queue_create(NULL); } void CompletionQueueAsyncWorker::HandleOKCallback() { - NanScope(); + Nan::HandleScope scope; if (waiting_next_calls > 0) { waiting_next_calls -= 1; CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); - NanAsyncQueueWorker(worker); + Nan::AsyncQueueWorker(worker); } else { current_threads -= 1; } - NanCallback *callback = GetTagCallback(result.tag); - Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result.tag)}; + Nan::Callback *callback = GetTagCallback(result.tag); + Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)}; callback->Call(2, argv); DestroyTag(result.tag); } void CompletionQueueAsyncWorker::HandleErrorCallback() { - NanScope(); - NanCallback *callback = GetTagCallback(result.tag); - Handle<Value> argv[] = {NanError(ErrorMessage())}; + Nan::HandleScope scope; + Nan::Callback *callback = GetTagCallback(result.tag); + Local<Value> argv[] = {Nan::Error(ErrorMessage())}; callback->Call(1, argv); diff --git a/src/node/ext/completion_queue_async_worker.h b/src/node/ext/completion_queue_async_worker.h index 27fedf2fce..6e54116765 100644 --- a/src/node/ext/completion_queue_async_worker.h +++ b/src/node/ext/completion_queue_async_worker.h @@ -42,7 +42,7 @@ namespace node { /* A worker that asynchronously calls completion_queue_next, and queues onto the node event loop a call to the function stored in the event's tag. */ -class CompletionQueueAsyncWorker : public NanAsyncWorker { +class CompletionQueueAsyncWorker : public Nan::AsyncWorker { public: CompletionQueueAsyncWorker(); @@ -59,7 +59,7 @@ class CompletionQueueAsyncWorker : public NanAsyncWorker { static void Next(); /* Initialize the CompletionQueueAsyncWorker class */ - static void Init(v8::Handle<v8::Object> exports); + static void Init(v8::Local<v8::Object> exports); protected: /* Called when Execute has succeeded (completed without setting an error diff --git a/src/node/ext/credentials.cc b/src/node/ext/credentials.cc index c3b04dcea7..4f41c92f6a 100644 --- a/src/node/ext/credentials.cc +++ b/src/node/ext/credentials.cc @@ -41,20 +41,26 @@ namespace grpc { namespace node { +using Nan::Callback; +using Nan::EscapableHandleScope; +using Nan::HandleScope; +using Nan::Maybe; +using Nan::MaybeLocal; +using Nan::ObjectWrap; +using Nan::Persistent; +using Nan::Utf8String; + using v8::Exception; using v8::External; using v8::Function; using v8::FunctionTemplate; -using v8::Handle; -using v8::HandleScope; using v8::Integer; using v8::Local; using v8::Object; using v8::ObjectTemplate; -using v8::Persistent; using v8::Value; -NanCallback *Credentials::constructor; +Nan::Callback *Credentials::constructor; Persistent<FunctionTemplate> Credentials::fun_tpl; Credentials::Credentials(grpc_credentials *credentials) @@ -64,40 +70,52 @@ Credentials::~Credentials() { grpc_credentials_release(wrapped_credentials); } -void Credentials::Init(Handle<Object> exports) { - NanScope(); - Local<FunctionTemplate> tpl = NanNew<FunctionTemplate>(New); - tpl->SetClassName(NanNew("Credentials")); +void Credentials::Init(Local<Object> exports) { + HandleScope scope; + Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); + tpl->SetClassName(Nan::New("Credentials").ToLocalChecked()); tpl->InstanceTemplate()->SetInternalFieldCount(1); - NanAssignPersistent(fun_tpl, tpl); - Handle<Function> ctr = tpl->GetFunction(); - ctr->Set(NanNew("createDefault"), - NanNew<FunctionTemplate>(CreateDefault)->GetFunction()); - ctr->Set(NanNew("createSsl"), - NanNew<FunctionTemplate>(CreateSsl)->GetFunction()); - ctr->Set(NanNew("createComposite"), - NanNew<FunctionTemplate>(CreateComposite)->GetFunction()); - ctr->Set(NanNew("createGce"), - NanNew<FunctionTemplate>(CreateGce)->GetFunction()); - ctr->Set(NanNew("createIam"), - NanNew<FunctionTemplate>(CreateIam)->GetFunction()); - ctr->Set(NanNew("createInsecure"), - NanNew<FunctionTemplate>(CreateInsecure)->GetFunction()); - constructor = new NanCallback(ctr); - exports->Set(NanNew("Credentials"), ctr); + fun_tpl.Reset(tpl); + Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked(); + Nan::Set(ctr, Nan::New("createDefault").ToLocalChecked(), + Nan::GetFunction( + Nan::New<FunctionTemplate>(CreateDefault)).ToLocalChecked()); + Nan::Set(ctr, Nan::New("createSsl").ToLocalChecked(), + Nan::GetFunction( + Nan::New<FunctionTemplate>(CreateSsl)).ToLocalChecked()); + Nan::Set(ctr, Nan::New("createComposite").ToLocalChecked(), + Nan::GetFunction( + Nan::New<FunctionTemplate>(CreateComposite)).ToLocalChecked()); + Nan::Set(ctr, Nan::New("createGce").ToLocalChecked(), + Nan::GetFunction( + Nan::New<FunctionTemplate>(CreateGce)).ToLocalChecked()); + Nan::Set(ctr, Nan::New("createIam").ToLocalChecked(), + Nan::GetFunction( + Nan::New<FunctionTemplate>(CreateIam)).ToLocalChecked()); + Nan::Set(ctr, Nan::New("createInsecure").ToLocalChecked(), + Nan::GetFunction( + Nan::New<FunctionTemplate>(CreateInsecure)).ToLocalChecked()); + Nan::Set(exports, Nan::New("Credentials").ToLocalChecked(), ctr); + constructor = new Nan::Callback(ctr); } -bool Credentials::HasInstance(Handle<Value> val) { - NanScope(); - return NanHasInstance(fun_tpl, val); +bool Credentials::HasInstance(Local<Value> val) { + HandleScope scope; + return Nan::New(fun_tpl)->HasInstance(val); } -Handle<Value> Credentials::WrapStruct(grpc_credentials *credentials) { - NanEscapableScope(); +Local<Value> Credentials::WrapStruct(grpc_credentials *credentials) { + EscapableHandleScope scope; const int argc = 1; - Handle<Value> argv[argc] = { - NanNew<External>(reinterpret_cast<void *>(credentials))}; - return NanEscapeScope(constructor->GetFunction()->NewInstance(argc, argv)); + Local<Value> argv[argc] = { + Nan::New<External>(reinterpret_cast<void *>(credentials))}; + MaybeLocal<Object> maybe_instance = Nan::NewInstance( + constructor->GetFunction(), argc, argv); + if (maybe_instance.IsEmpty()) { + return scope.Escape(Nan::Null()); + } else { + return scope.Escape(maybe_instance.ToLocalChecked()); + } } grpc_credentials *Credentials::GetWrappedCredentials() { @@ -105,115 +123,123 @@ grpc_credentials *Credentials::GetWrappedCredentials() { } NAN_METHOD(Credentials::New) { - NanScope(); - - if (args.IsConstructCall()) { - if (!args[0]->IsExternal()) { - return NanThrowTypeError( + if (info.IsConstructCall()) { + if (!info[0]->IsExternal()) { + return Nan::ThrowTypeError( "Credentials can only be created with the provided functions"); } - Handle<External> ext = args[0].As<External>(); + Local<External> ext = info[0].As<External>(); grpc_credentials *creds_value = reinterpret_cast<grpc_credentials *>(ext->Value()); Credentials *credentials = new Credentials(creds_value); - credentials->Wrap(args.This()); - NanReturnValue(args.This()); + credentials->Wrap(info.This()); + info.GetReturnValue().Set(info.This()); + return; } else { const int argc = 1; - Local<Value> argv[argc] = {args[0]}; - NanReturnValue(constructor->GetFunction()->NewInstance(argc, argv)); + Local<Value> argv[argc] = {info[0]}; + MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance( + argc, argv); + if (maybe_instance.IsEmpty()) { + // There's probably a pending exception + return; + } else { + info.GetReturnValue().Set(maybe_instance.ToLocalChecked()); + } } } NAN_METHOD(Credentials::CreateDefault) { - NanScope(); grpc_credentials *creds = grpc_google_default_credentials_create(); if (creds == NULL) { - NanReturnNull(); + info.GetReturnValue().SetNull(); + } else { + info.GetReturnValue().Set(WrapStruct(creds)); } - NanReturnValue(WrapStruct(creds)); } NAN_METHOD(Credentials::CreateSsl) { - NanScope(); char *root_certs = NULL; grpc_ssl_pem_key_cert_pair key_cert_pair = {NULL, NULL}; - if (::node::Buffer::HasInstance(args[0])) { - root_certs = ::node::Buffer::Data(args[0]); - } else if (!(args[0]->IsNull() || args[0]->IsUndefined())) { - return NanThrowTypeError("createSsl's first argument must be a Buffer"); + if (::node::Buffer::HasInstance(info[0])) { + root_certs = ::node::Buffer::Data(info[0]); + } else if (!(info[0]->IsNull() || info[0]->IsUndefined())) { + return Nan::ThrowTypeError("createSsl's first argument must be a Buffer"); } - if (::node::Buffer::HasInstance(args[1])) { - key_cert_pair.private_key = ::node::Buffer::Data(args[1]); - } else if (!(args[1]->IsNull() || args[1]->IsUndefined())) { - return NanThrowTypeError( + if (::node::Buffer::HasInstance(info[1])) { + key_cert_pair.private_key = ::node::Buffer::Data(info[1]); + } else if (!(info[1]->IsNull() || info[1]->IsUndefined())) { + return Nan::ThrowTypeError( "createSSl's second argument must be a Buffer if provided"); } - if (::node::Buffer::HasInstance(args[2])) { - key_cert_pair.cert_chain = ::node::Buffer::Data(args[2]); - } else if (!(args[2]->IsNull() || args[2]->IsUndefined())) { - return NanThrowTypeError( + if (::node::Buffer::HasInstance(info[2])) { + key_cert_pair.cert_chain = ::node::Buffer::Data(info[2]); + } else if (!(info[2]->IsNull() || info[2]->IsUndefined())) { + return Nan::ThrowTypeError( "createSSl's third argument must be a Buffer if provided"); } grpc_credentials *creds = grpc_ssl_credentials_create( root_certs, key_cert_pair.private_key == NULL ? NULL : &key_cert_pair, NULL); if (creds == NULL) { - NanReturnNull(); + info.GetReturnValue().SetNull(); + } else { + info.GetReturnValue().Set(WrapStruct(creds)); } - NanReturnValue(WrapStruct(creds)); } NAN_METHOD(Credentials::CreateComposite) { - NanScope(); - if (!HasInstance(args[0])) { - return NanThrowTypeError( + if (!HasInstance(info[0])) { + return Nan::ThrowTypeError( "createComposite's first argument must be a Credentials object"); } - if (!HasInstance(args[1])) { - return NanThrowTypeError( + if (!HasInstance(info[1])) { + return Nan::ThrowTypeError( "createComposite's second argument must be a Credentials object"); } - Credentials *creds1 = ObjectWrap::Unwrap<Credentials>(args[0]->ToObject()); - Credentials *creds2 = ObjectWrap::Unwrap<Credentials>(args[1]->ToObject()); + Credentials *creds1 = ObjectWrap::Unwrap<Credentials>( + Nan::To<Object>(info[0]).ToLocalChecked()); + Credentials *creds2 = ObjectWrap::Unwrap<Credentials>( + Nan::To<Object>(info[1]).ToLocalChecked()); grpc_credentials *creds = grpc_composite_credentials_create( creds1->wrapped_credentials, creds2->wrapped_credentials, NULL); if (creds == NULL) { - NanReturnNull(); + info.GetReturnValue().SetNull(); + } else { + info.GetReturnValue().Set(WrapStruct(creds)); } - NanReturnValue(WrapStruct(creds)); } NAN_METHOD(Credentials::CreateGce) { - NanScope(); + Nan::HandleScope scope; grpc_credentials *creds = grpc_google_compute_engine_credentials_create(NULL); if (creds == NULL) { - NanReturnNull(); + info.GetReturnValue().SetNull(); + } else { + info.GetReturnValue().Set(WrapStruct(creds)); } - NanReturnValue(WrapStruct(creds)); } NAN_METHOD(Credentials::CreateIam) { - NanScope(); - if (!args[0]->IsString()) { - return NanThrowTypeError("createIam's first argument must be a string"); + if (!info[0]->IsString()) { + return Nan::ThrowTypeError("createIam's first argument must be a string"); } - if (!args[1]->IsString()) { - return NanThrowTypeError("createIam's second argument must be a string"); + if (!info[1]->IsString()) { + return Nan::ThrowTypeError("createIam's second argument must be a string"); } - NanUtf8String auth_token(args[0]); - NanUtf8String auth_selector(args[1]); + Utf8String auth_token(info[0]); + Utf8String auth_selector(info[1]); grpc_credentials *creds = grpc_google_iam_credentials_create(*auth_token, *auth_selector, NULL); if (creds == NULL) { - NanReturnNull(); + info.GetReturnValue().SetNull(); + } else { + info.GetReturnValue().Set(WrapStruct(creds)); } - NanReturnValue(WrapStruct(creds)); } NAN_METHOD(Credentials::CreateInsecure) { - NanScope(); - NanReturnValue(WrapStruct(NULL)); + info.GetReturnValue().Set(WrapStruct(NULL)); } } // namespace node diff --git a/src/node/ext/credentials.h b/src/node/ext/credentials.h index 62957e61c3..1b211175d4 100644 --- a/src/node/ext/credentials.h +++ b/src/node/ext/credentials.h @@ -43,12 +43,12 @@ namespace grpc { namespace node { /* Wrapper class for grpc_credentials structs */ -class Credentials : public ::node::ObjectWrap { +class Credentials : public Nan::ObjectWrap { public: - static void Init(v8::Handle<v8::Object> exports); - static bool HasInstance(v8::Handle<v8::Value> val); + static void Init(v8::Local<v8::Object> exports); + static bool HasInstance(v8::Local<v8::Value> val); /* Wrap a grpc_credentials struct in a javascript object */ - static v8::Handle<v8::Value> WrapStruct(grpc_credentials *credentials); + static v8::Local<v8::Value> WrapStruct(grpc_credentials *credentials); /* Returns the grpc_credentials struct that this object wraps */ grpc_credentials *GetWrappedCredentials(); @@ -69,9 +69,9 @@ class Credentials : public ::node::ObjectWrap { static NAN_METHOD(CreateFake); static NAN_METHOD(CreateIam); static NAN_METHOD(CreateInsecure); - static NanCallback *constructor; + static Nan::Callback *constructor; // Used for typechecking instances of this javascript class - static v8::Persistent<v8::FunctionTemplate> fun_tpl; + static Nan::Persistent<v8::FunctionTemplate> fun_tpl; grpc_credentials *wrapped_credentials; }; diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index 0cf30da922..caca0fc452 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -43,171 +43,194 @@ #include "credentials.h" #include "server_credentials.h" -using v8::Handle; +using v8::Local; using v8::Value; using v8::Object; using v8::Uint32; using v8::String; -void InitStatusConstants(Handle<Object> exports) { - NanScope(); - Handle<Object> status = NanNew<Object>(); - exports->Set(NanNew("status"), status); - Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_STATUS_OK)); - status->Set(NanNew("OK"), OK); - Handle<Value> CANCELLED(NanNew<Uint32, uint32_t>(GRPC_STATUS_CANCELLED)); - status->Set(NanNew("CANCELLED"), CANCELLED); - Handle<Value> UNKNOWN(NanNew<Uint32, uint32_t>(GRPC_STATUS_UNKNOWN)); - status->Set(NanNew("UNKNOWN"), UNKNOWN); - Handle<Value> INVALID_ARGUMENT( - NanNew<Uint32, uint32_t>(GRPC_STATUS_INVALID_ARGUMENT)); - status->Set(NanNew("INVALID_ARGUMENT"), INVALID_ARGUMENT); - Handle<Value> DEADLINE_EXCEEDED( - NanNew<Uint32, uint32_t>(GRPC_STATUS_DEADLINE_EXCEEDED)); - status->Set(NanNew("DEADLINE_EXCEEDED"), DEADLINE_EXCEEDED); - Handle<Value> NOT_FOUND(NanNew<Uint32, uint32_t>(GRPC_STATUS_NOT_FOUND)); - status->Set(NanNew("NOT_FOUND"), NOT_FOUND); - Handle<Value> ALREADY_EXISTS( - NanNew<Uint32, uint32_t>(GRPC_STATUS_ALREADY_EXISTS)); - status->Set(NanNew("ALREADY_EXISTS"), ALREADY_EXISTS); - Handle<Value> PERMISSION_DENIED( - NanNew<Uint32, uint32_t>(GRPC_STATUS_PERMISSION_DENIED)); - status->Set(NanNew("PERMISSION_DENIED"), PERMISSION_DENIED); - Handle<Value> UNAUTHENTICATED( - NanNew<Uint32, uint32_t>(GRPC_STATUS_UNAUTHENTICATED)); - status->Set(NanNew("UNAUTHENTICATED"), UNAUTHENTICATED); - Handle<Value> RESOURCE_EXHAUSTED( - NanNew<Uint32, uint32_t>(GRPC_STATUS_RESOURCE_EXHAUSTED)); - status->Set(NanNew("RESOURCE_EXHAUSTED"), RESOURCE_EXHAUSTED); - Handle<Value> FAILED_PRECONDITION( - NanNew<Uint32, uint32_t>(GRPC_STATUS_FAILED_PRECONDITION)); - status->Set(NanNew("FAILED_PRECONDITION"), FAILED_PRECONDITION); - Handle<Value> ABORTED(NanNew<Uint32, uint32_t>(GRPC_STATUS_ABORTED)); - status->Set(NanNew("ABORTED"), ABORTED); - Handle<Value> OUT_OF_RANGE( - NanNew<Uint32, uint32_t>(GRPC_STATUS_OUT_OF_RANGE)); - status->Set(NanNew("OUT_OF_RANGE"), OUT_OF_RANGE); - Handle<Value> UNIMPLEMENTED( - NanNew<Uint32, uint32_t>(GRPC_STATUS_UNIMPLEMENTED)); - status->Set(NanNew("UNIMPLEMENTED"), UNIMPLEMENTED); - Handle<Value> INTERNAL(NanNew<Uint32, uint32_t>(GRPC_STATUS_INTERNAL)); - status->Set(NanNew("INTERNAL"), INTERNAL); - Handle<Value> UNAVAILABLE(NanNew<Uint32, uint32_t>(GRPC_STATUS_UNAVAILABLE)); - status->Set(NanNew("UNAVAILABLE"), UNAVAILABLE); - Handle<Value> DATA_LOSS(NanNew<Uint32, uint32_t>(GRPC_STATUS_DATA_LOSS)); - status->Set(NanNew("DATA_LOSS"), DATA_LOSS); +void InitStatusConstants(Local<Object> exports) { + Nan::HandleScope scope; + Local<Object> status = Nan::New<Object>(); + Nan::Set(exports, Nan::New("status").ToLocalChecked(), status); + Local<Value> OK(Nan::New<Uint32, uint32_t>(GRPC_STATUS_OK)); + Nan::Set(status, Nan::New("OK").ToLocalChecked(), OK); + Local<Value> CANCELLED(Nan::New<Uint32, uint32_t>(GRPC_STATUS_CANCELLED)); + Nan::Set(status, Nan::New("CANCELLED").ToLocalChecked(), CANCELLED); + Local<Value> UNKNOWN(Nan::New<Uint32, uint32_t>(GRPC_STATUS_UNKNOWN)); + Nan::Set(status, Nan::New("UNKNOWN").ToLocalChecked(), UNKNOWN); + Local<Value> INVALID_ARGUMENT( + Nan::New<Uint32, uint32_t>(GRPC_STATUS_INVALID_ARGUMENT)); + Nan::Set(status, Nan::New("INVALID_ARGUMENT").ToLocalChecked(), + INVALID_ARGUMENT); + Local<Value> DEADLINE_EXCEEDED( + Nan::New<Uint32, uint32_t>(GRPC_STATUS_DEADLINE_EXCEEDED)); + Nan::Set(status, Nan::New("DEADLINE_EXCEEDED").ToLocalChecked(), + DEADLINE_EXCEEDED); + Local<Value> NOT_FOUND(Nan::New<Uint32, uint32_t>(GRPC_STATUS_NOT_FOUND)); + Nan::Set(status, Nan::New("NOT_FOUND").ToLocalChecked(), NOT_FOUND); + Local<Value> ALREADY_EXISTS( + Nan::New<Uint32, uint32_t>(GRPC_STATUS_ALREADY_EXISTS)); + Nan::Set(status, Nan::New("ALREADY_EXISTS").ToLocalChecked(), ALREADY_EXISTS); + Local<Value> PERMISSION_DENIED( + Nan::New<Uint32, uint32_t>(GRPC_STATUS_PERMISSION_DENIED)); + Nan::Set(status, Nan::New("PERMISSION_DENIED").ToLocalChecked(), + PERMISSION_DENIED); + Local<Value> UNAUTHENTICATED( + Nan::New<Uint32, uint32_t>(GRPC_STATUS_UNAUTHENTICATED)); + Nan::Set(status, Nan::New("UNAUTHENTICATED").ToLocalChecked(), + UNAUTHENTICATED); + Local<Value> RESOURCE_EXHAUSTED( + Nan::New<Uint32, uint32_t>(GRPC_STATUS_RESOURCE_EXHAUSTED)); + Nan::Set(status, Nan::New("RESOURCE_EXHAUSTED").ToLocalChecked(), + RESOURCE_EXHAUSTED); + Local<Value> FAILED_PRECONDITION( + Nan::New<Uint32, uint32_t>(GRPC_STATUS_FAILED_PRECONDITION)); + Nan::Set(status, Nan::New("FAILED_PRECONDITION").ToLocalChecked(), + FAILED_PRECONDITION); + Local<Value> ABORTED(Nan::New<Uint32, uint32_t>(GRPC_STATUS_ABORTED)); + Nan::Set(status, Nan::New("ABORTED").ToLocalChecked(), ABORTED); + Local<Value> OUT_OF_RANGE( + Nan::New<Uint32, uint32_t>(GRPC_STATUS_OUT_OF_RANGE)); + Nan::Set(status, Nan::New("OUT_OF_RANGE").ToLocalChecked(), OUT_OF_RANGE); + Local<Value> UNIMPLEMENTED( + Nan::New<Uint32, uint32_t>(GRPC_STATUS_UNIMPLEMENTED)); + Nan::Set(status, Nan::New("UNIMPLEMENTED").ToLocalChecked(), UNIMPLEMENTED); + Local<Value> INTERNAL(Nan::New<Uint32, uint32_t>(GRPC_STATUS_INTERNAL)); + Nan::Set(status, Nan::New("INTERNAL").ToLocalChecked(), INTERNAL); + Local<Value> UNAVAILABLE(Nan::New<Uint32, uint32_t>(GRPC_STATUS_UNAVAILABLE)); + Nan::Set(status, Nan::New("UNAVAILABLE").ToLocalChecked(), UNAVAILABLE); + Local<Value> DATA_LOSS(Nan::New<Uint32, uint32_t>(GRPC_STATUS_DATA_LOSS)); + Nan::Set(status, Nan::New("DATA_LOSS").ToLocalChecked(), DATA_LOSS); } -void InitCallErrorConstants(Handle<Object> exports) { - NanScope(); - Handle<Object> call_error = NanNew<Object>(); - exports->Set(NanNew("callError"), call_error); - Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_CALL_OK)); - call_error->Set(NanNew("OK"), OK); - Handle<Value> ERROR(NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR)); - call_error->Set(NanNew("ERROR"), ERROR); - Handle<Value> NOT_ON_SERVER( - NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_ON_SERVER)); - call_error->Set(NanNew("NOT_ON_SERVER"), NOT_ON_SERVER); - Handle<Value> NOT_ON_CLIENT( - NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_ON_CLIENT)); - call_error->Set(NanNew("NOT_ON_CLIENT"), NOT_ON_CLIENT); - Handle<Value> ALREADY_INVOKED( - NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_ALREADY_INVOKED)); - call_error->Set(NanNew("ALREADY_INVOKED"), ALREADY_INVOKED); - Handle<Value> NOT_INVOKED( - NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_INVOKED)); - call_error->Set(NanNew("NOT_INVOKED"), NOT_INVOKED); - Handle<Value> ALREADY_FINISHED( - NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_ALREADY_FINISHED)); - call_error->Set(NanNew("ALREADY_FINISHED"), ALREADY_FINISHED); - Handle<Value> TOO_MANY_OPERATIONS( - NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS)); - call_error->Set(NanNew("TOO_MANY_OPERATIONS"), TOO_MANY_OPERATIONS); - Handle<Value> INVALID_FLAGS( - NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_INVALID_FLAGS)); - call_error->Set(NanNew("INVALID_FLAGS"), INVALID_FLAGS); +void InitCallErrorConstants(Local<Object> exports) { + Nan::HandleScope scope; + Local<Object> call_error = Nan::New<Object>(); + Nan::Set(exports, Nan::New("callError").ToLocalChecked(), call_error); + Local<Value> OK(Nan::New<Uint32, uint32_t>(GRPC_CALL_OK)); + Nan::Set(call_error, Nan::New("OK").ToLocalChecked(), OK); + Local<Value> ERROR(Nan::New<Uint32, uint32_t>(GRPC_CALL_ERROR)); + Nan::Set(call_error, Nan::New("ERROR").ToLocalChecked(), ERROR); + Local<Value> NOT_ON_SERVER( + Nan::New<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_ON_SERVER)); + Nan::Set(call_error, Nan::New("NOT_ON_SERVER").ToLocalChecked(), + NOT_ON_SERVER); + Local<Value> NOT_ON_CLIENT( + Nan::New<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_ON_CLIENT)); + Nan::Set(call_error, Nan::New("NOT_ON_CLIENT").ToLocalChecked(), + NOT_ON_CLIENT); + Local<Value> ALREADY_INVOKED( + Nan::New<Uint32, uint32_t>(GRPC_CALL_ERROR_ALREADY_INVOKED)); + Nan::Set(call_error, Nan::New("ALREADY_INVOKED").ToLocalChecked(), + ALREADY_INVOKED); + Local<Value> NOT_INVOKED( + Nan::New<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_INVOKED)); + Nan::Set(call_error, Nan::New("NOT_INVOKED").ToLocalChecked(), NOT_INVOKED); + Local<Value> ALREADY_FINISHED( + Nan::New<Uint32, uint32_t>(GRPC_CALL_ERROR_ALREADY_FINISHED)); + Nan::Set(call_error, Nan::New("ALREADY_FINISHED").ToLocalChecked(), + ALREADY_FINISHED); + Local<Value> TOO_MANY_OPERATIONS( + Nan::New<Uint32, uint32_t>(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS)); + Nan::Set(call_error, Nan::New("TOO_MANY_OPERATIONS").ToLocalChecked(), + TOO_MANY_OPERATIONS); + Local<Value> INVALID_FLAGS( + Nan::New<Uint32, uint32_t>(GRPC_CALL_ERROR_INVALID_FLAGS)); + Nan::Set(call_error, Nan::New("INVALID_FLAGS").ToLocalChecked(), + INVALID_FLAGS); } -void InitOpTypeConstants(Handle<Object> exports) { - NanScope(); - Handle<Object> op_type = NanNew<Object>(); - exports->Set(NanNew("opType"), op_type); - Handle<Value> SEND_INITIAL_METADATA( - NanNew<Uint32, uint32_t>(GRPC_OP_SEND_INITIAL_METADATA)); - op_type->Set(NanNew("SEND_INITIAL_METADATA"), SEND_INITIAL_METADATA); - Handle<Value> SEND_MESSAGE( - NanNew<Uint32, uint32_t>(GRPC_OP_SEND_MESSAGE)); - op_type->Set(NanNew("SEND_MESSAGE"), SEND_MESSAGE); - Handle<Value> SEND_CLOSE_FROM_CLIENT( - NanNew<Uint32, uint32_t>(GRPC_OP_SEND_CLOSE_FROM_CLIENT)); - op_type->Set(NanNew("SEND_CLOSE_FROM_CLIENT"), SEND_CLOSE_FROM_CLIENT); - Handle<Value> SEND_STATUS_FROM_SERVER( - NanNew<Uint32, uint32_t>(GRPC_OP_SEND_STATUS_FROM_SERVER)); - op_type->Set(NanNew("SEND_STATUS_FROM_SERVER"), SEND_STATUS_FROM_SERVER); - Handle<Value> RECV_INITIAL_METADATA( - NanNew<Uint32, uint32_t>(GRPC_OP_RECV_INITIAL_METADATA)); - op_type->Set(NanNew("RECV_INITIAL_METADATA"), RECV_INITIAL_METADATA); - Handle<Value> RECV_MESSAGE( - NanNew<Uint32, uint32_t>(GRPC_OP_RECV_MESSAGE)); - op_type->Set(NanNew("RECV_MESSAGE"), RECV_MESSAGE); - Handle<Value> RECV_STATUS_ON_CLIENT( - NanNew<Uint32, uint32_t>(GRPC_OP_RECV_STATUS_ON_CLIENT)); - op_type->Set(NanNew("RECV_STATUS_ON_CLIENT"), RECV_STATUS_ON_CLIENT); - Handle<Value> RECV_CLOSE_ON_SERVER( - NanNew<Uint32, uint32_t>(GRPC_OP_RECV_CLOSE_ON_SERVER)); - op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER); +void InitOpTypeConstants(Local<Object> exports) { + Nan::HandleScope scope; + Local<Object> op_type = Nan::New<Object>(); + Nan::Set(exports, Nan::New("opType").ToLocalChecked(), op_type); + Local<Value> SEND_INITIAL_METADATA( + Nan::New<Uint32, uint32_t>(GRPC_OP_SEND_INITIAL_METADATA)); + Nan::Set(op_type, Nan::New("SEND_INITIAL_METADATA").ToLocalChecked(), + SEND_INITIAL_METADATA); + Local<Value> SEND_MESSAGE( + Nan::New<Uint32, uint32_t>(GRPC_OP_SEND_MESSAGE)); + Nan::Set(op_type, Nan::New("SEND_MESSAGE").ToLocalChecked(), SEND_MESSAGE); + Local<Value> SEND_CLOSE_FROM_CLIENT( + Nan::New<Uint32, uint32_t>(GRPC_OP_SEND_CLOSE_FROM_CLIENT)); + Nan::Set(op_type, Nan::New("SEND_CLOSE_FROM_CLIENT").ToLocalChecked(), + SEND_CLOSE_FROM_CLIENT); + Local<Value> SEND_STATUS_FROM_SERVER( + Nan::New<Uint32, uint32_t>(GRPC_OP_SEND_STATUS_FROM_SERVER)); + Nan::Set(op_type, Nan::New("SEND_STATUS_FROM_SERVER").ToLocalChecked(), + SEND_STATUS_FROM_SERVER); + Local<Value> RECV_INITIAL_METADATA( + Nan::New<Uint32, uint32_t>(GRPC_OP_RECV_INITIAL_METADATA)); + Nan::Set(op_type, Nan::New("RECV_INITIAL_METADATA").ToLocalChecked(), + RECV_INITIAL_METADATA); + Local<Value> RECV_MESSAGE( + Nan::New<Uint32, uint32_t>(GRPC_OP_RECV_MESSAGE)); + Nan::Set(op_type, Nan::New("RECV_MESSAGE").ToLocalChecked(), RECV_MESSAGE); + Local<Value> RECV_STATUS_ON_CLIENT( + Nan::New<Uint32, uint32_t>(GRPC_OP_RECV_STATUS_ON_CLIENT)); + Nan::Set(op_type, Nan::New("RECV_STATUS_ON_CLIENT").ToLocalChecked(), + RECV_STATUS_ON_CLIENT); + Local<Value> RECV_CLOSE_ON_SERVER( + Nan::New<Uint32, uint32_t>(GRPC_OP_RECV_CLOSE_ON_SERVER)); + Nan::Set(op_type, Nan::New("RECV_CLOSE_ON_SERVER").ToLocalChecked(), + RECV_CLOSE_ON_SERVER); } -void InitPropagateConstants(Handle<Object> exports) { - NanScope(); - Handle<Object> propagate = NanNew<Object>(); - exports->Set(NanNew("propagate"), propagate); - Handle<Value> DEADLINE(NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_DEADLINE)); - propagate->Set(NanNew("DEADLINE"), DEADLINE); - Handle<Value> CENSUS_STATS_CONTEXT( - NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)); - propagate->Set(NanNew("CENSUS_STATS_CONTEXT"), CENSUS_STATS_CONTEXT); - Handle<Value> CENSUS_TRACING_CONTEXT( - NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT)); - propagate->Set(NanNew("CENSUS_TRACING_CONTEXT"), CENSUS_TRACING_CONTEXT); - Handle<Value> CANCELLATION( - NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CANCELLATION)); - propagate->Set(NanNew("CANCELLATION"), CANCELLATION); - Handle<Value> DEFAULTS(NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_DEFAULTS)); - propagate->Set(NanNew("DEFAULTS"), DEFAULTS); +void InitPropagateConstants(Local<Object> exports) { + Nan::HandleScope scope; + Local<Object> propagate = Nan::New<Object>(); + Nan::Set(exports, Nan::New("propagate").ToLocalChecked(), propagate); + Local<Value> DEADLINE(Nan::New<Uint32, uint32_t>(GRPC_PROPAGATE_DEADLINE)); + Nan::Set(propagate, Nan::New("DEADLINE").ToLocalChecked(), DEADLINE); + Local<Value> CENSUS_STATS_CONTEXT( + Nan::New<Uint32, uint32_t>(GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)); + Nan::Set(propagate, Nan::New("CENSUS_STATS_CONTEXT").ToLocalChecked(), + CENSUS_STATS_CONTEXT); + Local<Value> CENSUS_TRACING_CONTEXT( + Nan::New<Uint32, uint32_t>(GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT)); + Nan::Set(propagate, Nan::New("CENSUS_TRACING_CONTEXT").ToLocalChecked(), + CENSUS_TRACING_CONTEXT); + Local<Value> CANCELLATION( + Nan::New<Uint32, uint32_t>(GRPC_PROPAGATE_CANCELLATION)); + Nan::Set(propagate, Nan::New("CANCELLATION").ToLocalChecked(), CANCELLATION); + Local<Value> DEFAULTS(Nan::New<Uint32, uint32_t>(GRPC_PROPAGATE_DEFAULTS)); + Nan::Set(propagate, Nan::New("DEFAULTS").ToLocalChecked(), DEFAULTS); } -void InitConnectivityStateConstants(Handle<Object> exports) { - NanScope(); - Handle<Object> channel_state = NanNew<Object>(); - exports->Set(NanNew("connectivityState"), channel_state); - Handle<Value> IDLE(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_IDLE)); - channel_state->Set(NanNew("IDLE"), IDLE); - Handle<Value> CONNECTING(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_CONNECTING)); - channel_state->Set(NanNew("CONNECTING"), CONNECTING); - Handle<Value> READY(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_READY)); - channel_state->Set(NanNew("READY"), READY); - Handle<Value> TRANSIENT_FAILURE( - NanNew<Uint32, uint32_t>(GRPC_CHANNEL_TRANSIENT_FAILURE)); - channel_state->Set(NanNew("TRANSIENT_FAILURE"), TRANSIENT_FAILURE); - Handle<Value> FATAL_FAILURE( - NanNew<Uint32, uint32_t>(GRPC_CHANNEL_FATAL_FAILURE)); - channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE); +void InitConnectivityStateConstants(Local<Object> exports) { + Nan::HandleScope scope; + Local<Object> channel_state = Nan::New<Object>(); + Nan::Set(exports, Nan::New("connectivityState").ToLocalChecked(), + channel_state); + Local<Value> IDLE(Nan::New<Uint32, uint32_t>(GRPC_CHANNEL_IDLE)); + Nan::Set(channel_state, Nan::New("IDLE").ToLocalChecked(), IDLE); + Local<Value> CONNECTING(Nan::New<Uint32, uint32_t>(GRPC_CHANNEL_CONNECTING)); + Nan::Set(channel_state, Nan::New("CONNECTING").ToLocalChecked(), CONNECTING); + Local<Value> READY(Nan::New<Uint32, uint32_t>(GRPC_CHANNEL_READY)); + Nan::Set(channel_state, Nan::New("READY").ToLocalChecked(), READY); + Local<Value> TRANSIENT_FAILURE( + Nan::New<Uint32, uint32_t>(GRPC_CHANNEL_TRANSIENT_FAILURE)); + Nan::Set(channel_state, Nan::New("TRANSIENT_FAILURE").ToLocalChecked(), + TRANSIENT_FAILURE); + Local<Value> FATAL_FAILURE( + Nan::New<Uint32, uint32_t>(GRPC_CHANNEL_FATAL_FAILURE)); + Nan::Set(channel_state, Nan::New("FATAL_FAILURE").ToLocalChecked(), + FATAL_FAILURE); } -void InitWriteFlags(Handle<Object> exports) { - NanScope(); - Handle<Object> write_flags = NanNew<Object>(); - exports->Set(NanNew("writeFlags"), write_flags); - Handle<Value> BUFFER_HINT(NanNew<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT)); - write_flags->Set(NanNew("BUFFER_HINT"), BUFFER_HINT); - Handle<Value> NO_COMPRESS(NanNew<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS)); - write_flags->Set(NanNew("NO_COMPRESS"), NO_COMPRESS); +void InitWriteFlags(Local<Object> exports) { + Nan::HandleScope scope; + Local<Object> write_flags = Nan::New<Object>(); + Nan::Set(exports, Nan::New("writeFlags").ToLocalChecked(), write_flags); + Local<Value> BUFFER_HINT(Nan::New<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT)); + Nan::Set(write_flags, Nan::New("BUFFER_HINT").ToLocalChecked(), BUFFER_HINT); + Local<Value> NO_COMPRESS(Nan::New<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS)); + Nan::Set(write_flags, Nan::New("NO_COMPRESS").ToLocalChecked(), NO_COMPRESS); } -void init(Handle<Object> exports) { - NanScope(); +void init(Local<Object> exports) { + Nan::HandleScope scope; grpc_init(); InitStatusConstants(exports); InitCallErrorConstants(exports); diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 32a8ff55b1..87363fc446 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -50,6 +50,15 @@ namespace grpc { namespace node { +using Nan::Callback; +using Nan::EscapableHandleScope; +using Nan::HandleScope; +using Nan::Maybe; +using Nan::MaybeLocal; +using Nan::ObjectWrap; +using Nan::Persistent; +using Nan::Utf8String; + using std::unique_ptr; using v8::Array; using v8::Boolean; @@ -57,16 +66,13 @@ using v8::Date; using v8::Exception; using v8::Function; using v8::FunctionTemplate; -using v8::Handle; -using v8::HandleScope; using v8::Local; using v8::Number; using v8::Object; -using v8::Persistent; using v8::String; using v8::Value; -NanCallback *Server::constructor; +Nan::Callback *Server::constructor; Persistent<FunctionTemplate> Server::fun_tpl; class NewCallOp : public Op { @@ -82,22 +88,26 @@ class NewCallOp : public Op { grpc_metadata_array_destroy(&request_metadata); } - Handle<Value> GetNodeValue() const { - NanEscapableScope(); + Local<Value> GetNodeValue() const { + Nan::EscapableHandleScope scope; if (call == NULL) { - return NanEscapeScope(NanNull()); + return scope.Escape(Nan::Null()); } - Handle<Object> obj = NanNew<Object>(); - obj->Set(NanNew("call"), Call::WrapStruct(call)); - obj->Set(NanNew("method"), NanNew(details.method)); - obj->Set(NanNew("host"), NanNew(details.host)); - obj->Set(NanNew("deadline"), - NanNew<Date>(TimespecToMilliseconds(details.deadline))); - obj->Set(NanNew("metadata"), ParseMetadata(&request_metadata)); - return NanEscapeScope(obj); + Local<Object> obj = Nan::New<Object>(); + Nan::Set(obj, Nan::New("call").ToLocalChecked(), Call::WrapStruct(call)); + Nan::Set(obj, Nan::New("method").ToLocalChecked(), + Nan::New(details.method).ToLocalChecked()); + Nan::Set(obj, Nan::New("host").ToLocalChecked(), + Nan::New(details.host).ToLocalChecked()); + Nan::Set(obj, Nan::New("deadline").ToLocalChecked(), + Nan::New<Date>( + TimespecToMilliseconds(details.deadline)).ToLocalChecked()); + Nan::Set(obj, Nan::New("metadata").ToLocalChecked(), + ParseMetadata(&request_metadata)); + return scope.Escape(obj); } - bool ParseOp(Handle<Value> value, grpc_op *out, + bool ParseOp(Local<Value> value, grpc_op *out, shared_ptr<Resources> resources) { return true; } @@ -124,35 +134,25 @@ Server::~Server() { grpc_completion_queue_destroy(this->shutdown_queue); } -void Server::Init(Handle<Object> exports) { - NanScope(); - Local<FunctionTemplate> tpl = NanNew<FunctionTemplate>(New); - tpl->SetClassName(NanNew("Server")); +void Server::Init(Local<Object> exports) { + HandleScope scope; + Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); + tpl->SetClassName(Nan::New("Server").ToLocalChecked()); tpl->InstanceTemplate()->SetInternalFieldCount(1); - NanSetPrototypeTemplate(tpl, "requestCall", - NanNew<FunctionTemplate>(RequestCall)->GetFunction()); - - NanSetPrototypeTemplate( - tpl, "addHttp2Port", - NanNew<FunctionTemplate>(AddHttp2Port)->GetFunction()); - - NanSetPrototypeTemplate(tpl, "start", - NanNew<FunctionTemplate>(Start)->GetFunction()); - - NanSetPrototypeTemplate(tpl, "tryShutdown", - NanNew<FunctionTemplate>(TryShutdown)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "forceShutdown", - NanNew<FunctionTemplate>(ForceShutdown)->GetFunction()); - - NanAssignPersistent(fun_tpl, tpl); - Handle<Function> ctr = tpl->GetFunction(); - constructor = new NanCallback(ctr); - exports->Set(NanNew("Server"), ctr); + Nan::SetPrototypeMethod(tpl, "requestCall", RequestCall); + Nan::SetPrototypeMethod(tpl, "addHttp2Port", AddHttp2Port); + Nan::SetPrototypeMethod(tpl, "start", Start); + Nan::SetPrototypeMethod(tpl, "tryShutdown", TryShutdown); + Nan::SetPrototypeMethod(tpl, "forceShutdown", ForceShutdown); + fun_tpl.Reset(tpl); + Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked(); + Nan::Set(exports, Nan::New("Server").ToLocalChecked(), ctr); + constructor = new Callback(ctr); } -bool Server::HasInstance(Handle<Value> val) { - return NanHasInstance(fun_tpl, val); +bool Server::HasInstance(Local<Value> val) { + HandleScope scope; + return Nan::New(fun_tpl)->HasInstance(val); } void Server::ShutdownServer() { @@ -165,64 +165,77 @@ void Server::ShutdownServer() { } NAN_METHOD(Server::New) { - NanScope(); - /* If this is not a constructor call, make a constructor call and return the result */ - if (!args.IsConstructCall()) { + if (!info.IsConstructCall()) { const int argc = 1; - Local<Value> argv[argc] = {args[0]}; - NanReturnValue(constructor->GetFunction()->NewInstance(argc, argv)); + Local<Value> argv[argc] = {info[0]}; + MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance( + argc, argv); + if (maybe_instance.IsEmpty()) { + // There's probably a pending exception + return; + } else { + info.GetReturnValue().Set(maybe_instance.ToLocalChecked()); + return; + } } grpc_server *wrapped_server; grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue(); - if (args[0]->IsUndefined()) { + if (info[0]->IsUndefined()) { wrapped_server = grpc_server_create(NULL, NULL); - } else if (args[0]->IsObject()) { - Handle<Object> args_hash(args[0]->ToObject()); - Handle<Array> keys(args_hash->GetOwnPropertyNames()); + } else if (info[0]->IsObject()) { + Local<Object> args_hash = Nan::To<Object>(info[0]).ToLocalChecked(); + Local<Array> keys = Nan::GetOwnPropertyNames(args_hash).ToLocalChecked(); grpc_channel_args channel_args; channel_args.num_args = keys->Length(); channel_args.args = reinterpret_cast<grpc_arg *>( calloc(channel_args.num_args, sizeof(grpc_arg))); /* These are used to keep all strings until then end of the block, then destroy them */ - std::vector<NanUtf8String *> key_strings(keys->Length()); - std::vector<NanUtf8String *> value_strings(keys->Length()); + std::vector<Utf8String *> key_strings(keys->Length()); + std::vector<Utf8String *> value_strings(keys->Length()); for (unsigned int i = 0; i < channel_args.num_args; i++) { - Handle<String> current_key(keys->Get(i)->ToString()); - Handle<Value> current_value(args_hash->Get(current_key)); - key_strings[i] = new NanUtf8String(current_key); + MaybeLocal<String> maybe_key = Nan::To<String>( + Nan::Get(keys, i).ToLocalChecked()); + if (maybe_key.IsEmpty()) { + free(channel_args.args); + return Nan::ThrowTypeError("Arg keys must be strings"); + } + Local<String> current_key = maybe_key.ToLocalChecked(); + Local<Value> current_value = Nan::Get(args_hash, + current_key).ToLocalChecked(); + key_strings[i] = new Utf8String(current_key); channel_args.args[i].key = **key_strings[i]; if (current_value->IsInt32()) { channel_args.args[i].type = GRPC_ARG_INTEGER; - channel_args.args[i].value.integer = current_value->Int32Value(); + channel_args.args[i].value.integer = Nan::To<int32_t>( + current_value).FromJust(); } else if (current_value->IsString()) { channel_args.args[i].type = GRPC_ARG_STRING; - value_strings[i] = new NanUtf8String(current_value); + value_strings[i] = new Utf8String(current_value); channel_args.args[i].value.string = **value_strings[i]; } else { free(channel_args.args); - return NanThrowTypeError("Arg values must be strings"); + return Nan::ThrowTypeError("Arg values must be strings"); } } wrapped_server = grpc_server_create(&channel_args, NULL); free(channel_args.args); } else { - return NanThrowTypeError("Server expects an object"); + return Nan::ThrowTypeError("Server expects an object"); } grpc_server_register_completion_queue(wrapped_server, queue, NULL); Server *server = new Server(wrapped_server); - server->Wrap(args.This()); - NanReturnValue(args.This()); + server->Wrap(info.This()); + info.GetReturnValue().Set(info.This()); } NAN_METHOD(Server::RequestCall) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("requestCall can only be called on a Server"); + if (!HasInstance(info.This())) { + return Nan::ThrowTypeError("requestCall can only be called on a Server"); } - Server *server = ObjectWrap::Unwrap<Server>(args.This()); + Server *server = ObjectWrap::Unwrap<Server>(info.This()); NewCallOp *op = new NewCallOp(); unique_ptr<OpVec> ops(new OpVec()); ops->push_back(unique_ptr<Op>(op)); @@ -230,79 +243,74 @@ NAN_METHOD(Server::RequestCall) { server->wrapped_server, &op->call, &op->details, &op->request_metadata, CompletionQueueAsyncWorker::GetQueue(), CompletionQueueAsyncWorker::GetQueue(), - new struct tag(new NanCallback(args[0].As<Function>()), ops.release(), + new struct tag(new Callback(info[0].As<Function>()), ops.release(), shared_ptr<Resources>(nullptr))); if (error != GRPC_CALL_OK) { - return NanThrowError(nanErrorWithCode("requestCall failed", error)); + return Nan::ThrowError(nanErrorWithCode("requestCall failed", error)); } CompletionQueueAsyncWorker::Next(); - NanReturnUndefined(); } NAN_METHOD(Server::AddHttp2Port) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError( + if (!HasInstance(info.This())) { + return Nan::ThrowTypeError( "addHttp2Port can only be called on a Server"); } - if (!args[0]->IsString()) { - return NanThrowTypeError( + if (!info[0]->IsString()) { + return Nan::ThrowTypeError( "addHttp2Port's first argument must be a String"); } - if (!ServerCredentials::HasInstance(args[1])) { - return NanThrowTypeError( + if (!ServerCredentials::HasInstance(info[1])) { + return Nan::ThrowTypeError( "addHttp2Port's second argument must be ServerCredentials"); } - Server *server = ObjectWrap::Unwrap<Server>(args.This()); + Server *server = ObjectWrap::Unwrap<Server>(info.This()); ServerCredentials *creds_object = ObjectWrap::Unwrap<ServerCredentials>( - args[1]->ToObject()); + Nan::To<Object>(info[1]).ToLocalChecked()); grpc_server_credentials *creds = creds_object->GetWrappedServerCredentials(); int port; if (creds == NULL) { port = grpc_server_add_insecure_http2_port(server->wrapped_server, - *NanUtf8String(args[0])); + *Utf8String(info[0])); } else { port = grpc_server_add_secure_http2_port(server->wrapped_server, - *NanUtf8String(args[0]), + *Utf8String(info[0]), creds); } - NanReturnValue(NanNew<Number>(port)); + info.GetReturnValue().Set(Nan::New<Number>(port)); } NAN_METHOD(Server::Start) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("start can only be called on a Server"); + Nan::HandleScope scope; + if (!HasInstance(info.This())) { + return Nan::ThrowTypeError("start can only be called on a Server"); } - Server *server = ObjectWrap::Unwrap<Server>(args.This()); + Server *server = ObjectWrap::Unwrap<Server>(info.This()); grpc_server_start(server->wrapped_server); - NanReturnUndefined(); } NAN_METHOD(Server::TryShutdown) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("tryShutdown can only be called on a Server"); + Nan::HandleScope scope; + if (!HasInstance(info.This())) { + return Nan::ThrowTypeError("tryShutdown can only be called on a Server"); } - Server *server = ObjectWrap::Unwrap<Server>(args.This()); + Server *server = ObjectWrap::Unwrap<Server>(info.This()); unique_ptr<OpVec> ops(new OpVec()); grpc_server_shutdown_and_notify( server->wrapped_server, CompletionQueueAsyncWorker::GetQueue(), - new struct tag(new NanCallback(args[0].As<Function>()), ops.release(), + new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(), shared_ptr<Resources>(nullptr))); CompletionQueueAsyncWorker::Next(); - NanReturnUndefined(); } NAN_METHOD(Server::ForceShutdown) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("forceShutdown can only be called on a Server"); + Nan::HandleScope scope; + if (!HasInstance(info.This())) { + return Nan::ThrowTypeError("forceShutdown can only be called on a Server"); } - Server *server = ObjectWrap::Unwrap<Server>(args.This()); + Server *server = ObjectWrap::Unwrap<Server>(info.This()); server->ShutdownServer(); - NanReturnUndefined(); } } // namespace node diff --git a/src/node/ext/server.h b/src/node/ext/server.h index e7d5c3fb11..ab5fc210e8 100644 --- a/src/node/ext/server.h +++ b/src/node/ext/server.h @@ -44,14 +44,14 @@ namespace node { /* Wraps grpc_server as a JavaScript object. Provides a constructor and wrapper methods for grpc_server_create, grpc_server_request_call, grpc_server_add_http2_port, and grpc_server_start. */ -class Server : public ::node::ObjectWrap { +class Server : public Nan::ObjectWrap { public: /* Initializes the Server class and exposes the constructor and wrapper methods to JavaScript */ - static void Init(v8::Handle<v8::Object> exports); + static void Init(v8::Local<v8::Object> exports); /* Tests whether the given value was constructed by this class's JavaScript constructor */ - static bool HasInstance(v8::Handle<v8::Value> val); + static bool HasInstance(v8::Local<v8::Value> val); private: explicit Server(grpc_server *server); @@ -69,8 +69,8 @@ class Server : public ::node::ObjectWrap { static NAN_METHOD(Start); static NAN_METHOD(TryShutdown); static NAN_METHOD(ForceShutdown); - static NanCallback *constructor; - static v8::Persistent<v8::FunctionTemplate> fun_tpl; + static Nan::Callback *constructor; + static Nan::Persistent<v8::FunctionTemplate> fun_tpl; grpc_server *wrapped_server; grpc_completion_queue *shutdown_queue; diff --git a/src/node/ext/server_credentials.cc b/src/node/ext/server_credentials.cc index b1201eb664..5e922bd877 100644 --- a/src/node/ext/server_credentials.cc +++ b/src/node/ext/server_credentials.cc @@ -41,22 +41,28 @@ namespace grpc { namespace node { +using Nan::Callback; +using Nan::EscapableHandleScope; +using Nan::HandleScope; +using Nan::Maybe; +using Nan::MaybeLocal; +using Nan::ObjectWrap; +using Nan::Persistent; +using Nan::Utf8String; + using v8::Array; using v8::Exception; using v8::External; using v8::Function; using v8::FunctionTemplate; -using v8::Handle; -using v8::HandleScope; using v8::Integer; using v8::Local; using v8::Object; using v8::ObjectTemplate; -using v8::Persistent; using v8::String; using v8::Value; -NanCallback *ServerCredentials::constructor; +Nan::Callback *ServerCredentials::constructor; Persistent<FunctionTemplate> ServerCredentials::fun_tpl; ServerCredentials::ServerCredentials(grpc_server_credentials *credentials) @@ -66,33 +72,41 @@ ServerCredentials::~ServerCredentials() { grpc_server_credentials_release(wrapped_credentials); } -void ServerCredentials::Init(Handle<Object> exports) { - NanScope(); - Local<FunctionTemplate> tpl = NanNew<FunctionTemplate>(New); - tpl->SetClassName(NanNew("ServerCredentials")); +void ServerCredentials::Init(Local<Object> exports) { + Nan::HandleScope scope; + Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); + tpl->SetClassName(Nan::New("ServerCredentials").ToLocalChecked()); tpl->InstanceTemplate()->SetInternalFieldCount(1); - NanAssignPersistent(fun_tpl, tpl); - Handle<Function> ctr = tpl->GetFunction(); - ctr->Set(NanNew("createSsl"), - NanNew<FunctionTemplate>(CreateSsl)->GetFunction()); - ctr->Set(NanNew("createInsecure"), - NanNew<FunctionTemplate>(CreateInsecure)->GetFunction()); - constructor = new NanCallback(ctr); - exports->Set(NanNew("ServerCredentials"), ctr); + Local<Function> ctr = tpl->GetFunction(); + Nan::Set(ctr, Nan::New("createSsl").ToLocalChecked(), + Nan::GetFunction( + Nan::New<FunctionTemplate>(CreateSsl)).ToLocalChecked()); + Nan::Set(ctr, Nan::New("createInsecure").ToLocalChecked(), + Nan::GetFunction( + Nan::New<FunctionTemplate>(CreateInsecure)).ToLocalChecked()); + fun_tpl.Reset(tpl); + constructor = new Nan::Callback(ctr); + Nan::Set(exports, Nan::New("ServerCredentials").ToLocalChecked(), ctr); } -bool ServerCredentials::HasInstance(Handle<Value> val) { - NanScope(); - return NanHasInstance(fun_tpl, val); +bool ServerCredentials::HasInstance(Local<Value> val) { + Nan::HandleScope scope; + return Nan::New(fun_tpl)->HasInstance(val); } -Handle<Value> ServerCredentials::WrapStruct( +Local<Value> ServerCredentials::WrapStruct( grpc_server_credentials *credentials) { - NanEscapableScope(); + Nan::EscapableHandleScope scope; const int argc = 1; - Handle<Value> argv[argc] = { - NanNew<External>(reinterpret_cast<void *>(credentials))}; - return NanEscapeScope(constructor->GetFunction()->NewInstance(argc, argv)); + Local<Value> argv[argc] = { + Nan::New<External>(reinterpret_cast<void *>(credentials))}; + MaybeLocal<Object> maybe_instance = Nan::NewInstance( + constructor->GetFunction(), argc, argv); + if (maybe_instance.IsEmpty()) { + return scope.Escape(Nan::Null()); + } else { + return scope.Escape(maybe_instance.ToLocalChecked()); + } } grpc_server_credentials *ServerCredentials::GetWrappedServerCredentials() { @@ -100,96 +114,103 @@ grpc_server_credentials *ServerCredentials::GetWrappedServerCredentials() { } NAN_METHOD(ServerCredentials::New) { - NanScope(); - - if (args.IsConstructCall()) { - if (!args[0]->IsExternal()) { - return NanThrowTypeError( + if (info.IsConstructCall()) { + if (!info[0]->IsExternal()) { + return Nan::ThrowTypeError( "ServerCredentials can only be created with the provide functions"); } - Handle<External> ext = args[0].As<External>(); + Local<External> ext = info[0].As<External>(); grpc_server_credentials *creds_value = reinterpret_cast<grpc_server_credentials *>(ext->Value()); ServerCredentials *credentials = new ServerCredentials(creds_value); - credentials->Wrap(args.This()); - NanReturnValue(args.This()); + credentials->Wrap(info.This()); + info.GetReturnValue().Set(info.This()); } else { const int argc = 1; - Local<Value> argv[argc] = {args[0]}; - NanReturnValue(constructor->GetFunction()->NewInstance(argc, argv)); + Local<Value> argv[argc] = {info[0]}; + MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance( + argc, argv); + if (maybe_instance.IsEmpty()) { + // There's probably a pending exception + return; + } else { + info.GetReturnValue().Set(maybe_instance.ToLocalChecked()); + } } } NAN_METHOD(ServerCredentials::CreateSsl) { - // TODO: have the node API support multiple key/cert pairs. - NanScope(); + Nan::HandleScope scope; char *root_certs = NULL; - if (::node::Buffer::HasInstance(args[0])) { - root_certs = ::node::Buffer::Data(args[0]); - } else if (!(args[0]->IsNull() || args[0]->IsUndefined())) { - return NanThrowTypeError( + if (::node::Buffer::HasInstance(info[0])) { + root_certs = ::node::Buffer::Data(info[0]); + } else if (!(info[0]->IsNull() || info[0]->IsUndefined())) { + return Nan::ThrowTypeError( "createSSl's first argument must be a Buffer if provided"); } - if (!args[1]->IsArray()) { - return NanThrowTypeError( + if (!info[1]->IsArray()) { + return Nan::ThrowTypeError( "createSsl's second argument must be a list of objects"); } int force_client_auth = 0; - if (args[2]->IsBoolean()) { - force_client_auth = (int)args[2]->BooleanValue(); - } else if (!(args[2]->IsUndefined() || args[2]->IsNull())) { - return NanThrowTypeError( + if (info[2]->IsBoolean()) { + force_client_auth = (int)Nan::To<bool>(info[2]).FromJust(); + } else if (!(info[2]->IsUndefined() || info[2]->IsNull())) { + return Nan::ThrowTypeError( "createSsl's third argument must be a boolean if provided"); } - Handle<Array> pair_list = Local<Array>::Cast(args[1]); + Local<Array> pair_list = Local<Array>::Cast(info[1]); uint32_t key_cert_pair_count = pair_list->Length(); grpc_ssl_pem_key_cert_pair *key_cert_pairs = new grpc_ssl_pem_key_cert_pair[ key_cert_pair_count]; - Handle<String> key_key = NanNew("private_key"); - Handle<String> cert_key = NanNew("cert_chain"); + Local<String> key_key = Nan::New("private_key").ToLocalChecked(); + Local<String> cert_key = Nan::New("cert_chain").ToLocalChecked(); for(uint32_t i = 0; i < key_cert_pair_count; i++) { - if (!pair_list->Get(i)->IsObject()) { + Local<Value> pair_val = Nan::Get(pair_list, i).ToLocalChecked(); + if (!pair_val->IsObject()) { delete key_cert_pairs; - return NanThrowTypeError("Key/cert pairs must be objects"); + return Nan::ThrowTypeError("Key/cert pairs must be objects"); } - Handle<Object> pair_obj = pair_list->Get(i)->ToObject(); - if (!pair_obj->HasOwnProperty(key_key)) { + Local<Object> pair_obj = Nan::To<Object>(pair_val).ToLocalChecked(); + MaybeLocal<Value> maybe_key = Nan::Get(pair_obj, key_key); + if (maybe_key.IsEmpty()) { delete key_cert_pairs; - return NanThrowTypeError( + return Nan::ThrowTypeError( "Key/cert pairs must have a private_key and a cert_chain"); } - if (!pair_obj->HasOwnProperty(cert_key)) { + MaybeLocal<Value> maybe_cert = Nan::Get(pair_obj, cert_key); + if (maybe_cert.IsEmpty()) { delete key_cert_pairs; - return NanThrowTypeError( + return Nan::ThrowTypeError( "Key/cert pairs must have a private_key and a cert_chain"); } - if (!::node::Buffer::HasInstance(pair_obj->Get(key_key))) { + if (!::node::Buffer::HasInstance(maybe_key.ToLocalChecked())) { delete key_cert_pairs; - return NanThrowTypeError("private_key must be a Buffer"); + return Nan::ThrowTypeError("private_key must be a Buffer"); } - if (!::node::Buffer::HasInstance(pair_obj->Get(cert_key))) { + if (!::node::Buffer::HasInstance(maybe_cert.ToLocalChecked())) { delete key_cert_pairs; - return NanThrowTypeError("cert_chain must be a Buffer"); + return Nan::ThrowTypeError("cert_chain must be a Buffer"); } key_cert_pairs[i].private_key = ::node::Buffer::Data( - pair_obj->Get(key_key)); + maybe_key.ToLocalChecked()); key_cert_pairs[i].cert_chain = ::node::Buffer::Data( - pair_obj->Get(cert_key)); + maybe_cert.ToLocalChecked()); } grpc_server_credentials *creds = grpc_ssl_server_credentials_create( root_certs, key_cert_pairs, key_cert_pair_count, force_client_auth, NULL); delete key_cert_pairs; if (creds == NULL) { - NanReturnNull(); + info.GetReturnValue().SetNull(); + } else { + info.GetReturnValue().Set(WrapStruct(creds)); } - NanReturnValue(WrapStruct(creds)); } NAN_METHOD(ServerCredentials::CreateInsecure) { - NanScope(); - NanReturnValue(WrapStruct(NULL)); + info.GetReturnValue().Set(WrapStruct(NULL)); } } // namespace node diff --git a/src/node/ext/server_credentials.h b/src/node/ext/server_credentials.h index 63903f663c..bf279e481c 100644 --- a/src/node/ext/server_credentials.h +++ b/src/node/ext/server_credentials.h @@ -43,12 +43,12 @@ namespace grpc { namespace node { /* Wrapper class for grpc_server_credentials structs */ -class ServerCredentials : public ::node::ObjectWrap { +class ServerCredentials : public Nan::ObjectWrap { public: - static void Init(v8::Handle<v8::Object> exports); - static bool HasInstance(v8::Handle<v8::Value> val); + static void Init(v8::Local<v8::Object> exports); + static bool HasInstance(v8::Local<v8::Value> val); /* Wrap a grpc_server_credentials struct in a javascript object */ - static v8::Handle<v8::Value> WrapStruct(grpc_server_credentials *credentials); + static v8::Local<v8::Value> WrapStruct(grpc_server_credentials *credentials); /* Returns the grpc_server_credentials struct that this object wraps */ grpc_server_credentials *GetWrappedServerCredentials(); @@ -64,9 +64,9 @@ class ServerCredentials : public ::node::ObjectWrap { static NAN_METHOD(New); static NAN_METHOD(CreateSsl); static NAN_METHOD(CreateInsecure); - static NanCallback *constructor; + static Nan::Callback *constructor; // Used for typechecking instances of this javascript class - static v8::Persistent<v8::FunctionTemplate> fun_tpl; + static Nan::Persistent<v8::FunctionTemplate> fun_tpl; grpc_server_credentials *wrapped_credentials; }; diff --git a/src/node/interop/test.proto b/src/node/interop/test.proto index d2c3f9befe..24e67497fa 100644 --- a/src/node/interop/test.proto +++ b/src/node/interop/test.proto @@ -45,7 +45,6 @@ service TestService { rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); // One request followed by one response. - // TODO(Issue 527): Describe required server behavior. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); // One request followed by a sequence of responses (streamed download). diff --git a/src/node/package.json b/src/node/package.json index bb8987cc0d..22f94757ce 100644 --- a/src/node/package.json +++ b/src/node/package.json @@ -1,6 +1,6 @@ { "name": "grpc", - "version": "0.11.0", + "version": "0.11.1", "author": "Google Inc.", "description": "gRPC Library for Node", "homepage": "http://www.grpc.io/", @@ -27,7 +27,7 @@ "dependencies": { "bindings": "^1.2.0", "lodash": "^3.9.3", - "nan": "^1.5.0", + "nan": "^2.0.0", "protobufjs": "^4.0.0" }, "devDependencies": { diff --git a/src/node/src/server.js b/src/node/src/server.js index b6f162adf8..70b4a9d80e 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -276,6 +276,7 @@ function ServerWritableStream(call, serialize) { function _write(chunk, encoding, callback) { /* jshint validthis: true */ var batch = {}; + var self = this; if (!this.call.metadataSent) { batch[grpc.opType.SEND_INITIAL_METADATA] = (new Metadata())._getCoreRepresentation(); @@ -290,7 +291,7 @@ function _write(chunk, encoding, callback) { batch[grpc.opType.SEND_MESSAGE] = message; this.call.startBatch(batch, function(err, value) { if (err) { - this.emit('error', err); + self.emit('error', err); return; } callback(); @@ -305,6 +306,7 @@ ServerWritableStream.prototype._write = _write; */ function sendMetadata(responseMetadata) { /* jshint validthis: true */ + var self = this; if (!this.call.metadataSent) { this.call.metadataSent = true; var batch = []; @@ -312,7 +314,7 @@ function sendMetadata(responseMetadata) { responseMetadata._getCoreRepresentation(); this.call.startBatch(batch, function(err) { if (err) { - this.emit('error', err); + self.emit('error', err); return; } }); diff --git a/src/objective-c/GRPCClient/private/GRPCRequestHeaders.h b/src/objective-c/GRPCClient/private/GRPCRequestHeaders.h index 1391b5725f..cf5a1be9d6 100644 --- a/src/objective-c/GRPCClient/private/GRPCRequestHeaders.h +++ b/src/objective-c/GRPCClient/private/GRPCRequestHeaders.h @@ -34,7 +34,7 @@ #import <Foundation/Foundation.h> #include <grpc/grpc.h> -#import "GRPCCall.h" +#import "../GRPCCall.h" @interface GRPCRequestHeaders : NSObject<GRPCRequestHeaders> diff --git a/src/objective-c/GRPCClient/private/GRPCRequestHeaders.m b/src/objective-c/GRPCClient/private/GRPCRequestHeaders.m index 761677ce50..d23f21c0f9 100644 --- a/src/objective-c/GRPCClient/private/GRPCRequestHeaders.m +++ b/src/objective-c/GRPCClient/private/GRPCRequestHeaders.m @@ -35,7 +35,6 @@ #import <Foundation/Foundation.h> -#import "../GRPCCall.h" #import "NSDictionary+GRPC.h" // Used by the setter. diff --git a/src/objective-c/examples/RemoteTestClient/test.proto b/src/objective-c/examples/RemoteTestClient/test.proto index 2f5a5489b3..514c3b8095 100644 --- a/src/objective-c/examples/RemoteTestClient/test.proto +++ b/src/objective-c/examples/RemoteTestClient/test.proto @@ -45,7 +45,6 @@ service TestService { rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); // One request followed by one response. - // TODO(Issue 527): Describe required server behavior. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); // One request followed by a sequence of responses (streamed download). diff --git a/src/objective-c/generated_libraries/RemoteTestClient/test.proto b/src/objective-c/generated_libraries/RemoteTestClient/test.proto index 2f5a5489b3..514c3b8095 100644 --- a/src/objective-c/generated_libraries/RemoteTestClient/test.proto +++ b/src/objective-c/generated_libraries/RemoteTestClient/test.proto @@ -45,7 +45,6 @@ service TestService { rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); // One request followed by one response. - // TODO(Issue 527): Describe required server behavior. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); // One request followed by a sequence of responses (streamed download). diff --git a/src/php/README.md b/src/php/README.md index 51322c7526..a054258782 100644 --- a/src/php/README.md +++ b/src/php/README.md @@ -5,7 +5,7 @@ This directory contains source code for PHP implementation of gRPC layered on sh #Status -Alpha : Ready for early adopters +Beta ## Environment @@ -49,7 +49,7 @@ sudo apt-get install libgrpc-dev Install the gRPC PHP extension ```sh -sudo pecl install grpc-alpha +sudo pecl install grpc-beta ``` **Mac OS X:** @@ -96,7 +96,7 @@ $ sudo make install # 'make' should have been run by core grpc Install the gRPC PHP extension ```sh -$ sudo pecl install grpc-alpha +$ sudo pecl install grpc-beta ``` OR @@ -109,15 +109,19 @@ $ make $ sudo make install ``` -In your php.ini file, add the line `extension=grpc.so` to load the extension -at PHP startup. +Add this line to your `php.ini` file, e.g. `/etc/php5/cli/php.ini` + +```sh +extension=grpc.so +``` Install Composer ```sh $ cd grpc/src/php $ curl -sS https://getcomposer.org/installer | php -$ php composer.phar install +$ sudo mv composer.phar /usr/local/bin/composer +$ composer install ``` ## Unit Tests @@ -164,6 +168,132 @@ $ cd grpc/src/php $ ./bin/run_gen_code_test.sh ``` +## Use the gRPC PHP extension with Apache + +Install `apache2`, in addition to `php5` above + +```sh +$ sudo apt-get install apache2 +``` + +Add this line to your `php.ini` file, e.g. `/etc/php5/apache2/php.ini` + +```sh +extension=grpc.so +``` + +Restart apache + +```sh +$ sudo service apache2 restart +``` + +Make sure the Node math server is still running, as above. + +```sh +$ cd grpc/src/node +$ nodejs examples/math_server.js +``` + +Make sure you have run `composer install` to generate the `vendor/autoload.php` file + +```sh +$ composer install +``` + +Make sure you have generated the client stub `math.php` + +```sh +$ ./bin/generate_proto_php.sh +``` + +Copy the `math_client.php` file into your Apache document root, e.g. + +```sh +$ cp tests/generated_code/math_client.php /var/www/html +``` + +You may have to fix the first two lines to point the includes to your installation: + +```php +include 'vendor/autoload.php'; +include 'tests/generated_code/math.php'; +``` + +Connect to `localhost/math_client.php` in your browser, or run this from command line: + +```sh +$ curl localhost/math_client.php +``` + +## Use the gRPC PHP extension with Nginx/PHP-FPM + +Install `nginx` and `php5-fpm`, in addition to `php5` above + +```sh +$ sudo apt-get install nginx php5-fpm +``` + +Add this line to your `php.ini` file, e.g. `/etc/php5/fpm/php.ini` + +```sh +extension=grpc.so +``` + +Uncomment the following lines in your `/etc/nginx/sites-available/default` file: + +``` +location ~ \.php$ { + include snippets/fastcgi-php.conf; + fastcgi_pass unix:/var/run/php5-fpm.sock; +} +``` + +Restart nginx and php-fpm + +```sh +$ sudo service nginx restart +$ sudo service php5-fpm restart +``` + +Make sure the Node math server is still running, as above. + +```sh +$ cd grpc/src/node +$ nodejs examples/math_server.js +``` + +Make sure you have run `composer install` to generate the `vendor/autoload.php` file + +```sh +$ composer install +``` + +Make sure you have generated the client stub `math.php` + +```sh +$ ./bin/generate_proto_php.sh +``` + +Copy the `math_client.php` file into your Nginx document root, e.g. + +```sh +$ cp tests/generated_code/math_client.php /var/www/html +``` + +You may have to fix the first two lines to point the includes to your installation: + +```php +include 'vendor/autoload.php'; +include 'tests/generated_code/math.php'; +``` + +Connect to `localhost/math_client.php` in your browser, or run this from command line: + +```sh +$ curl localhost/math_client.php +``` + [homebrew]:http://brew.sh [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install [Node]:https://github.com/grpc/grpc/tree/master/src/node/examples diff --git a/src/php/bin/run_tests.sh b/src/php/bin/run_tests.sh index 953f408ea8..1fe68cdab4 100755 --- a/src/php/bin/run_tests.sh +++ b/src/php/bin/run_tests.sh @@ -31,7 +31,11 @@ # Loads the local shared library, and runs all of the test cases in tests/ # against it set -e -cd $(dirname $0) +cd $(dirname $0)/../../.. +root=$(pwd) +cd src/php/bin source ./determine_extension_dir.sh +# in some jenkins macos machine, somehow the PHP build script can't find libgrpc.dylib +export DYLD_LIBRARY_PATH=$root/libs/$config php $extension_dir $(which phpunit) -v --debug --strict \ ../tests/unit_tests diff --git a/src/php/tests/generated_code/math_client.php b/src/php/tests/generated_code/math_client.php new file mode 100644 index 0000000000..7bc78287be --- /dev/null +++ b/src/php/tests/generated_code/math_client.php @@ -0,0 +1,102 @@ +<?php +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +# Fix the following two lines to point to your installation +include 'vendor/autoload.php'; +include 'tests/generated_code/math.php'; + +function p($line) { + print("$line<br/>\n"); +} + +$host = "localhost:50051"; +p("Connecting to host: $host"); +$client = new math\MathClient($host, []); +p("Client class: ".get_class($client)); +p(''); + +p("Running unary call test:"); +$dividend = 7; +$divisor = 4; +$div_arg = new math\DivArgs(); +$div_arg->setDividend($dividend); +$div_arg->setDivisor($divisor); +$call = $client->Div($div_arg); +p("Call peer: ".$call->getPeer()); +p("Dividing $dividend by $divisor"); +list($response, $status) = $call->wait(); +p("quotient = ".$response->getQuotient()); +p("remainder = ".$response->getRemainder()); +p(''); + +p("Running server streaming test:"); +$limit = 7; +$fib_arg = new math\FibArgs(); +$fib_arg->setLimit($limit); +$call = $client->Fib($fib_arg); +$result_array = iterator_to_array($call->responses()); +$result = ''; +foreach ($result_array as $num) { + $result .= ' '.$num->getNum(); +} +p("The first $limit Fibonacci numbers are:".$result); +p(''); + +p("Running client streaming test:"); +$call = $client->Sum(); +for ($i = 0; $i <= $limit; $i++) { + $num = new math\Num(); + $num->setNum($i); + $call->write($num); +} +list($response, $status) = $call->wait(); +p(sprintf("The first %d positive integers sum to: %d", + $limit, $response->getNum())); +p(''); + +p("Running bidi-streaming test:"); +$call = $client->DivMany(); +for ($i = 0; $i < 7; $i++) { + $div_arg = new math\DivArgs(); + $dividend = 2 * $i + 1; + $divisor = 3; + $div_arg->setDividend($dividend); + $div_arg->setDivisor($divisor); + $call->write($div_arg); + p("client writing: $dividend / $divisor"); + $response = $call->read(); + p(sprintf("server writing: quotient = %d, remainder = %d", + $response->getQuotient(), $response->getRemainder())); +} +$call->writesDone(); diff --git a/src/php/tests/interop/test.proto b/src/php/tests/interop/test.proto index 39c08f3544..0d169e7f64 100644 --- a/src/php/tests/interop/test.proto +++ b/src/php/tests/interop/test.proto @@ -44,7 +44,6 @@ service TestService { rpc EmptyCall(grpc.testing.EmptyMessage) returns (grpc.testing.EmptyMessage); // One request followed by one response. - // TODO(Issue 527): Describe required server behavior. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); // One request followed by a sequence of responses (streamed download). diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php index bd464f939f..b65366233a 100755 --- a/src/php/tests/unit_tests/EndToEndTest.php +++ b/src/php/tests/unit_tests/EndToEndTest.php @@ -204,11 +204,11 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ } public function testWatchConnectivityStateFailed() { - $idle_state = $this->channel->getConnectivityState(true); + $idle_state = $this->channel->getConnectivityState(); $this->assertTrue($idle_state == Grpc\CHANNEL_IDLE); $now = Grpc\Timeval::now(); - $delta = new Grpc\Timeval(1); + $delta = new Grpc\Timeval(500000); // should timeout $deadline = $now->add($delta); $this->assertFalse($this->channel->watchConnectivityState( diff --git a/src/python/grpcio/grpc/beta/implementations.py b/src/python/grpcio/grpc/beta/implementations.py index 9b461fb3dd..c9d64ad35a 100644 --- a/src/python/grpcio/grpc/beta/implementations.py +++ b/src/python/grpcio/grpc/beta/implementations.py @@ -147,7 +147,7 @@ def secure_channel(host, port, client_credentials): A secure Channel to the remote host through which RPCs may be conducted. """ intermediary_low_channel = _intermediary_low.Channel( - '%s:%d' % (host, port), client_credentials.intermediary_low_credentials) + '%s:%d' % (host, port), client_credentials._intermediary_low_credentials) return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access diff --git a/src/python/grpcio/requirements.txt b/src/python/grpcio/requirements.txt index 608ba402e0..77356e0a74 100644 --- a/src/python/grpcio/requirements.txt +++ b/src/python/grpcio/requirements.txt @@ -1,2 +1,2 @@ -enum34==1.0.4 -futures==2.2.0 +enum34>=1.0.4 +futures>=2.2.0 diff --git a/src/python/grpcio/setup.py b/src/python/grpcio/setup.py index 151b2bfcb4..8b87c09d5c 100644 --- a/src/python/grpcio/setup.py +++ b/src/python/grpcio/setup.py @@ -87,9 +87,8 @@ _PACKAGE_DIRECTORIES = { } _INSTALL_REQUIRES = ( - 'enum34==1.0.4', - 'futures==2.2.0', - 'protobuf==3.0.0a3', + 'enum34>=1.0.4', + 'futures>=2.2.0', ) _SETUP_REQUIRES = ( @@ -104,7 +103,7 @@ _COMMAND_CLASS = { setuptools.setup( name='grpcio', - version='0.11.0b0', + version='0.11.0b1', ext_modules=_EXTENSION_MODULES, packages=list(_PACKAGES), package_dir=_PACKAGE_DIRECTORIES, diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index 272a37f15f..3032736975 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -72,6 +72,36 @@ class _PauseableIterator(object): return next(self._upstream) +class _Callback(object): + + def __init__(self): + self._condition = threading.Condition() + self._called = False + self._passed_future = None + self._passed_other_stuff = None + + def __call__(self, *args, **kwargs): + with self._condition: + self._called = True + if args: + self._passed_future = args[0] + if 1 < len(args) or kwargs: + self._passed_other_stuff = tuple(args[1:]), dict(kwargs) + self._condition.notify_all() + + def future(self): + with self._condition: + while True: + if self._passed_other_stuff is not None: + raise ValueError( + 'Test callback passed unexpected values: %s', + self._passed_other_stuff) + elif self._called: + return self._passed_future + else: + self._condition.wait() + + class TestCase(test_coverage.Coverage, unittest.TestCase): """A test of the Face layer of RPC Framework. @@ -112,12 +142,15 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: request = test_messages.request() + callback = _Callback() response_future = self._invoker.future(group, method)( request, test_constants.LONG_TIMEOUT) + response_future.add_done_callback(callback) response = response_future.result() test_messages.verify(request, response, self) + self.assertIs(callback.future(), response_future) def testSuccessfulUnaryRequestStreamResponse(self): for (group, method), test_messages_sequence in ( @@ -137,15 +170,19 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): for test_messages in test_messages_sequence: requests = test_messages.requests() request_iterator = _PauseableIterator(iter(requests)) + callback = _Callback() # Use of a paused iterator of requests allows us to test that control is # returned to calling code before the iterator yields any requests. with request_iterator.pause(): response_future = self._invoker.future(group, method)( request_iterator, test_constants.LONG_TIMEOUT) - response = response_future.result() + response_future.add_done_callback(callback) + future_passed_to_callback = callback.future() + response = future_passed_to_callback.result() test_messages.verify(requests, response, self) + self.assertIs(future_passed_to_callback, response_future) def testSuccessfulStreamRequestStreamResponse(self): for (group, method), test_messages_sequence in ( @@ -208,12 +245,15 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: request = test_messages.request() + callback = _Callback() with self._control.pause(): response_future = self._invoker.future(group, method)( request, test_constants.LONG_TIMEOUT) + response_future.add_done_callback(callback) cancel_method_return_value = response_future.cancel() + self.assertIs(callback.future(), response_future) self.assertFalse(cancel_method_return_value) self.assertTrue(response_future.cancelled()) @@ -236,12 +276,15 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): self._digest.stream_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: requests = test_messages.requests() + callback = _Callback() with self._control.pause(): response_future = self._invoker.future(group, method)( iter(requests), test_constants.LONG_TIMEOUT) + response_future.add_done_callback(callback) cancel_method_return_value = response_future.cancel() + self.assertIs(callback.future(), response_future) self.assertFalse(cancel_method_return_value) self.assertTrue(response_future.cancelled()) @@ -264,10 +307,13 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: request = test_messages.request() + callback = _Callback() with self._control.pause(): response_future = self._invoker.future( group, method)(request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + self.assertIs(callback.future(), response_future) self.assertIsInstance( response_future.exception(), face.ExpirationError) with self.assertRaises(face.ExpirationError): @@ -290,10 +336,13 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): self._digest.stream_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: requests = test_messages.requests() + callback = _Callback() with self._control.pause(): response_future = self._invoker.future(group, method)( iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + self.assertIs(callback.future(), response_future) self.assertIsInstance( response_future.exception(), face.ExpirationError) with self.assertRaises(face.ExpirationError): @@ -316,11 +365,14 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: request = test_messages.request() + callback = _Callback() with self._control.fail(): response_future = self._invoker.future(group, method)( request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + self.assertIs(callback.future(), response_future) # Because the servicer fails outside of the thread from which the # servicer-side runtime called into it its failure is # indistinguishable from simply not having called its @@ -350,11 +402,14 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): self._digest.stream_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: requests = test_messages.requests() + callback = _Callback() with self._control.fail(): response_future = self._invoker.future(group, method)( iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + self.assertIs(callback.future(), response_future) # Because the servicer fails outside of the thread from which the # servicer-side runtime called into it its failure is # indistinguishable from simply not having called its diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py index 216119f0e7..fe36bc9232 100644 --- a/src/python/grpcio_test/setup.py +++ b/src/python/grpcio_test/setup.py @@ -72,6 +72,8 @@ _SETUP_REQUIRES = ( _INSTALL_REQUIRES = ( 'oauth2client>=1.4.7', 'grpcio>=0.11.0b0', + # TODO(issue 3321): Unpin protobuf dependency. + 'protobuf==3.0.0a3', ) _COMMAND_CLASS = { |