diff options
author | ncteisen <ncteisen@gmail.com> | 2017-08-01 10:58:29 -0700 |
---|---|---|
committer | ncteisen <ncteisen@gmail.com> | 2017-08-01 10:58:29 -0700 |
commit | ab04027e474da7212109e4afbd9896d3bb78719c (patch) | |
tree | 75a3f7a981bbd454f35089a0dde8346d01a7d77e /src | |
parent | fb193a1e2fd0093103101b2551dd3e7b9af8833d (diff) | |
parent | d885d24161e9ebf7c1f1d225569c949cd909fd06 (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into flow-control-v3
Diffstat (limited to 'src')
20 files changed, 312 insertions, 86 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index c3dca14305..b83c95275f 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -208,7 +208,7 @@ void grpc_channel_watch_connectivity_state( 7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, cq, tag)); - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); gpr_mu_init(&w->mu); GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w, diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c index 9065e33613..6ec3790a5f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -33,13 +33,13 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> -#include <nameser.h> #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/nameser.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/support/string.h" diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 29dfa885de..776e0765fe 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -968,6 +968,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, s->header_array.capacity = s->header_array.count; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); + if (url) { + gpr_free(url); + } unsigned int header_index; for (header_index = 0; header_index < s->header_array.count; header_index++) { diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index c72c37e2b5..9b66987b68 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -73,10 +73,8 @@ static const grpc_closure_scheduler_vtable finally_scheduler = { static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); grpc_combiner *grpc_combiner_create(void) { - grpc_combiner *lock = gpr_malloc(sizeof(*lock)); + grpc_combiner *lock = gpr_zalloc(sizeof(*lock)); gpr_ref_init(&lock->refs, 1); - lock->next_combiner_on_this_exec_ctx = NULL; - lock->time_to_execute_final_list = false; lock->scheduler.vtable = &scheduler; lock->finally_scheduler.vtable = &finally_scheduler; gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); diff --git a/src/core/lib/iomgr/nameser.h b/src/core/lib/iomgr/nameser.h new file mode 100644 index 0000000000..daed6de518 --- /dev/null +++ b/src/core/lib/iomgr/nameser.h @@ -0,0 +1,104 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_NAMESER_H +#define GRPC_CORE_LIB_IOMGR_NAMESER_H + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_HAVE_ARPA_NAMESER + +#include <arpa/nameser.h> + +#else /* GRPC_HAVE_ARPA_NAMESER */ + +typedef enum __ns_class { + ns_c_invalid = 0, /* Cookie. */ + ns_c_in = 1, /* Internet. */ + ns_c_2 = 2, /* unallocated/unsupported. */ + ns_c_chaos = 3, /* MIT Chaos-net. */ + ns_c_hs = 4, /* MIT Hesiod. */ + /* Query class values which do not appear in resource records */ + ns_c_none = 254, /* for prereq. sections in update requests */ + ns_c_any = 255, /* Wildcard match. */ + ns_c_max = 65536 +} ns_class; + +typedef enum __ns_type { + ns_t_invalid = 0, /* Cookie. */ + ns_t_a = 1, /* Host address. */ + ns_t_ns = 2, /* Authoritative server. */ + ns_t_md = 3, /* Mail destination. */ + ns_t_mf = 4, /* Mail forwarder. */ + ns_t_cname = 5, /* Canonical name. */ + ns_t_soa = 6, /* Start of authority zone. */ + ns_t_mb = 7, /* Mailbox domain name. */ + ns_t_mg = 8, /* Mail group member. */ + ns_t_mr = 9, /* Mail rename name. */ + ns_t_null = 10, /* Null resource record. */ + ns_t_wks = 11, /* Well known service. */ + ns_t_ptr = 12, /* Domain name pointer. */ + ns_t_hinfo = 13, /* Host information. */ + ns_t_minfo = 14, /* Mailbox information. */ + ns_t_mx = 15, /* Mail routing information. */ + ns_t_txt = 16, /* Text strings. */ + ns_t_rp = 17, /* Responsible person. */ + ns_t_afsdb = 18, /* AFS cell database. */ + ns_t_x25 = 19, /* X_25 calling address. */ + ns_t_isdn = 20, /* ISDN calling address. */ + ns_t_rt = 21, /* Router. */ + ns_t_nsap = 22, /* NSAP address. */ + ns_t_nsap_ptr = 23, /* Reverse NSAP lookup (deprecated). */ + ns_t_sig = 24, /* Security signature. */ + ns_t_key = 25, /* Security key. */ + ns_t_px = 26, /* X.400 mail mapping. */ + ns_t_gpos = 27, /* Geographical position (withdrawn). */ + ns_t_aaaa = 28, /* Ip6 Address. */ + ns_t_loc = 29, /* Location Information. */ + ns_t_nxt = 30, /* Next domain (security). */ + ns_t_eid = 31, /* Endpoint identifier. */ + ns_t_nimloc = 32, /* Nimrod Locator. */ + ns_t_srv = 33, /* Server Selection. */ + ns_t_atma = 34, /* ATM Address */ + ns_t_naptr = 35, /* Naming Authority PoinTeR */ + ns_t_kx = 36, /* Key Exchange */ + ns_t_cert = 37, /* Certification record */ + ns_t_a6 = 38, /* IPv6 address (deprecates AAAA) */ + ns_t_dname = 39, /* Non-terminal DNAME (for IPv6) */ + ns_t_sink = 40, /* Kitchen sink (experimentatl) */ + ns_t_opt = 41, /* EDNS0 option (meta-RR) */ + ns_t_apl = 42, /* Address prefix list (RFC3123) */ + ns_t_ds = 43, /* Delegation Signer (RFC4034) */ + ns_t_sshfp = 44, /* SSH Key Fingerprint (RFC4255) */ + ns_t_rrsig = 46, /* Resource Record Signature (RFC4034) */ + ns_t_nsec = 47, /* Next Secure (RFC4034) */ + ns_t_dnskey = 48, /* DNS Public Key (RFC4034) */ + ns_t_tkey = 249, /* Transaction key */ + ns_t_tsig = 250, /* Transaction signature. */ + ns_t_ixfr = 251, /* Incremental zone transfer. */ + ns_t_axfr = 252, /* Transfer zone of authority. */ + ns_t_mailb = 253, /* Transfer mailbox records. */ + ns_t_maila = 254, /* Transfer mail agent records. */ + ns_t_any = 255, /* Wildcard match. */ + ns_t_zxfr = 256, /* BIND-specific, nonstandard. */ + ns_t_max = 65536 +} ns_type; + +#endif /* GRPC_HAVE_ARPA_NAMESER */ + +#endif /* GRPC_CORE_LIB_IOMGR_NAMESER_H */ diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index f5d15b4850..c12058f890 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -24,6 +24,7 @@ #if defined(GRPC_UV) // Do nothing #elif defined(GPR_MANYLINUX1) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 @@ -51,6 +52,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_LINUX) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 @@ -82,6 +84,7 @@ #define GRPC_POSIX_SOCKETUTILS #endif #elif defined(GPR_APPLE) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 @@ -93,6 +96,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_FREEBSD) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 @@ -104,6 +108,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_NACL) +#define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index ef8405cca8..55934964f3 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -18,6 +18,7 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/completion_queue.h" @@ -49,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, alarm->cq = cq; alarm->tag = tag; - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 2365d27307..00ec9c7c9a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -644,6 +644,8 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, static grpc_error *error_from_status(grpc_status_code status, const char *description) { + // copying 'description' is needed to ensure the grpc_call_cancel_with_status + // guarantee that can be short-lived. return grpc_error_set_int( grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description), GRPC_ERROR_STR_GRPC_MESSAGE, @@ -823,7 +825,7 @@ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { return encodings_accepted_by_peer; } -static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) { +static grpc_linked_mdelem *linked_from_md(const grpc_metadata *md) { return (grpc_linked_mdelem *)&md->internal_data; } @@ -847,7 +849,7 @@ static int prepare_application_metadata( for (i = 0; i < total_count; i++) { const grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); - grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + grpc_linked_mdelem *l = linked_from_md(md); GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); if (!GRPC_LOG_IF_ERROR("validate_metadata", grpc_validate_header_key_is_legal(md->key))) { @@ -864,7 +866,7 @@ static int prepare_application_metadata( for (int j = 0; j < i; j++) { const grpc_metadata *md = get_md_elem(metadata, additional_metadata, j, count); - grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + grpc_linked_mdelem *l = linked_from_md(md); GRPC_MDELEM_UNREF(exec_ctx, l->md); } return 0; @@ -882,9 +884,12 @@ static int prepare_application_metadata( } for (i = 0; i < total_count; i++) { grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); - GRPC_LOG_IF_ERROR( - "prepare_application_metadata", - grpc_metadata_batch_link_tail(exec_ctx, batch, linked_from_md(md))); + grpc_linked_mdelem *l = linked_from_md(md); + grpc_error *error = grpc_metadata_batch_link_tail(exec_ctx, batch, l); + if (error != GRPC_ERROR_NONE) { + GRPC_MDELEM_UNREF(exec_ctx, l->md); + } + GRPC_LOG_IF_ERROR("prepare_application_metadata", error); } call->send_extra_metadata_count = 0; @@ -1422,7 +1427,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (nops == 0) { if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq, notify_tag); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, free_no_op_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1723,7 +1728,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_CALL_INTERNAL_REF(call, "completion"); if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq, notify_tag); + GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); @@ -1844,6 +1849,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) { return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH"; case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS"; + case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN: + return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN"; case GRPC_CALL_OK: return "GRPC_CALL_OK"; } diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 80eb80af78..e85b308850 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); op->send_ping = &pr->closure; op->bind_pollset = grpc_cq_pollset(cq); - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 3851993c92..3d82a32e82 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -196,7 +196,7 @@ typedef struct cq_vtable { void (*init)(void *data); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); void (*destroy)(void *data); - void (*begin_op)(grpc_completion_queue *cq, void *tag); + bool (*begin_op)(grpc_completion_queue *cq, void *tag); void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); -static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); -static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); +static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); +static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, @@ -522,33 +522,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, } } -static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { - cq_next_data *cqd = DATA_FROM_CQ(cq); - GPR_ASSERT(!cqd->shutdown_called); - gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1); -} - -static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); - GPR_ASSERT(!cqd->shutdown_called); - gpr_ref(&cqd->pending_events); -} - -void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { -#ifndef NDEBUG - gpr_mu_lock(cq->mu); - if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { - cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); - cq->outstanding_tags = - gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * - cq->outstanding_tag_capacity); - } - cq->outstanding_tags[cq->outstanding_tag_count++] = tag; - gpr_mu_unlock(cq->mu); -#endif - cq->vtable->begin_op(cq, tag); -} - #ifndef NDEBUG static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { int found = 0; @@ -576,6 +549,41 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} #endif +static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + while (true) { + gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events); + if (count == 0) { + return false; + } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) { + break; + } + } + return true; +} + +static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); + GPR_ASSERT(!cqd->shutdown_called); + gpr_ref(&cqd->pending_events); + return true; +} + +bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { +#ifndef NDEBUG + gpr_mu_lock(cq->mu); + if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { + cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); + cq->outstanding_tags = + gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * + cq->outstanding_tag_capacity); + } + cq->outstanding_tags[cq->outstanding_tag_count++] = tag; + gpr_mu_unlock(cq->mu); +#endif + return cq->vtable->begin_op(cq, tag); +} + /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_NEXT) */ diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index af44482513..69d144bd95 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. - \a tag is currently used only in debug builds. */ -void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); + \a tag is currently used only in debug builds. Return true on success, and + false if completion_queue has been shutdown. */ +bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); /* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to grpc_cq_begin_op */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index fce7f8dca1..66dcc299aa 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1259,7 +1259,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, } /* stay locked, and gather up some stuff to do */ - grpc_cq_begin_op(cq, tag); + GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (server->shutdown_published) { grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1446,7 +1446,11 @@ grpc_call_error grpc_server_request_call( error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; } - grpc_cq_begin_op(cq_for_notification, tag); + if (grpc_cq_begin_op(cq_for_notification, tag) == false) { + gpr_free(rc); + error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; + goto done; + } details->reserved = NULL; rc->cq_idx = cq_idx; rc->type = BATCH_CALL; @@ -1496,7 +1500,11 @@ grpc_call_error grpc_server_request_registered_call( error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH; goto done; } - grpc_cq_begin_op(cq_for_notification, tag); + if (grpc_cq_begin_op(cq_for_notification, tag) == false) { + gpr_free(rc); + error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN; + goto done; + } rc->cq_idx = cq_idx; rc->type = REGISTERED_CALL; rc->server = server; diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 14cacc8f18..3af8bdc11a 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -24,6 +24,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <grpc++/impl/grpc_library.h> #include <grpc++/security/credentials.h> #include <grpc++/server_context.h> #include <grpc++/support/time.h> @@ -38,6 +39,7 @@ class DefaultGlobalClientCallbacks final void Destructor(ClientContext* context) override {} }; +static internal::GrpcLibraryInitializer g_gli_initializer; static DefaultGlobalClientCallbacks g_default_client_callbacks; static ClientContext::GlobalCallbacks* g_client_callbacks = &g_default_client_callbacks; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 92bacbe061..2483300cb1 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -151,19 +151,25 @@ class Server::SyncRequest final : public CompletionQueueTag { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; if (tag_) { - GPR_ASSERT(GRPC_CALL_OK == - grpc_server_request_registered_call( - server, tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this)); + if (GRPC_CALL_OK != + grpc_server_request_registered_call( + server, tag_, &call_, &deadline_, &request_metadata_, + has_request_payload_ ? &request_payload_ : nullptr, cq_, + notify_cq, this)) { + TeardownRequest(); + return; + } } else { if (!call_details_) { call_details_ = new grpc_call_details; grpc_call_details_init(call_details_); } - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( - server, &call_, call_details_, - &request_metadata_, cq_, notify_cq, this)); + if (grpc_server_request_call(server, &call_, call_details_, + &request_metadata_, cq_, notify_cq, + this) != GRPC_CALL_OK) { + TeardownRequest(); + return; + } } } @@ -286,12 +292,10 @@ class Server::SyncRequestThreadManager : public ThreadManager { if (ok) { // Calldata takes ownership of the completion queue inside sync_req SyncRequest::CallData cd(server_, sync_req); - { - // Prepare for the next request - if (!IsShutdown()) { - sync_req->SetupRequest(); // Create new completion queue for sync_req - sync_req->Request(server_->c_server(), server_cq_->cq()); - } + // Prepare for the next request + if (!IsShutdown()) { + sync_req->SetupRequest(); // Create new completion queue for sync_req + sync_req->Request(server_->c_server(), server_cq_->cq()); } GPR_TIMER_SCOPE("cd.Run()", 0); @@ -316,8 +320,8 @@ class Server::SyncRequestThreadManager : public ThreadManager { } void Shutdown() override { - server_cq_->Shutdown(); ThreadManager::Shutdown(); + server_cq_->Shutdown(); } void Wait() override { @@ -652,10 +656,11 @@ ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( void ServerInterface::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, ServerCompletionQueue* notification_cq) { - grpc_server_request_registered_call( - server_->server(), registered_method, &call_, &context_->deadline_, - context_->client_metadata_.arr(), payload, call_cq_->cq(), - notification_cq->cq(), this); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call( + server_->server(), registered_method, &call_, + &context_->deadline_, + context_->client_metadata_.arr(), payload, + call_cq_->cq(), notification_cq->cq(), this)); } ServerInterface::GenericAsyncRequest::GenericAsyncRequest( @@ -667,9 +672,10 @@ ServerInterface::GenericAsyncRequest::GenericAsyncRequest( grpc_call_details_init(&call_details_); GPR_ASSERT(notification_cq); GPR_ASSERT(call_cq); - grpc_server_request_call(server->server(), &call_, &call_details_, - context->client_metadata_.arr(), call_cq->cq(), - notification_cq->cq(), this); + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( + server->server(), &call_, &call_details_, + context->client_metadata_.arr(), call_cq->cq(), + notification_cq->cq(), this)); } bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, diff --git a/src/node/src/server.js b/src/node/src/server.js index fba0aac938..8b7c0b6862 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -919,11 +919,6 @@ Server.prototype.addService = function(service, implementation) { }); }; -var logAddProtoServiceDeprecationOnce = _.once(function() { - common.log(constants.logVerbosity.INFO, - 'Server#addProtoService is deprecated. Use addService instead'); -}); - /** * Add a proto service to the server, with a corresponding implementation * @deprecated Use {@link grpc.Server#addService} instead @@ -931,11 +926,11 @@ var logAddProtoServiceDeprecationOnce = _.once(function() { * @param {Object<String, grpc.Server~handleCall>} implementation Map of method * names to method implementation for the provided service. */ -Server.prototype.addProtoService = function(service, implementation) { +Server.prototype.addProtoService = util.deprecate(function(service, + implementation) { var options; var protobuf_js_5_common = require('./protobuf_js_5_common'); var protobuf_js_6_common = require('./protobuf_js_6_common'); - logAddProtoServiceDeprecationOnce(); if (protobuf_js_5_common.isProbablyProtobufJs5(service)) { options = _.defaults(service.grpc_options, common.defaultGrpcOptions); this.addService( @@ -950,7 +945,7 @@ Server.prototype.addProtoService = function(service, implementation) { // We assume that this is a service attributes object this.addService(service, implementation); } -}; +}, 'Server#addProtoService: Use Server#addService instead'); /** * Binds the server to the given port, with SSL disabled if creds is an diff --git a/src/objective-c/GRPCClient/private/NSData+GRPC.m b/src/objective-c/GRPCClient/private/NSData+GRPC.m index 7ee76ad333..7c46594dd5 100644 --- a/src/objective-c/GRPCClient/private/NSData+GRPC.m +++ b/src/objective-c/GRPCClient/private/NSData+GRPC.m @@ -47,6 +47,8 @@ static void MallocAndCopyByteBufferToCharArray(grpc_byte_buffer *buffer, grpc_slice_unref(slice); *array = result; *length = uncompressed_length; + + grpc_byte_buffer_reader_destroy(&reader); } static grpc_byte_buffer *CopyCharArrayToNewByteBuffer(const char *array, diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 99cb0ad971..577a5e9a42 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -110,4 +110,12 @@ self.state = GRXWriterStateFinished; } +- (void)dealloc { + GRXWriterState state = self.state; + if (state == GRXWriterStateNotStarted || + state == GRXWriterStatePaused) { + dispatch_resume(_writeQueue); + } +} + @end diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m index fa3ded4c0c..3a5adbbf37 100644 --- a/src/objective-c/tests/RxLibraryUnitTests.m +++ b/src/objective-c/tests/RxLibraryUnitTests.m @@ -213,4 +213,74 @@ XCTAssertEqualObjects(handler.errorOrNil, nil); } +#define WRITE_ROUNDS (1000) +- (void)testBufferedPipeResumeWhenDealloc { + id anyValue = @7; + id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) { + }]; + + // Release after alloc; + GRXBufferedPipe *pipe = [GRXBufferedPipe pipe]; + pipe = nil; + + // Release after write but before start + pipe = [GRXBufferedPipe pipe]; + for (int i = 0; i < WRITE_ROUNDS; i++) { + [pipe writeValue:anyValue]; + } + pipe = nil; + + // Release after start but not write + pipe = [GRXBufferedPipe pipe]; + [pipe startWithWriteable:writeable]; + pipe = nil; + + // Release after start and write + pipe = [GRXBufferedPipe pipe]; + for (int i = 0; i < WRITE_ROUNDS; i++) { + [pipe writeValue:anyValue]; + } + [pipe startWithWriteable:writeable]; + pipe = nil; + + // Release after start, write and pause + pipe = [GRXBufferedPipe pipe]; + [pipe startWithWriteable:writeable]; + for (int i = 0; i < WRITE_ROUNDS; i++) { + [pipe writeValue:anyValue]; + } + pipe.state = GRXWriterStatePaused; + for (int i = 0; i < WRITE_ROUNDS; i++) { + [pipe writeValue:anyValue]; + } + pipe = nil; + + // Release after start, write, pause and finish + pipe = [GRXBufferedPipe pipe]; + [pipe startWithWriteable:writeable]; + for (int i = 0; i < WRITE_ROUNDS; i++) { + [pipe writeValue:anyValue]; + } + pipe.state = GRXWriterStatePaused; + for (int i = 0; i < WRITE_ROUNDS; i++) { + [pipe writeValue:anyValue]; + } + [pipe finishWithError:nil]; + pipe = nil; + + // Release after start, write, pause, finish and resume + pipe = [GRXBufferedPipe pipe]; + [pipe startWithWriteable:writeable]; + for (int i = 0; i < WRITE_ROUNDS; i++) { + [pipe writeValue:anyValue]; + } + pipe.state = GRXWriterStatePaused; + for (int i = 0; i < WRITE_ROUNDS; i++) { + [pipe writeValue:anyValue]; + } + [pipe finishWithError:nil]; + pipe.state = GRXWriterStateStarted; + pipe = nil; +} + @end diff --git a/src/objective-c/tests/build_one_example.sh b/src/objective-c/tests/build_one_example.sh index 576276096e..298d8e7043 100755 --- a/src/objective-c/tests/build_one_example.sh +++ b/src/objective-c/tests/build_one_example.sh @@ -37,9 +37,11 @@ rm -f Podfile.lock pod install set -o pipefail -XCODEBUILD_FILTER='(^===|^\*\*|\bfatal\b|\berror\b|\bwarning\b|\bfail)' +XCODEBUILD_FILTER='(^===|^\*\*|\bfatal\b|\berror\b|\bwarning\b|\bfail|\bpassed\b)' xcodebuild \ build \ -workspace *.xcworkspace \ -scheme $SCHEME \ - -destination name="iPhone 6" | xcpretty + -destination name="iPhone 6" \ + | egrep "$XCODEBUILD_FILTER" \ + | egrep -v "(GPBDictionary|GPBArray)" - diff --git a/src/objective-c/tests/run_tests.sh b/src/objective-c/tests/run_tests.sh index 8fa9439284..d9c78ebac9 100755 --- a/src/objective-c/tests/run_tests.sh +++ b/src/objective-c/tests/run_tests.sh @@ -70,7 +70,7 @@ trap 'kill -9 `jobs -p` ; echo "EXIT TIME: $(date)"' EXIT # element of the pipe fails. # TODO(jcanizales): Use xctool instead? Issue #2540. set -o pipefail -XCODEBUILD_FILTER='(^===|^\*\*|\bfatal\b|\berror\b|\bwarning\b|\bfail)' +XCODEBUILD_FILTER='(^===|^\*\*|\bfatal\b|\berror\b|\bwarning\b|\bfail|\bpassed\b)' echo "TIME: $(date)" xcodebuild \ -workspace Tests.xcworkspace \ @@ -79,14 +79,18 @@ xcodebuild \ HOST_PORT_LOCALSSL=localhost:5051 \ HOST_PORT_LOCAL=localhost:5050 \ HOST_PORT_REMOTE=grpc-test.sandbox.googleapis.com \ - test | xcpretty + test \ + | egrep "$XCODEBUILD_FILTER" \ + | egrep -v "(GPBDictionary|GPBArray)" - echo "TIME: $(date)" xcodebuild \ -workspace Tests.xcworkspace \ -scheme CoreCronetEnd2EndTests \ -destination name="iPhone 6" \ - test | xcpretty + test \ + | egrep "$XCODEBUILD_FILTER" \ + | egrep -v "(GPBDictionary|GPBArray)" - # Temporarily disabled for (possible) flakiness on Jenkins. # Fix or reenable after confirmation/disconfirmation that it is the source of @@ -105,4 +109,6 @@ xcodebuild \ -scheme InteropTestsRemoteWithCronet \ -destination name="iPhone 6" \ HOST_PORT_REMOTE=grpc-test.sandbox.googleapis.com \ - test | xcpretty + test \ + | egrep "$XCODEBUILD_FILTER" \ + | egrep -v "(GPBDictionary|GPBArray)" - |